module Test.Ganeti.JQScheduler (testJQScheduler) where
import Control.Applicative
import Control.Lens ((&), (.~), _2)
import qualified Data.ByteString.UTF8 as UTF8
import Data.List (inits)
import Data.Maybe
import qualified Data.Map as Map
import Data.Set (Set, difference)
import qualified Data.Set as Set
import Data.Traversable (traverse)
import Text.JSON (JSValue(..))
import Test.HUnit
import Test.QuickCheck
import Test.Ganeti.JQueue.Objects (genQueuedOpCode, genJobId, justNoTs)
import Test.Ganeti.SlotMap (genTestKey, overfullKeys)
import Test.Ganeti.TestCommon
import Test.Ganeti.TestHelper
import Test.Ganeti.Types ()
import Ganeti.JQScheduler.Filtering
import Ganeti.JQScheduler.ReasonRateLimiting
import Ganeti.JQScheduler.Types
import Ganeti.JQueue.Lens
import Ganeti.JQueue.Objects
import Ganeti.Objects (FilterRule(..), FilterPredicate(..), FilterAction(..),
filterRuleOrder)
import Ganeti.OpCodes
import Ganeti.OpCodes.Lens
import Ganeti.Query.Language (Filter(..), FilterValue(..))
import Ganeti.SlotMap
import Ganeti.Types
import Ganeti.Utils (isSubsequenceOf, newUUID)
-- | Generates a reason string of the shape @rate-limit:\<limit\>:\<key\>@.
--
-- The limit is the 'slotLimit' of an arbitrary 'Slot'; the key is drawn
-- from 'genTestKey'.
genRateLimitReason :: Gen String
genRateLimitReason = do
  slot <- arbitrary
  key <- genTestKey
  return $ concat ["rate-limit:", show (slotLimit slot), ":", key]
-- | Generator for jobs used by the scheduler tests.
--
-- A job gets an ID from 'genJobId' and a non-empty opcode list generated
-- at size 5.  In every opcode, the second component of each reason-trail
-- entry is overwritten with one string from 'genRateLimitReason'.  The
-- remaining job fields are fixed ('justNoTs' and 'Nothing').
instance Arbitrary QueuedJob where
  arbitrary = do
    jobId <- genJobId
    let rateLimitedOpCode = do
          opc <- genQueuedOpCode
          limitString <- genRateLimitReason
          return $
            opc & qoInputL . validOpCodeL . metaParamsL . opReasonL
                    . traverse . _2
                .~ limitString
    opcodes <- resize 5 (listOf1 rateLimitedOpCode)
    return $ QueuedJob jobId opcodes justNoTs justNoTs justNoTs Nothing Nothing
-- | Wraps an arbitrary 'QueuedJob' with 'nullJobWithStat'.  Shrinking
-- replaces only the wrapped job ('jJob') and keeps all other fields.
instance Arbitrary JobWithStat where
  arbitrary = fmap nullJobWithStat arbitrary
  shrink jws = map (\smaller -> jws { jJob = smaller }) (shrink (jJob jws))
-- | Generates a 'Queue' from three job lists.  Each list is produced by
-- 'listOfUniqueBy' keyed on the job ID, and is handed the jobs generated
-- so far (presumably so that job IDs stay unique across the whole queue;
-- confirm against 'listOfUniqueBy').
--
-- Shrinking shrinks one of the three lists at a time, trying 'qEnqueued'
-- first, then 'qRunning', then 'qManipulated'.
instance Arbitrary Queue where
  arbitrary = do
    enqueuedJobs <- distinctFrom []
    runningJobs <- distinctFrom enqueuedJobs
    manipulatedJobs <- distinctFrom (enqueuedJobs ++ runningJobs)
    return (Queue enqueuedJobs runningJobs manipulatedJobs)
    where
      -- Job list keyed by job ID, given the jobs that already exist.
      distinctFrom :: [JobWithStat] -> Gen [JobWithStat]
      distinctFrom = listOfUniqueBy arbitrary (qjId . jJob)

  shrink q =
    concat
      [ map (\js -> q { qEnqueued = js }) (shrink (qEnqueued q))
      , map (\js -> q { qRunning = js }) (shrink (qRunning q))
      , map (\js -> q { qManipulated = js }) (shrink (qManipulated q))
      ]
