module Ganeti.JQScheduler.Filtering
( applyingFilter
, jobFiltering
, matchPredicate
, matches
) where
import qualified Data.ByteString as BS
import Data.List
import Data.Maybe
import qualified Data.Map as Map
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Constants (opcodeReasonSrcRlib2, opcodeReasonAuthUser)
import Ganeti.Errors
import Ganeti.Lens hiding (chosen)
import Ganeti.JQScheduler.Types
import Ganeti.JQueue (QueuedJob(..))
import Ganeti.JQueue.Lens
import Ganeti.JSON
import Ganeti.Objects (FilterRule(..), FilterAction(..), FilterPredicate(..),
filterRuleOrder)
import Ganeti.OpCodes (OpCode)
import Ganeti.OpCodes.Lens
import Ganeti.Query.Language
import Ganeti.Query.Filter (evaluateFilterM, evaluateFilterJSON, Comparator,
FilterOp(..), toCompFun)
import Ganeti.SlotMap
import Ganeti.Types (JobId(..), ReasonElem)
-- | Looks up a field in the JSON rendering of an 'OpCode'.
--
-- The opcode is serialised with 'J.showJSON' and the key @s@ is resolved
-- through 'nestedAccessByKeyDotted'. A successful lookup is returned in
-- 'Ok'; a JSON-level failure message is wrapped into a 'ParameterError'
-- and returned in 'Bad'.
accessOpCodeField :: OpCode -> String -> ErrorResult J.JSValue
accessOpCodeField opc s =
  toErrorResult $ nestedAccessByKeyDotted s (J.showJSON opc)
  where
    -- Translate the JSON library's result type into Ganeti's.
    toErrorResult (J.Ok value)  = Ok value
    toErrorResult (J.Error msg) = Bad (ParameterError msg)
-- | Collects the 'OpCode's of a job.
--
-- Traverses every element reachable through 'qjOpsL' and, for each one,
-- follows 'qoInputL', 'validOpCodeL' and 'metaOpCodeL'; elements for which
-- one of these optics has no target contribute nothing to the result.
opCodesOf :: QueuedJob -> [OpCode]
opCodesOf =
  (^.. qjOpsL . traverse . qoInputL . validOpCodeL . metaOpCodeL)
-- | Collects the 'ReasonElem's of a job.
--
-- For every element reachable through 'qjOpsL', follows 'qoInputL',
-- 'validOpCodeL', 'metaParamsL' and 'opReasonL' and gathers each entry
-- found there, concatenated in traversal order.
reasonsOf :: QueuedJob -> [ReasonElem]
reasonsOf =
  (^.. qjOpsL . traverse . qoInputL . validOpCodeL
     . metaParamsL . opReasonL . traverse)
-- | Determines the user a job is attributed to.
--
-- Walks the job's reason entries (see 'reasonsOf') from first to last.
-- Every entry whose source equals 'opcodeReasonSrcRlib2' and whose reason
-- text starts with 'opcodeReasonAuthUser' replaces the current user with
-- the remainder of that reason text, so the last qualifying entry wins.
-- If no entry qualifies, @default_user@ is returned unchanged.
userOf :: QueuedJob -> String -> String
userOf job default_user =
  -- Strict left fold: the accumulator is forced at every step, so no chain
  -- of suspended 'extractRapiUser' applications is built up.
  foldl' extractRapiUser default_user $ reasonsOf job
  where
    extractRapiUser current_user (source, reason, _)
      | source == opcodeReasonSrcRlib2 =
          fromMaybe current_user (stripPrefix opcodeReasonAuthUser reason)
      | otherwise = current_user
-- | Evaluates a filter in which only comparison operations are supported.
--
-- The filter is run through 'evaluateFilterM' in the 'Maybe' monad. For a
-- 'Comp' operation the supplied @opFun@ is called with the comparison
-- function obtained from 'toCompFun'; every other filter operation
-- evaluates to 'Nothing'. An overall 'Nothing' result is reported as
-- 'False'.
--
-- @opFun@ receives the comparator, the field and the value to compare
-- against, and returns 'Nothing' when it cannot perform the comparison.
evaluateFilterComparator :: (Ord field)
                         => Filter field
                         -> (Comparator -> field -> FilterValue -> Maybe Bool)
                         -> Bool
