module Ganeti.JQScheduler
( JQStatus
, jqLivelock
, jqForkLock
, emptyJQStatus
, selectJobsToRun
, scheduleSomeJobs
, initJQScheduler
, enqueueNewJobs
, dequeueJob
, setJobPriority
, cleanupIfDead
, configChangeNeedsRescheduling
) where
import Control.Applicative (liftA2, (<$>))
import Control.Arrow
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Function (on)
import Data.Functor ((<$))
import Data.IORef
import Data.List
import Data.Maybe
import qualified Data.Map as Map
import Data.Ord (comparing)
import Data.Set (Set)
import qualified Data.Set as S
import System.INotify
import Ganeti.BasicTypes
import Ganeti.Constants as C
import Ganeti.Errors
import Ganeti.JQScheduler.Filtering (applyingFilter, jobFiltering)
import Ganeti.JQScheduler.Types
import Ganeti.JQScheduler.ReasonRateLimiting (reasonRateLimit)
import Ganeti.JQueue as JQ
import Ganeti.JSON (fromContainer)
import Ganeti.Lens hiding (chosen)
import Ganeti.Logging
import Ganeti.Objects
import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Livelock
import Ganeti.Utils.MVarLock
-- | Mutable state shared by all parts of the job-queue scheduler.
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ The scheduler's view of the job queue.
  , jqConfig :: IORef (Result ConfigData)
    -- ^ Reference holding the current configuration, or the reason why it
    -- is unavailable.
  , jqLivelock :: Livelock
    -- ^ Livelock file created in 'emptyJQStatus'; handed on when starting
    -- or cancelling jobs.
  , jqForkLock :: Lock
    -- ^ Lock handed to 'JQ.startJobs' when jobs are started.
  }
-- | Build the initial scheduler state around the given configuration
-- reference: all three job lists start out empty, a livelock file is
-- created with the luxi livelock prefix and a new fork lock is allocated.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  queueRef <- newIORef Queue { qManipulated = [], qRunning = [], qEnqueued = [] }
  (_, livelock) <- mkLivelockFile C.luxiLivelockPrefix
  forkLock <- newLock
  return JQStatus
    { jqJobs = queueRef
    , jqConfig = config
    , jqLivelock = livelock
    , jqForkLock = forkLock
    }
-- | Lift a transformation of the running-jobs list to the whole queue.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue = over qRunningL f queue
-- | Lift a transformation of the enqueued-jobs list to the whole queue.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue = over qEnqueuedL f queue
-- | Wrap a job as one whose file has not been inspected yet: the file
-- status is 'nullFStat' and no inotify handle is attached.
unreadJob :: QueuedJob -> JobWithStat
unreadJob queuedJob =
  JobWithStat
    { jINotify = Nothing
    , jStat = nullFStat
    , jJob = queuedJob
    }
-- | Pause between two rounds of the time-based watcher, as passed to
-- 'threadDelay' (which counts in microseconds).
-- NOTE(review): assumes 'C.luxidJobqueuePollInterval' is given in seconds.
watchInterval :: Int
watchInterval = scale * C.luxidJobqueuePollInterval
  where
    scale = 1000000
-- | Read a cluster parameter from the current configuration, returning the
-- supplied default when the configuration is not available.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue qstat = do
  cfgResult <- readIORef (jqConfig qstat)
  return $ genericResult (const defaultvalue) (param . configCluster) cfgResult
-- | The configured maximal number of running jobs; 1 if the configuration
-- cannot be read.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstat = getConfigValue clusterMaxRunningJobs 1 qstat
-- | The configured maximal number of tracked jobs; 1 if the configuration
-- cannot be read.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs qstat = getConfigValue clusterMaxTrackedJobs 1 qstat
-- | Current length of the list of running jobs.
getRQL :: JQStatus -> IO Int
getRQL qstat = do
  queue <- readIORef (jqJobs qstat)
  return . length $ qRunning queue