-- | Checks 'parseReasonRateLimit' on two well-formed reason strings and on
-- one that must be rejected because of a leading space.
--
-- Note that the expected label still contains the numeric limit
-- (e.g. @20:my label@).
case_parseReasonRateLimit :: Assertion
case_parseReasonRateLimit = do
  let parsesTo reason expected = parseReasonRateLimit reason == Just expected

  assertBool "default case" $
    ("rate-limit:20:my label" `parsesTo` ("20:my label", 20))
      && ("rate-limit:21:my label" `parsesTo` ("21:my label", 21))

  assertEqual "be picky about whitespace"
    Nothing
    (parseReasonRateLimit " rate-limit:20:my label")
-- | Two reason strings that share their trailing text but carry different
-- numeric limits must end up in different buckets.
--
-- Occupying the second label on a slot map that only knows the first one
-- is expected to add a separate entry @Slot 1 0@ (occupied 1, limit 0)
-- and leave the first entry untouched.
prop_slotMapFromJob_conflicting_buckets :: Property
prop_slotMapFromJob_conflicting_buckets =
  forAll genReasonPair $ \(reason1, reason2) ->
    (reason1 /= reason2) ==> checkBuckets reason1 reason2
  where
    -- Two rate-limit reasons with independent positive limits and the
    -- same printable-ASCII suffix.
    genReasonPair :: Gen (String, String)
    genReasonPair = do
      (Positive (n :: Int), Positive (m :: Int)) <- arbitrary
      suffix <- genPrintableAsciiString
      let reasonFor limit = "rate-limit:" ++ show limit ++ ":" ++ suffix
      return (reasonFor n, reasonFor m)

    -- Parses both reasons and compares the slot map after occupation with
    -- the expected two-bucket map.
    checkBuckets :: String -> String -> Gen Property
    checkBuckets reason1 reason2 = do
      (lab1, lim1) <- parseReasonRateLimit reason1
      (lab2, _) <- parseReasonRateLimit reason2
      let slots = Map.fromList [(lab1, Slot 1 lim1)]
          counts = Map.fromList [(lab2, 1)]
      return $
        (slots `occupySlots` counts)
          ==? Map.fromList [(lab1, Slot 1 lim1), (lab2, Slot 1 0)]
-- | Basic scenarios for 'reasonRateLimit'.
--
-- Three jobs share the reason @rate-limit:2:hello@.  The expected results
-- below show that at most two of them may be selected, counting jobs that
-- are already in the other two queue lists.
case_reasonRateLimit :: Assertion
case_reasonRateLimit = do
  let -- Builds a job with the given numeric ID whose single sampled opcode
      -- has its reason trail replaced by @trail@.
      jobWithReason num trail = do
        rawOp <- genSample genQueuedOpCode
        jobId <- makeJobId num
        let limitedOp =
              rawOp & (qoInputL . validOpCodeL . metaParamsL . opReasonL)
                        .~ trail
        return $ nullJobWithStat QueuedJob
          { qjId = jobId
          , qjOps = [limitedOp]
          , qjReceivedTimestamp = Nothing
          , qjStartTimestamp = Nothing
          , qjEndTimestamp = Nothing
          , qjLivelock = Nothing
          , qjProcessId = Nothing
          }

      limitedJob num =
        jobWithReason num [("source1", "rate-limit:2:hello", 0)]

  j1 <- limitedJob 1
  j2 <- limitedJob 2
  j3 <- limitedJob 3

  let -- One scenario: message, expected result, queue state, candidates.
      expectRunnable (msg, expected, queue, candidates) =
        assertEqual msg expected (reasonRateLimit queue candidates)

  mapM_ expectRunnable
    [ ( "[j1] should not be rate-limited"
      , [j1]
      , Queue [j1] [] []
      , [j1]
      )
    , ( "[j1, j2] should not be rate-limited"
      , [j1, j2]
      , Queue [j1, j2] [] []
      , [j1, j2]
      )
    , ( "j3 should be rate-limited 1"
      , [j1, j2]
      , Queue [j1, j2, j3] [] []
      , [j1, j2, j3]
      )
    , ( "j3 should be rate-limited 2"
      , [j2]
      , Queue [j2, j3] [j1] []
      , [j2, j3]
      )
    , ( "j3 should be rate-limited 3"
      , []
      , Queue [j3] [j1] [j2]
      , [j3]
      )
    ]