evaluateFilterComparator fil opFun =
  let verdict = evaluateFilterM
        -- Kept as an inline lambda so that it is type-checked directly
        -- against the argument type expected by 'evaluateFilterM'.
        (\op -> case op of
           Comp comparison -> opFun (toCompFun comparison)
           _               -> \_ _ -> Nothing)
        fil
  in fromMaybe False verdict
-- | Decides whether a single 'FilterPredicate' holds for a job.
--
-- * 'FPJobId': only the field @"id"@ is understood. A numeric value is
--   compared with the job's numeric ID; the quoted string @"watermark"@
--   compares the job's ID with the @watermark@ argument. Anything else
--   makes the comparison invalid.
--
-- * 'FPOpCode': holds if the filter evaluates to true for at least one of
--   the job's opcodes (see 'opCodesOf'); an error while accessing a field
--   or evaluating the filter counts as \"no match\" for that opcode.
--
-- * 'FPReason': holds if at least one reason entry of the job (see
--   'reasonsOf') satisfies the filter over the fields @"source"@,
--   @"reason"@ and @"timestamp"@.
--
-- * 'FPUser': only the field @"user"@ is understood; it is compared with
--   @'userOf' job ""@.
--
-- Invalid comparisons are handled as described for
-- 'evaluateFilterComparator'.
matchPredicate :: QueuedJob
               -> JobId
               -> FilterPredicate
               -> Bool
matchPredicate job watermark predicate = case predicate of

  FPJobId idFilter ->
    let jobId        = qjId job
        numericJobId = fromIntegral (fromJobId jobId)
    in evaluateFilterComparator idFilter $ \cmp fieldName value ->
         -- The comparator is used at two different types here (numeric ID
         -- vs. 'JobId'), so the lambda has to stay inline.
         case (fieldName, value) of
           ("id", NumericValue n)           -> Just (numericJobId `cmp` n)
           ("id", QuotedString "watermark") -> Just (jobId `cmp` watermark)
           _                                -> Nothing

  FPOpCode opFilter ->
    let opCodeSatisfies opc = genericResult (const False) id $
          traverse (accessOpCodeField opc) opFilter >>= evaluateFilterJSON
    in any opCodeSatisfies (opCodesOf job)

  FPReason reasonFilter ->
    let -- The filter value a reason entry exposes under a field name.
        fieldValue (source, reason, timestamp) fieldName =
          case fieldName of
            "source"    -> Just (QuotedString source)
            "reason"    -> Just (QuotedString reason)
            "timestamp" -> Just (NumericValue timestamp)
            _           -> Nothing
        reasonSatisfies entry =
          evaluateFilterComparator reasonFilter $ \cmp fieldName value ->
            (`cmp` value) <$> fieldValue entry fieldName
    in any reasonSatisfies (reasonsOf job)

  FPUser userFilter ->
    evaluateFilterComparator userFilter $ \cmp fieldName value ->
      if fieldName == "user"
        then Just (QuotedString (userOf job "") `cmp` value)
        else Nothing
-- | Whether a filter rule matches a job.
--
-- A rule matches when every one of its predicates ('frPredicates') holds
-- for the job according to 'matchPredicate', using the rule's own
-- watermark ('frWatermark'). A rule without predicates therefore matches
-- every job.
matches :: QueuedJob -> FilterRule -> Bool
matches job rule = all holds (frPredicates rule)
  where
    holds = matchPredicate job (frWatermark rule)
-- | Lists the rules of a set, sorted according to 'filterRuleOrder'.
orderFilters :: Set FilterRule -> [FilterRule]
orderFilters rules = sortBy filterRuleOrder (Set.toList rules)
-- | Finds the filter rule that applies to a job, if any.
--
-- The rules are considered in the order given by 'orderFilters'. The
-- result is the first rule that matches the job (see 'matches') and whose
-- action is not 'Continue'; 'Nothing' if there is no such rule.
applyingFilter :: Set FilterRule -> QueuedJob -> Maybe FilterRule
applyingFilter filters job = find decisive (orderFilters filters)
  where
    -- Matching is checked first, then the action, as two separate passes
    -- (@filter@ followed by @find@) would do.
    decisive rule = matches job rule && Continue /= frAction rule