-- | Atomically apply a pure transformation to the queue held in the
-- scheduler state.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
-- | Re-read a job from disk if its file appears to have changed.
--
-- Returns 'Nothing' when the file is unchanged or the job cannot be loaded,
-- and the refreshed job (with the new file status) otherwise. An 'IOError'
-- raised by the change check is treated as "changed", with 'nullFStat' as
-- the new file status.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@(JobWithStat {jStat=oldStat, jJob=oldJob}) = do
  let jid = qjId oldJob
      jidStr = show $ fromJobId jid
  qdir <- queueDir
  let path = liveJobFile qdir jid
  logDebug $ "Checking if " ++ path ++ " changed on disk."
  reloadCheck <- try (needsReload oldStat path)
                   :: IO (Either IOError (Maybe FStat))
  let newStat = case reloadCheck of
        Left _ -> Just nullFStat
        Right r -> r
  case newStat of
    Nothing -> do
      logDebug $ "File " ++ path ++ " not changed on disk."
      return Nothing
    Just stat' -> do
      logDebug $ "Rereading job " ++ jidStr
      loaded <- loadJobFromDisk qdir True jid
      case loaded of
        Ok (fresh, _) -> do
          logDebug $ "Read job " ++ jidStr ++ ", status is "
                     ++ show (calcJobStatus fresh)
          return . Just $ jWS {jStat=stat', jJob=fresh}
        Bad msg -> do
          logWarning $ "Failed to read job " ++ jidStr ++ ": " ++ msg
          return Nothing
-- | Replace every entry of the list that has the same job id as the given
-- job by that job; all other entries are kept as they are.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map swapIn
  where
    wanted = qjId $ jJob job'
    swapIn candidate
      | qjId (jJob candidate) == wanted = job'
      | otherwise = candidate
-- | Refresh a running job from disk and store the result in the queue.
--
-- If 'readJobStatus' yields no job, or the refreshed job is finalized, a
-- new thread is forked that cleans up finished jobs and schedules new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Just fresh -> modifyJobs state . onRunningJobs $ updateJobStatus fresh
    Nothing -> return ()
  let needsCleanup = maybe True (jobFinalized . jJob) reread
  when needsCleanup $ do
    _ <- forkIO $ do
      logDebug "Scheduler noticed a job to have finished."
      cleanupFinishedJobs state
      scheduleSomeJobs state
    return ()
-- | Move the job with the given id from one list of the queue to the end of
-- another one, returning the new queue and the moved job.
--
-- If no entry of the source list has that id, the queue is returned
-- unchanged together with 'Nothing'. Should several entries match, all of
-- them are taken out of the source list but only the first is appended to
-- the target list.
moveJob :: Lens' Queue [JobWithStat]
        -> Lens' Queue [JobWithStat]
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
  let isWanted = (== jid) . qjId . jJob
      -- The pair functor collects the matching jobs while the lens rebuilds
      -- the queue from the non-matching ones.
      (matching, stripped) = traverseOf fromQ (partition isWanted) queue
  in case matching of
       job : _ -> (over toQ (++ [job]) stripped, Just job)
       [] -> (queue, Nothing)
-- | Atomic variant of 'moveJob' operating on the queue in the scheduler
-- state; returns the moved job, if any.
moveJobAtomic :: Lens' Queue [JobWithStat]
              -> Lens' Queue [JobWithStat]
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat =
  atomicModifyIORef (jqJobs qstat) $ \queue -> moveJob fromQ toQ jid queue
-- | Run an action on a running job while it is parked in the list of
-- manipulated jobs.
--
-- The job is first moved from the running list to the manipulated list; if
-- it is not running, nothing is done and 'Nothing' is returned. Otherwise
-- the action is executed and the job is moved back to the running list
-- afterwards, also when the action throws.
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  parked <- moveJobAtomic qRunningL qManipulatedL jid qstat
  if isNothing parked
    then return Nothing
    else fmap Just k
           `finally` moveJobAtomic qManipulatedL qRunningL jid qstat
-- | Split the finalized jobs off the running list, returning the queue
-- without them together with the removed jobs.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue {qRunning=stillRunning}, finished)
  where
    (finished, stillRunning) = partition (jobFinalized . jJob) (qRunning queue)
-- | Remove finalized jobs from the running list, log them (if there are
-- any) and kill the inotify handles attached to them.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe jws =
        let job = jJob jws
        in show (fromJobId (qjId job), calcJobStatus job)
  unless (null finished) $
    logInfo ("Finished jobs: " ++ commaJoin (map describe finished))
  forM_ finished $ \jws ->
    case jINotify jws of
      Just inotify -> killINotify inotify
      Nothing -> return ()
-- | Inotify callback for a job file.
--
-- Logs the event and, if the event is 'Ignored' and the job carries an
-- inotify handle, installs the watch on the job file again with this same
-- callback (presumably because the previous watch was dropped, e.g. when
-- the file was replaced -- TODO confirm). In all cases the job is then
-- refreshed via 'updateJob'.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Pattern match on the handle instead of isJust/fromJust, so no partial
  -- function is involved.
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      void $ addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
    _ -> return ()
  updateJob state jWS