-- | Properties of 'reasonRateLimit' on arbitrary queues.
--
-- * The selected jobs form a subsequence of the enqueued jobs.
-- * After adding the selected jobs to the running ones, a bucket may only
--   be over its limit if it had exactly the same slot before selection.
--
-- Two 'cover' requirements make sure the generated queues are interesting:
-- buckets are shared between jobs, and rate limiting actually blocks jobs.
prop_reasonRateLimit :: Property
prop_reasonRateLimit = forAllShrink arbitrary shrink checkQueue
  where
    checkQueue :: Queue -> Property
    checkQueue q =
      cover sharesBucket 50 "some jobs have the same rate-limit bucket"
        . cover
            blockedByLimit
            50
            "queued jobs cannot be started because of rate limiting"
        $ conjoin
            [ counterexample "scheduled jobs must be subsequence" $
                toRun `isSubsequenceOf` enqueued
            , counterexample "no job may exceed its bucket limits, except from\
                             \ jobs that were already running with exceeded\
                             \ limits; those must not increase" $
                conjoin (map bucketOk (Map.toList newSlots))
            ]
      where
        slotMapOf = slotMapFromJobs . map jJob

        enqueued = qEnqueued q
        toRun = reasonRateLimit q enqueued

        oldSlots = slotMapOf (qRunning q)
        newSlots = slotMapOf (qRunning q ++ toRun)
        -- Slot usage if every enqueued job were started regardless of
        -- limits.
        unlimitedSlots = slotMapOf (qRunning q ++ enqueued)

        -- At least one bucket would hold more than one job.
        sharesBucket = any ((> 1) . slotOccupied) (Map.elems unlimitedSlots)

        -- Starting everything would overfill a bucket that is not
        -- overfull yet.
        blockedByLimit =
          not . Set.null $
            overfullKeys unlimitedSlots `difference` overfullKeys oldSlots

        -- A bucket is fine when within its limit; otherwise it must be
        -- unchanged compared to the slots of the already running jobs.
        bucketOk (key, slot@(Slot occupied limit))
          | occupied <= limit = passTest
          | otherwise = Map.lookup key oldSlots ==? Just slot
-- | For two rules with different UUIDs, 'filterRuleOrder' must agree with
-- comparing their @(priority, watermark, uuid)@ triples.
prop_filterRuleOrder :: Property
prop_filterRuleOrder = property $ do
  ruleA <- arbitrary
  ruleB <- arbitrary `suchThat` (\r -> frUuid ruleA /= frUuid r)
  let orderKey r = (frPriority r, frWatermark r, frUuid r)
  return $
    filterRuleOrder ruleA ruleB ==? compare (orderKey ruleA) (orderKey ruleB)