-- | A 'SlotMap' keyed by 'BS.ByteString'. In this module the keys are the
-- 'frUuid's of filter rules whose action is 'RateLimit'.
type RateLimitSlotMap = SlotMap BS.ByteString
-- | State that is threaded from job to job while 'jobFiltering' processes
-- a list of jobs.
data FilterChainState = FilterChainState
  { rateLimitSlotMap :: RateLimitSlotMap
    -- ^ Slot bookkeeping for the rate-limiting rules; updated by
    -- 'tryFitSlots' whenever a job is admitted under a 'RateLimit' rule.
  } deriving (Eq, Ord, Show)
-- | Tries to reserve slots in the state's rate-limit slot map.
--
-- If the slot map reports (via 'hasSlotsFor') that it can take the
-- requested counts, the result is the state with those counts applied
-- through 'occupySlots'. Otherwise the result is 'Nothing' and nothing is
-- reserved.
tryFitSlots :: FilterChainState
            -> CountMap BS.ByteString
            -> Maybe FilterChainState
tryFitSlots st countMap
  | slots `hasSlotsFor` countMap =
      Just (st { rateLimitSlotMap = slots `occupySlots` countMap })
  | otherwise = Nothing
  where
    slots = rateLimitSlotMap st
-- | Builds the rate-limit slot map describing the current queue.
--
-- Every rule in @filters@ whose action is @'RateLimit' n@ gets an entry
-- keyed by its 'frUuid' and initialised to @'Slot' 0 n@ (presumably \"0
-- of n slots taken\" -- see "Ganeti.SlotMap"). Then, for each job in
-- 'qRunning' and 'qManipulated' of the queue whose applying filter (see
-- 'applyingFilter') is a 'RateLimit' rule, one slot of that rule is
-- occupied via 'occupySlots'.
queueRateLimitSlotMap :: Queue -> Set FilterRule -> RateLimitSlotMap
queueRateLimitSlotMap queue filters = freeSlots `occupySlots` usedSlots
  where
    -- One entry per rate-limiting rule.
    freeSlots = Map.fromList
      [ (uuid, Slot 0 limit)
      | FilterRule{ frUuid = uuid, frAction = RateLimit limit }
          <- Set.toList filters
      ]

    activeJobs = qRunning queue ++ qManipulated queue

    -- Number of active jobs per rate-limiting rule that applies to them.
    usedSlots = Map.fromListWith (+)
      [ (uuid, 1)
      | activeJob <- activeJobs
      , Just FilterRule{ frUuid = uuid, frAction = RateLimit _ }
          <- [applyingFilter filters (jJob activeJob)]
      ]
-- | Applies the filter rules to a list of jobs and returns the jobs that
-- are let through, in their original order.
--
-- Jobs are processed from first to last. For each job the applying rule
-- is determined with 'applyingFilter':
--
-- * no applying rule, 'Accept' or 'Continue': the job is kept;
--
-- * 'Pause' or 'Reject': the job is dropped from the result;
--
-- * 'RateLimit': the job is kept only if one more slot for that rule can
--   be reserved with 'tryFitSlots'; the reservation is carried over to
--   the jobs that follow.
--
-- The slot state starts out as 'queueRateLimitSlotMap' of the given queue
-- and rules.
jobFiltering :: Queue -> Set FilterRule -> [JobWithStat] -> [JobWithStat]
jobFiltering queue filters =
  catMaybes . snd . mapAccumL decide initialState
  where
    initialState :: FilterChainState
    initialState = FilterChainState
      { rateLimitSlotMap = queueRateLimitSlotMap queue filters }

    -- Decision for one job: the (possibly updated) state, and the job
    -- itself if it passes.
    decide :: FilterChainState
           -> JobWithStat
           -> (FilterChainState, Maybe JobWithStat)
    decide st job =
      case applyingFilter filters (jJob job) of
        Nothing   -> admit
        Just rule -> case frAction rule of
          Accept      -> admit
          Continue    -> admit
          Pause       -> hold
          Reject      -> hold
          RateLimit _ ->
            maybe hold (\st' -> (st', Just job)) $
              tryFitSlots st (Map.singleton (frUuid rule) 1)
      where
        admit = (st, Just job)
        hold  = (st, Nothing)