-- | Attach an inotify watcher to the file of a job that has none yet.
--
-- A watcher is only created while the running list is shorter than the
-- configured number of tracked jobs; otherwise this is merely logged. After
-- the watch is installed, the job's entry in the running list is replaced
-- by the version carrying the inotify handle.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS
  | isJust (jINotify jWS) = return ()
  | otherwise = do
      limit <- getMaxTrackedJobs state
      runLen <- getRQL state
      if runLen >= limit
        then logDebug $ "Not attaching watcher for job "
                        ++ show (fromJobId jid)
                        ++ ", run queue length is " ++ show runLen
        else do
          inotify <- initINotify
          qdir <- queueDir
          let fpath = liveJobFile qdir jid
              watched = jWS { jINotify=Just inotify }
          logDebug $ "Attaching queue watcher for " ++ fpath
          _ <- addWatch inotify [Modify, Delete] fpath
                 (jobWatcher state watched)
          modifyJobs state . onRunningJobs $ updateJobStatus watched
  where
    jid = qjId $ jJob jWS
-- | A job is eligible to run if none of the jobs it depends on is in the
-- running or the enqueued list of the queue (only those two lists are
-- consulted).
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS = not $ any isDependency pending
  where
    deps = getJobDependencies $ jJob jWS
    pending = qRunning queue ++ qEnqueued queue
    isDependency other = qjId (jJob other) `elem` deps
-- | Decide which enqueued jobs to start next.
--
-- Given the maximal number of jobs that may run at once and the filter
-- rules, returns the queue with the chosen jobs moved from the enqueued to
-- the end of the running list, together with the chosen jobs.
--
-- The number of free slots is the limit minus the jobs currently running
-- and the jobs currently being manipulated. Candidates are the eligible
-- enqueued jobs (see 'jobEligible'), sorted by job priority, then passed
-- through reason rate limiting and job filtering.
selectJobsToRun :: Int
                -> Set FilterRule
                -> Queue
                -> (Queue, [JobWithStat])
selectJobsToRun count filters queue =
  let -- Free slots; may be zero or negative, in which case 'take' yields
      -- no jobs at all.
      n = count - length (qRunning queue) - length (qManipulated queue)
      chosen = take n
               . jobFiltering queue filters
               . reasonRateLimit queue
               . sortBy (comparing (calcJobPriority . jJob))
               . filter (jobEligible queue)
               $ qEnqueued queue
      remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
-- | Log a warning about jobs that failed to start and return the set of
-- their ids. Nothing is logged for an empty list.
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs [] = return S.empty
logFailedJobs jobs = do
  let failedIds = S.fromList [ qjId (jJob jws) | (jws, _) <- jobs ]
      idList = commaJoin [ show (fromJobId jid) | jid <- S.toList failedIds ]
      errors = [ err | (_, err) <- jobs ]
  logWarning $ "Starting jobs " ++ idList ++ " failed: " ++ show errors
  return failedIds
-- | Mark jobs that could not be started as failed.
--
-- The jobs are removed from the running list, then each of them is turned
-- into a failed job (with a reason-trail entry naming the start error) and
-- written and replicated. The result of each individual write is
-- discarded.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  -- NOTE(review): the ids are joined with "." here, whereas the other log
  -- messages of this module use commaJoin -- confirm this is intended.
  let sjobs = intercalate "." [ show (fromJobId jid) | jid <- S.toList jids ]
      survives jws = qjId (jJob jws) `S.notMember` jids
  logWarning $ "Failing jobs " ++ sjobs
  modifyJobs qstate . onRunningJobs $ filter survives
  let reasonFor jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      markFailed err job =
        failQueuedJob (reasonFor (qjId job) (show err)) now job
      saveFailed :: (JobWithStat, GanetiException) -> ResultT String IO ()
      saveFailed (jws, err) =
        () <$ writeAndReplicateJob cfg qdir (markFailed err (jJob jws))
  forM_ jobs $ runResultT . saveFailed
  logDebug $ "Failed jobs " ++ sjobs