-- | Exercises 'matchPredicate' against one fixed job.
--
-- The job has ID 1 and a single queued @OpClusterRename@ opcode (name
-- @cluster1@) with the reason-trail entry @("source1", "reason1", 1234)@.
-- The watermark passed to 'matchPredicate' is the job's own ID.
case_matchPredicate :: Assertion
case_matchPredicate = do
  jid1 <- makeJobId 1
  clusterName <- mkNonEmpty "cluster1"

  let renameOp =
        QueuedOpCode
          { qoInput = ValidOpCode MetaOpCode
              { metaParams = CommonOpParams
                  { opDryRun = Nothing
                  , opDebugLevel = Nothing
                  , opPriority = OpPrioHigh
                  , opDepends = Just []
                  , opComment = Nothing
                  , opReason = [("source1", "reason1", 1234)]
                  }
              , metaOpCode = OpClusterRename
                  { opName = clusterName
                  }
              }
          , qoStatus = OP_STATUS_QUEUED
          , qoResult = JSNull
          , qoLog = []
          , qoPriority = 1
          , qoStartTimestamp = Nothing
          , qoExecTimestamp = Nothing
          , qoEndTimestamp = Nothing
          }

      job =
        QueuedJob
          { qjId = jid1
          , qjOps = [renameOp]
          , qjReceivedTimestamp = Nothing
          , qjStartTimestamp = Nothing
          , qjEndTimestamp = Nothing
          , qjLivelock = Nothing
          , qjProcessId = Nothing
          }

      -- The job's own ID doubles as the watermark.
      holds = matchPredicate job jid1

      shouldMatch msg = assertEqual msg True . holds
      shouldNotMatch msg = assertEqual msg False . holds

  -- Job ID predicates.
  shouldMatch "matching jobid filter" $
    FPJobId (EQFilter "id" (NumericValue 1))
  shouldNotMatch "non-matching jobid filter" $
    FPJobId (EQFilter "id" (NumericValue 2))
  shouldNotMatch "non-matching jobid filter (string passed)" $
    FPJobId (EQFilter "id" (QuotedString "1"))
  shouldMatch "matching jobid watermark filter" $
    FPJobId (EQFilter "id" (QuotedString "watermark"))

  -- Opcode predicates.
  shouldMatch "matching opcode filter (type of opcode)" $
    FPOpCode (EQFilter "OP_ID" (QuotedString "OP_CLUSTER_RENAME"))
  shouldNotMatch "non-matching opcode filter (type of opcode)" $
    FPOpCode (EQFilter "OP_ID" (QuotedString "OP_INSTANCE_CREATE"))
  shouldMatch "matching opcode filter (nested access)" $
    FPOpCode (EQFilter "name" (QuotedString "cluster1"))
  shouldNotMatch "non-matching opcode filter (nonexistent nested access)" $
    FPOpCode (EQFilter "something" (QuotedString "cluster1"))

  -- Reason predicates.
  shouldMatch "matching reason filter (reason field)" $
    FPReason (EQFilter "reason" (QuotedString "reason1"))
  shouldNotMatch "non-matching reason filter (reason field)" $
    FPReason (EQFilter "reason" (QuotedString "reasonGarbage"))
  shouldMatch "matching reason filter (source field)" $
    FPReason (EQFilter "source" (QuotedString "source1"))
  shouldMatch "matching reason filter (timestamp field)" $
    FPReason (EQFilter "timestamp" (NumericValue 1234))
  shouldNotMatch "non-matching reason filter (nonexistent field)" $
    FPReason (EQFilter "something" (QuotedString ""))
-- | Whenever 'applyingFilter' selects a rule for a job, that rule must
-- match the job and its action must not be 'Continue'.  Cases in which no
-- rule is selected are discarded by the precondition.
prop_applyingFilter :: Property
prop_applyingFilter =
  forAllShrink arbitrary shrink $ \(job, rules) ->
    let selected = applyingFilter (Set.fromList rules) job
        effective rule = job `matches` rule && frAction rule /= Continue
    in isJust selected ==> maybe True effective selected
-- | Basic scenarios for 'jobFiltering'.
--
-- Four jobs that differ only in their ID are run through small filter
-- chains built from variations of one rule.  Every scenario uses a queue
-- in which only the enqueued list is populated, and passes that same list
-- as the candidates.
case_jobFiltering :: Assertion
case_jobFiltering = do
  clusterName <- mkNonEmpty "cluster1"
  jid1 <- makeJobId 1
  jid2 <- makeJobId 2
  jid3 <- makeJobId 3
  jid4 <- makeJobId 4
  -- Placeholder priority; 'chain' below replaces it.
  unsetPrio <- mkNonNegative 1234
  uuid1 <- UTF8.fromString <$> newUUID

  let renameOp =
        QueuedOpCode
          { qoInput = ValidOpCode MetaOpCode
              { metaParams = CommonOpParams
                  { opDryRun = Nothing
                  , opDebugLevel = Nothing
                  , opPriority = OpPrioHigh
                  , opDepends = Just []
                  , opComment = Nothing
                  , opReason = [("source1", "reason1", 1234)]
                  }
              , metaOpCode = OpClusterRename
                  { opName = clusterName
                  }
              }
          , qoStatus = OP_STATUS_QUEUED
          , qoResult = JSNull
          , qoLog = []
          , qoPriority = 1
          , qoStartTimestamp = Nothing
          , qoExecTimestamp = Nothing
          , qoEndTimestamp = Nothing
          }

      j1 =
        nullJobWithStat QueuedJob
          { qjId = jid1
          , qjOps = [renameOp]
          , qjReceivedTimestamp = Nothing
          , qjStartTimestamp = Nothing
          , qjEndTimestamp = Nothing
          , qjLivelock = Nothing
          , qjProcessId = Nothing
          }

      -- Copy of j1 carrying a different job ID.
      withJobId jid = j1 & jJobL . qjIdL .~ jid

      j2 = withJobId jid2
      j3 = withJobId jid3
      j4 = withJobId jid4

      -- Base rule: reject the job with ID 1.
      fr1 =
        FilterRule
          { frWatermark = jid1
          , frPriority = unsetPrio
          , frPredicates = [FPJobId (EQFilter "id" (NumericValue 1))]
          , frAction = Reject
          , frReasonTrail = []
          , frUuid = uuid1
          }

      -- Returns the given rule with a freshly generated UUID.
      rule fr = do
        freshUuid <- UTF8.fromString <$> newUUID
        return fr { frUuid = freshUuid }

      -- Builds a filter set in which the rules receive the priorities
      -- 1, 2, ... in list order.  Rules must still carry the placeholder
      -- priority.
      chain :: [FilterRule] -> Set FilterRule
      chain frs
        | all ((== unsetPrio) . frPriority) frs =
            Set.fromList
              [ fr { frPriority = prio }
              | (fr, Just prio) <- zip frs (map mkNonNegative [1..]) ]
        | otherwise =
            error "Filter was passed to `chain` that already had a priority."

  fr2 <- rule fr1 { frAction = Accept }
  fr3 <- rule fr1 { frAction = Pause }
  fr4 <- rule fr1 { frPredicates =
                      [FPJobId (GTFilter "id" (QuotedString "watermark"))]
                  }
  fr5 <- rule fr1 { frPredicates = [] }
  fr6 <- rule fr5 { frAction = Continue }
  fr7 <- rule fr6 { frAction = RateLimit 2 }
  fr8 <- rule fr4 { frAction = Continue, frWatermark = jid1 }
  fr9 <- rule fr8 { frAction = RateLimit 2 }

  let -- Runs 'jobFiltering' with @jobs@ as the only queue content and as
      -- the candidates, using the rules chained in the given order.
      expectScheduled msg expected rules jobs =
        assertEqual msg expected
          (jobFiltering (Queue jobs [] []) (chain rules) jobs)

  expectScheduled "j1 should be rejected (by fr1)"
    [] [fr1] [j1]

  expectScheduled "j1 should be rejected (by fr1, it has priority)"
    [] [fr1, fr2] [j1]

  expectScheduled "j1 should be accepted (by fr2, it has priority)"
    [j1] [fr2, fr1] [j1]

  expectScheduled "j1 should be paused (by fr3)"
    [] [fr3] [j1]

  expectScheduled "j2 should be rejected (over watermark1)"
    [j1] [fr4] [j1, j2]

  expectScheduled "all jobs should be rejected (since no predicates)"
    [] [fr5] [j1, j2]

  expectScheduled "j3 should be rate-limited"
    [j1, j2] [fr6, fr7] [j1, j2, j3]

  -- fr8/fr9 use the "id > watermark" predicate with watermark jid1, so
  -- presumably j1 does not count towards their limit.
  expectScheduled "j4 should be rate-limited"
    [j1, j2, j3] [fr8, fr9] [j1, j2, j3, j4]