-- | Cancel all enqueued jobs to which a filter rule with action 'Reject'
-- applies.
--
-- Each such job is dequeued. If that succeeds, the cancelled job is written
-- and replicated; if the job turns out not to be queued, it is cancelled
-- directly through 'cancelJob'. Errors are logged.
cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()
cancelRejectedJobs qstate cfg filters = do
  queue <- readIORef (jqJobs qstate)
  let rejection job = do
        fr <- applyingFilter filters job
        guard (frAction fr == Reject)
        return (job, fr)
      jobsToCancel = mapMaybe (rejection . jJob) (qEnqueued queue)
  qDir <- queueDir
  forM_ jobsToCancel $ \(job, fr) -> do
    let jid = qjId job
        jidStr = show (fromJobId jid)
    logDebug $ "Cancelling job " ++ jidStr
               ++ " because it was REJECTed by filter rule " ++ frUuid fr
    dequeueResult <- dequeueJob qstate jid
    case dequeueResult of
      Bad s -> logError s
      Ok False -> do
        logDebug $ "Job " ++ jidStr
                   ++ " not queued; trying to cancel directly"
        void $ cancelJob False (jqLivelock qstate) jid
      Ok True -> do
        now <- currentTimestamp
        written <- runResultT
                     $ writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
        case written of
          Bad err -> logError $
            "Failed to write config when cancelling job: " ++ err
          Ok _ -> return ()
-- | Start as many enqueued jobs as currently allowed.
--
-- Without a readable configuration only an error is logged. Otherwise jobs
-- rejected by filter rules are cancelled first, then jobs are selected with
-- 'selectJobsToRun', watchers are attached to them and they are started.
-- Jobs that fail to start are handed to 'failJobs'.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  cfgR <- readIORef (jqConfig qstate)
  case cfgR of
    Bad err -> logError $ "Configuration unavailable: " ++ err
    Ok cfg -> do
      let filters = S.fromList . Map.elems . fromContainer $ configFilters cfg
      cancelRejectedJobs qstate cfg filters
      limit <- getMaxRunningJobs qstate
      chosen <- atomicModifyIORef (jqJobs qstate)
                  (selectJobsToRun limit filters)
      let jobs = map jJob chosen
          startedIds = commaJoin [ show (fromJobId (qjId job)) | job <- jobs ]
      unless (null chosen) . logInfo $ "Starting jobs: " ++ startedIds
      forM_ chosen $ attachWatcher qstate
      result <- JQ.startJobs (jqLivelock qstate) (jqForkLock qstate) jobs
      -- Pair every chosen job with its start result and keep the failures.
      let failed = [ (jws, err) | (jws, Bad err) <- zip chosen result ]
      unless (null failed) $ failJobs cfg qstate failed
-- | Render the ids of the waiting and the running jobs for logging.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ idsOf (qEnqueued queue)
  ++ "; running jobs: " ++ idsOf (qRunning queue)
  where
    idsOf jobs = show [ fromJobId (qjId (jJob jws)) | jws <- jobs ]
-- | Check whether the process behind a running job has died and, if so,
-- mark the job as failed. Returns whether the job was found dead.
--
-- A job without a livelock, or whose livelock is the scheduler's own, is
-- never considered dead. For a dead job, the job file is re-read while the
-- job is parked via 'manipulateRunningJob'; unless it is already finalized,
-- a failed version with a death-detection reason is written and
-- replicated.
checkForDeath :: JQStatus -> JobWithStat -> IO Bool
checkForDeath state jobWS = do
  let job = jJob jobWS
      jid = qjId job
      sjid = show $ fromJobId jid
      livelock = qjLivelock job
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  died <- case livelock of
    Just lock | lock /= jqLivelock state -> isDead lock
    _ -> return False
  logDebug $ "Death of " ++ sjid ++ ": " ++ show died
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    void . manipulateRunningJob state jid . runResultT $ do
      onDisk <- mkResultT $ readJobFromDisk jid :: ResultG JobWithStat
      unless (jobFinalized $ jJob onDisk) . void $ do
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        cfg <- mkResultT . readIORef $ jqConfig state
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
        writeAndReplicateJob cfg qDir . failQueuedJob reason now $ jJob onDisk
  return died
-- | Run death detection for the job with the given id.
--
-- Returns the result of 'checkForDeath' if the job is in the running list,
-- and 'True' if it is not found there.
cleanupIfDead :: JQStatus -> JobId -> IO Bool
cleanupIfDead state jid = do
  logDebug $ "Extra job-death detection for " ++ show (fromJobId jid)
  queue <- readIORef (jqJobs state)
  case find ((== jid) . qjId . jJob) (qRunning queue) of
    Just jobWS -> checkForDeath state jobWS
    Nothing -> return True