-- | Properties of 'jobFiltering' on arbitrary queues and non-empty filter
-- lists (generated at size 4).
--
-- * The selected jobs form a subsequence of the enqueued jobs.
-- * Every enqueued job is (not) selected for a reason that follows from
--   the rule 'applyingFilter' picks for it.
--
-- Cases with an empty enqueued list are discarded.  'stableCover' is
-- applied once per action name, with the percentage 4.
prop_jobFiltering :: Property
prop_jobFiltering =
  forAllShrink arbitrary shrink $ \q ->
    forAllShrink (resize 4 arbitrary) shrink $ \(NonEmpty filterList) ->
      let filters = Set.fromList filterList
          enqueued = qEnqueued q
          -- Jobs that already count against rate limits.
          occupying = qRunning q ++ qManipulated q
          toRun = jobFiltering q filters enqueued

          -- The rule that applies to a job, if any.
          applying = applyingFilter filters . jJob

          -- Whether @fr@ is the applying rule of more than @n@ of the
          -- given jobs.
          exceeds :: Int -> FilterRule -> [JobWithStat] -> Bool
          exceeds n fr jobs =
            length [ () | Just other <- map applying jobs
                        , frUuid fr == frUuid other ] > n

          -- Coverage helpers: the constructor name of an action as
          -- rendered by 'show', and one coverage check per action.
          actionName action = head (words (show action))

          allActions = map actionName [ Accept, Continue, Pause, Reject
                                      , RateLimit 0 ]

          applyingActions =
            map (actionName . frAction) (mapMaybe applying enqueued)

          coverPercent = 4

          actionCovers =
            foldr (.) id
              [ stableCover (name `elem` applyingActions) coverPercent
                  ("is " ++ name)
              | name <- allActions ]

          -- The enqueued jobs up to and including the given one.
          upTo job = takeWhile (/= job) enqueued ++ [job]

          -- Whether the scheduling decision for a job is explained by
          -- its applying rule.
          justified job =
            case applying job of
              Nothing -> job `elem` toRun
              Just fr -> case frAction fr of
                Accept -> job `elem` toRun
                Continue -> error "must not happen"
                Pause -> job `notElem` toRun
                Reject -> job `notElem` toRun
                RateLimit n
                  -- Selected: the limit holds including this job.
                  | job `elem` toRun ->
                      not (exceeds n fr (occupying ++ upTo job))
                  -- Not selected: the limit is exceeded for some prefix
                  -- of the jobs up to this one (including the empty
                  -- prefix).
                  | otherwise ->
                      any (exceeds n fr . (occupying ++)) (inits (upTo job))

      in not (null enqueued) ==> actionCovers (conjoin
           [ counterexample "scheduled jobs must be subsequence" $
               toRun `isSubsequenceOf` enqueued
           , counterexample "a reason for each job (not) being scheduled" $
               all justified enqueued
           ])
-- Template Haskell splice that hands the names of all tests in this module
-- to 'testSuite' under the suite name "JQScheduler" (presumably producing
-- the exported @testJQScheduler@; confirm in Test.Ganeti.TestHelper).
$(testSuite "JQScheduler"
    [ 'case_parseReasonRateLimit
    , 'prop_slotMapFromJob_conflicting_buckets
    , 'case_reasonRateLimit
    , 'prop_reasonRateLimit
    , 'prop_filterRuleOrder
    , 'case_matchPredicate
    , 'prop_applyingFilter
    , 'case_jobFiltering
    , 'prop_jobFiltering
    ])