-- | Time-based watcher loop; never returns.
--
-- Every 'watchInterval' it checks all running jobs for death, refreshes
-- them from disk, cleans up finished jobs, logs the queue and schedules new
-- jobs. The queue is read afresh before each of these steps.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  readIORef (jqJobs qstate) >>= mapM_ (checkForDeath qstate) . qRunning
  readIORef (jqJobs qstate) >>= mapM_ (updateJob qstate) . qRunning
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logDebug "Job queue watcher cycle finished"
-- | Load a single job, together with the status of its file, from disk.
--
-- If the file status cannot be obtained because of an 'IOError',
-- 'nullFStat' is used instead. The loaded job carries no inotify handle.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  statAttempt <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = case statAttempt of
        Left _ -> nullFStat
        Right st -> st
      wrap loaded =
        JobWithStat { jINotify=Nothing, jStat=fstat, jJob=fst loaded }
  loadResult <- JQ.loadJobFromDisk qdir False jid
  return $ fmap wrap loadResult
-- | Load all jobs found in the queue directory.
--
-- If the list of job ids cannot be obtained, it is taken to be empty; jobs
-- that fail to load are dropped from the result.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  idsResult <- JQ.getJobIDs [qdir]
  let jids = genericResult (const []) JQ.sortJobIDs idsResult
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin [ show (fromJobId jid) | jid <- jids ]
  loaded <- mapM readJobFromDisk jids
  return $ justOk loaded
-- | Initialise the scheduler from the jobs on disk.
--
-- All non-finalized jobs are loaded; those already started are appended to
-- the running list, the others to the enqueued list. Afterwards a first
-- scheduling round is run and the time-based watcher is forked.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let pending = [ jws | jws <- alljobs, not (jobFinalized (jJob jws)) ]
      (running, queued) = partition (jobStarted . jJob) pending
  modifyJobs qstate $ onQueuedJobs (++ queued) . onRunningJobs (++ running)
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  void . forkIO $ onTimeWatcher qstate
-- | Add new jobs to the enqueued list and trigger a scheduling round.
--
-- Each job is inserted with 'insertBy', comparing by job id, so an
-- enqueued list that is ordered by job id stays ordered.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      -- Strict left fold: avoids building a chain of thunks when many jobs
      -- are enqueued at once.
      addJobs oldjobs = foldl' (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
-- | Pure function removing the job with the given id from the enqueued
-- list.
--
-- The result is
--
-- * @Ok (Just job)@ with the job removed, if it was enqueued exactly once;
--
-- * @Ok Nothing@ with the queue unchanged, if it is not enqueued but
--   running;
--
-- * @Bad@ with the queue unchanged, if it is enqueued more than once or is
--   neither enqueued nor running.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
       ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
       -- Trailing space after the full stop keeps the two sentences of the
       -- message apart.
       (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                             ++ sJid ++ " queued multiple times")
       (_, True) -> (q, Ok Nothing)
       _ -> (q, Bad $ sJid ++ " not found in queue")
-- | Try to remove a job from the enqueued list.
--
-- Yields @Ok True@ if the job was removed, @Ok False@ if 'rmJob' reported
-- that nothing was removed without error, and @Bad@ on error. The outcome
-- is also logged.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  removed <- atomicModifyIORef (jqJobs state) (rmJob jid)
  let wasQueued = isJust <$> removed
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
             ++ " is " ++ show wasQueued
  return wasQueued
-- | Change the priority of an enqueued job.
--
-- The job is taken out of the enqueued list, written to disk with its new
-- priority and enqueued again; the updated job is returned. If 'rmJob'
-- removes nothing without reporting an error, the result is @Ok Nothing@.
--
-- NOTE(review): if writing the job to disk fails, the job has already been
-- removed from the queue and is not enqueued again here -- confirm this is
-- intended.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  removed <- mkResultT $ atomicModifyIORef (jqJobs state) (rmJob jid)
  case removed of
    Just job -> do
      let updated = changeJobPriority prio job
      qDir <- liftIO queueDir
      mkResultT $ writeJobToDisk qDir updated
      liftIO $ enqueueNewJobs state [updated]
      return $ Just updated
    Nothing -> return Nothing
-- | Decide whether a change from the old to the new configuration requires
-- the scheduler to re-evaluate its queue. Currently this is the case
-- exactly when the filter rules differ.
configChangeNeedsRescheduling :: ConfigData -> ConfigData -> Bool
configChangeNeedsRescheduling oldConfig newConfig = filtersChanged
  where
    filtersChanged = ((/=) `on` configFilters) oldConfig newConfig