{-| Job-queue scheduler.

Exposes the scheduler state ('JQStatus') together with the operations to
initialise the scheduler and to enqueue, dequeue and re-prioritise jobs.
-}
module Ganeti.JQScheduler
( JQStatus
, jqLivelock
, jqForkLock
, emptyJQStatus
, initJQScheduler
, enqueueNewJobs
, dequeueJob
, setJobPriority
) where
import Control.Applicative (liftA2)
import Control.Arrow
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Function (on)
import Data.Functor ((<$))
import Data.IORef
import Data.List
import Data.Maybe
import qualified Data.Set as S
import System.INotify
import Ganeti.BasicTypes
import Ganeti.Constants as C
import Ganeti.Errors
import Ganeti.JQueue as JQ
import Ganeti.Lens hiding (chosen)
import Ganeti.Logging
import Ganeti.Objects
import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Livelock
import Ganeti.Utils.MVarLock
-- | A job together with the bookkeeping data the scheduler keeps for it.
data JobWithStat = JobWithStat
  { jINotify :: Maybe INotify
    -- ^ inotify handle watching the job file, if a watcher is attached
  , jStat :: FStat
    -- ^ file status recorded when the job was last read from disk
  , jJob :: QueuedJob
    -- ^ the job itself
  }

-- Generate the lens 'jJobL' for the 'jJob' field.
$(makeCustomLenses' ''JobWithStat ['jJob])
-- | The scheduler's view of the jobs it knows about, split by state.
data Queue = Queue
  { qEnqueued :: [JobWithStat]
    -- ^ jobs waiting to be started
  , qRunning :: [JobWithStat]
    -- ^ jobs that have been selected for execution
  , qManipulated :: [JobWithStat]
    -- ^ running jobs temporarily set aside while they are being manipulated
  }

-- Generate the lenses 'qEnqueuedL', 'qRunningL' and 'qManipulatedL'.
$(makeCustomLenses ''Queue)
-- | Handle on the complete state of the scheduler.
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ the mutable job queue
  , jqConfig :: IORef (Result ConfigData)
    -- ^ reference to the configuration (or the reason it is unavailable)
  , jqLivelock :: Livelock
    -- ^ livelock file belonging to this scheduler state
  , jqForkLock :: Lock
    -- ^ lock handed over to the job starter
  }
-- | Create a scheduler state with empty queues for the given configuration
-- reference. A fresh livelock file and a fresh fork lock are created as well.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [], qManipulated = [] }
  queueRef <- newIORef noJobs
  (_, lockFile) <- mkLivelockFile C.luxiLivelockPrefix
  startLock <- newLock
  return $ JQStatus queueRef config lockFile startLock
-- | Lift a transformation of the list of running jobs to the whole queue.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs change queue = over qRunningL change queue
-- | Lift a transformation of the list of enqueued jobs to the whole queue.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs change queue = over qEnqueuedL change queue
-- | Wrap a job that has not been read from disk by the scheduler yet: no
-- watcher is attached and the file status is 'nullFStat'.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job = JobWithStat Nothing nullFStat job
-- | Delay between two runs of the time-based watcher, as passed to
-- 'threadDelay' (i.e. in microseconds).
watchInterval :: Int
watchInterval = microsecondsPerUnit * C.luxidJobqueuePollInterval
  where
    microsecondsPerUnit = 1000000
-- | Read a cluster parameter from the current configuration, falling back
-- to the given default if the configuration is not available.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue qstat = do
  cfgR <- readIORef $ jqConfig qstat
  return $ case cfgR of
    Bad _ -> defaultvalue
    Ok cfg -> param (configCluster cfg)
-- | The configured maximal number of running jobs; 1 if the configuration
-- cannot be read.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstat = getConfigValue clusterMaxRunningJobs 1 qstat
-- | The configured maximal number of tracked jobs; 1 if the configuration
-- cannot be read.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs qstat = getConfigValue clusterMaxTrackedJobs 1 qstat
-- | The current length of the list of running jobs.
getRQL :: JQStatus -> IO Int
getRQL qstat = do
  queue <- readIORef $ jqJobs qstat
  return . length $ qRunning queue
-- | Atomically replace the job queue by the result of applying the given
-- function to it.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
-- | Re-read a job from disk if its file changed since it was last read.
--
-- Returns 'Nothing' if the file is unchanged or the job cannot be loaded;
-- otherwise the freshly read job together with its new file status. If
-- checking the file fails with an 'IOError', the job is re-read anyway and
-- its file status becomes 'nullFStat'.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@(JobWithStat {jStat=oldStat, jJob=oldJob}) = do
  qdir <- queueDir
  let jid = qjId oldJob
      jids = show $ fromJobId jid
      fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  checked <- try (needsReload oldStat fpath)
               :: IO (Either IOError (Maybe FStat))
  -- A failed check is treated like a change, with an empty file status.
  let changed = case checked of
                  Left _ -> Just nullFStat
                  Right verdict -> verdict
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just newStat -> do
      logDebug $ "Rereading job " ++ jids
      loaded <- loadJobFromDisk qdir True jid
      case loaded of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                     ++ show (calcJobStatus job')
          return $ Just jWS {jStat=newStat, jJob=job'}
-- | Replace every entry of the list that has the same job id as the given
-- job by that job; all other entries are kept unchanged.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map substitute
  where
    jid = qjId $ jJob job'
    substitute job
      | qjId (jJob job) == jid = job'
      | otherwise = job
-- | Refresh the scheduler's knowledge about a running job.
--
-- If a new version of the job could be read, it replaces the old one in the
-- running queue. If no new version was obtained, or the new version is
-- finalized, cleanup of finished jobs and scheduling of new ones are
-- triggered in a separate thread.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  jb' <- readJobStatus jb
  case jb' of
    Nothing -> return ()
    Just fresh -> modifyJobs state . onRunningJobs $ updateJobStatus fresh
  let triggerScheduling = maybe True (jobFinalized . jJob) jb'
  when triggerScheduling . void . forkIO $ do
    logDebug "Scheduler noticed a job to have finished."
    cleanupFinishedJobs state
    scheduleSomeJobs state
-- | Move the job with the given id from one part of the queue to the end of
-- another part.
--
-- All entries with that id are removed from the source list; the first of
-- them is appended to the target list and returned. If the source list has
-- no such entry, the queue is returned unchanged together with 'Nothing'.
moveJob :: Lens' Queue [JobWithStat] -- ^ part of the queue to take from
        -> Lens' Queue [JobWithStat] -- ^ part of the queue to append to
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
  let hasId = (== jid) . qjId . jJob
  in case traverseOf fromQ (partition hasId) queue of
       ([], _) -> (queue, Nothing)
       (job : _, queue') -> (over toQ (++ [job]) queue', Just job)
-- | Apply 'moveJob' atomically to the queue stored in the scheduler state.
moveJobAtomic :: Lens' Queue [JobWithStat] -- ^ part of the queue to take from
              -> Lens' Queue [JobWithStat] -- ^ part of the queue to append to
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat =
  atomicModifyIORef (jqJobs qstat) $ \queue -> moveJob fromQ toQ jid queue
-- | Run an action on a running job while the job is parked in the list of
-- manipulated jobs.
--
-- Returns 'Nothing', without running the action, if the job is not in the
-- running queue. Otherwise the job is moved back to the running queue once
-- the action has finished, whether normally or by an exception.
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  parked <- moveJobAtomic qRunningL qManipulatedL jid qstat
  if isJust parked
    then liftM Just k `finally` putBack
    else return Nothing
  where
    putBack = moveJobAtomic qManipulatedL qRunningL jid qstat
-- | Remove the finalized jobs from the running queue and return them
-- alongside the updated queue.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = unfinished }, finished)
  where
    (finished, unfinished) = partition (jobFinalized . jJob) (qRunning queue)
-- | Drop all finalized jobs from the running queue, log them and shut down
-- the inotify handles that were attached to them.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe jws =
        let job = jJob jws
        in show (fromJobId (qjId job), calcJobStatus job)
  unless (null finished) $
    logInfo ("Finished jobs: " ++ commaJoin (map describe finished))
  mapM_ killINotify $ mapMaybe jINotify finished
-- | Callback run for every inotify event on the file of a watched job.
--
-- The event is logged. On an 'Ignored' event for a job that carries an
-- inotify handle, the watch on the job file is registered again with the same
-- callback. In all cases the job is then refreshed via 'updateJob'.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Match on the handle directly instead of testing 'isJust' and then
  -- extracting it with the partial 'fromJust'.
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      _ <- addWatch inotify [Modify, Delete] fpath
             (jobWatcher state jWS)
      return ()
    _ -> return ()
  updateJob state jWS
-- | Attach an inotify watcher to the file of a job that has none yet.
--
-- A watcher is only attached while the running queue is shorter than the
-- configured maximal number of tracked jobs. The job in the running queue is
-- then replaced by a copy carrying the new inotify handle.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = unless (isJust $ jINotify jWS) $ do
  max_watch <- getMaxTrackedJobs state
  rql <- getRQL state
  if rql >= max_watch
    then logDebug $ "Not attaching watcher for job "
                    ++ sjid
                    ++ ", run queue length is " ++ show rql
    else do
      inotify <- initINotify
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
          watched = jWS { jINotify = Just inotify }
      logDebug $ "Attaching queue watcher for " ++ fpath
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state watched)
      modifyJobs state . onRunningJobs $ updateJobStatus watched
  where
    jid = qjId $ jJob jWS
    sjid = show $ fromJobId jid
-- | Decide whether a job may be started: none of the jobs it depends on may
-- be among the running or the enqueued jobs.
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS = all (not . isDependency) unfinished
  where
    jdeps = getJobDependencies $ jJob jWS
    unfinished = qRunning queue ++ qEnqueued queue
    isDependency other = qjId (jJob other) `elem` jdeps
-- | Pick the jobs to start next and move them to the running queue.
--
-- The first argument is the maximal number of jobs allowed to run at the
-- same time. Jobs that are running or currently being manipulated count
-- against that limit; the remaining slots are filled, in queue order, with
-- eligible enqueued jobs (see 'jobEligible'). Returns the updated queue and
-- the jobs chosen.
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
selectJobsToRun count queue =
  let -- Free slots; if this is not positive, 'take' selects nothing.
      n = count - length (qRunning queue) - length (qManipulated queue)
      chosen = take n . filter (jobEligible queue) $ qEnqueued queue
      remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
-- | Log a warning about jobs that failed to start, together with the
-- corresponding errors, and return the set of their ids. Nothing is logged
-- for an empty list.
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs jobs
  | null jobs = return S.empty
  | otherwise = do
      let jids = S.fromList [ qjId (jJob jws) | (jws, _) <- jobs ]
          jidsString = commaJoin [ show (fromJobId jid) | jid <- S.toList jids ]
      logWarning $ "Starting jobs " ++ jidsString ++ " failed: "
                   ++ show (map snd jobs)
      return jids
-- | Put jobs that could not be started back to the front of the list of
-- enqueued jobs and remove them from the running queue.
requeueJobs :: JQStatus -> [(JobWithStat, GanetiException)] -> IO ()
requeueJobs qstate jobs = do
  jids <- logFailedJobs jobs
  let failedToStart jws = qjId (jJob jws) `S.member` jids
      dropFailed = filter (not . failedToStart)
      putBack = (map fst jobs ++)
  logWarning "Rescheduling jobs"
  modifyJobs qstate (onQueuedJobs putBack . onRunningJobs dropFailed)
-- | Mark jobs that could not be started as failed.
--
-- The jobs are removed from the running queue. For each of them a failed
-- version, whose reason is built from the start-up error, is written and
-- replicated. A job whose failed version cannot be saved is reported with a
-- warning; the remaining jobs are still processed.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  -- Join the ids the same way as every other job list logged by this module.
  let sjobs = commaJoin . map (show . fromJobId) $ S.toList jids
  let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
  logWarning $ "Failing jobs " ++ sjobs
  modifyJobs qstate $ onRunningJobs rmJobs
  let trySaveJob :: JobWithStat -> ResultT String IO ()
      trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
      reason jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      failJob err job = failQueuedJob (reason (qjId job) (show err)) now job
      failAndSaveJobWithStat (jws, err) =
        trySaveJob . over jJobL (failJob err) $ jws
  forM_ jobs $ \failed@(jws, _) -> do
    saved <- runResultT $ failAndSaveJobWithStat failed
    case saved of
      -- Do not drop the error silently; the job state on disk is stale now.
      Bad msg -> logWarning $ "Failed to save failed job "
                              ++ show (fromJobId . qjId $ jJob jws)
                              ++ ": " ++ msg
      Ok _ -> return ()
  logDebug $ "Failed jobs " ++ sjobs
-- | Start as many enqueued jobs as the limit on running jobs allows.
--
-- The jobs are selected atomically and watchers are attached to them. If the
-- configuration is unavailable, the selected jobs are put back into the
-- queue; otherwise they are started and those that failed to start are
-- marked as failed.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  count <- getMaxRunningJobs qstate
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
  let jobs = map jJob chosen
      startedIds = commaJoin [ show (fromJobId (qjId job)) | job <- jobs ]
  unless (null chosen) . logInfo $ "Starting jobs: " ++ startedIds
  mapM_ (attachWatcher qstate) chosen
  cfgR <- readIORef (jqConfig qstate)
  case cfgR of
    Bad err -> do
      let msg = "Configuration unavailable: " ++ err
      logError msg
      requeueJobs qstate [ (jws, strMsg msg) | jws <- chosen ]
    Ok cfg -> do
      result <- JQ.startJobs cfg (jqLivelock qstate) (jqForkLock qstate) jobs
      -- Keep only the jobs whose start was reported as 'Bad'.
      let failed = [ (jws, err) | (jws, Bad err) <- zip chosen result ]
      unless (null failed) $ failJobs cfg qstate failed
-- | Render the ids of the waiting and of the running jobs for logging.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ showids (qEnqueued queue)
  ++ "; running jobs: " ++ showids (qRunning queue)
  where
    showids jobs = show [ fromJobId (qjId (jJob jws)) | jws <- jobs ]
-- | Check whether the process owning a running job has died and, if so, mark
-- the job as failed.
--
-- A job is only checked if it has a livelock that differs from the
-- scheduler's own. For a dead job, the job is re-read from disk while it is
-- parked via 'manipulateRunningJob'; unless it is finalized already, a
-- failed version is written and replicated.
checkForDeath :: JQStatus -> JobWithStat -> IO ()
checkForDeath state jobWS = do
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  died <- case mfilter (/= jqLivelock state) livelock of
            Nothing -> return False
            Just otherLock -> isDead otherLock
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    void . manipulateRunningJob state jid $ runResultT failJobOnDisk
  where
    job = jJob jobWS
    jid = qjId job
    sjid = show $ fromJobId jid
    livelock = qjLivelock job
    -- Re-read the job and persist it as failed unless it is finalized.
    failJobOnDisk :: ResultG ()
    failJobOnDisk = do
      jobWS' <- mkResultT $ readJobFromDisk jid
      unless (jobFinalized $ jJob jobWS') . void $ do
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
            failedJob = failQueuedJob reason now $ jJob jobWS'
        cfg <- mkResultT . readIORef $ jqConfig state
        writeAndReplicateJob cfg qDir failedJob
-- | Time-based watcher loop; never returns.
--
-- After every 'watchInterval' it checks the running jobs for death,
-- refreshes them, cleans up finished jobs, logs the queue and schedules new
-- jobs. The queue is read afresh before each of these steps.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  currentQueue >>= mapM_ (checkForDeath qstate) . qRunning
  currentQueue >>= mapM_ (updateJob qstate) . qRunning
  cleanupFinishedJobs qstate
  currentQueue >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logDebug "Job queue watcher cycle finished"
  where
    currentQueue = readIORef (jqJobs qstate)
-- | Read a single job, together with the status of its file, from disk.
--
-- If the file status cannot be obtained because of an 'IOError',
-- 'nullFStat' is recorded instead. The returned job has no watcher attached.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  tryFstat <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = case tryFstat of
                Left _ -> nullFStat
                Right st -> st
      withStat loaded = JobWithStat Nothing fstat (fst loaded)
  loadResult <- JQ.loadJobFromDisk qdir False jid
  return $ liftM withStat loadResult
-- | Read all jobs found in the queue directory.
--
-- If the job ids cannot be obtained, the list of ids is taken to be empty;
-- jobs that cannot be read are left out of the result.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  let jids = genericResult (const []) JQ.sortJobIDs eitherJids
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  justOk `liftM` mapM readJobFromDisk jids
-- | Initialise the scheduler from the jobs found on disk.
--
-- Jobs that are not finalized are added to the running or to the enqueued
-- list, depending on whether they have been started. Afterwards a first
-- scheduling round is run and the time-based watcher is forked off.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let pending = filter (not . jobFinalized . jJob) alljobs
      (running, queued) = partition (jobStarted . jJob) pending
      addKnownJobs = onQueuedJobs (++ queued) . onRunningJobs (++ running)
  modifyJobs qstate addKnownJobs
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  void . forkIO $ onTimeWatcher qstate
-- | Add new jobs to the list of enqueued jobs and trigger scheduling.
--
-- Each job is inserted by 'insertBy' according to its job id, so a list of
-- enqueued jobs that is ordered by id stays ordered.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      -- Strict left fold: do not pile up one thunk per inserted job.
      addJobs oldjobs = foldl' (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
-- | Try to remove a job from the list of enqueued jobs.
--
-- The result is
--
-- * @Ok (Just job)@, with the job removed, if it was enqueued exactly once;
--
-- * @Ok Nothing@, with the queue unchanged, if it is not enqueued but
--   running;
--
-- * @Bad@, with the queue unchanged, if it is enqueued more than once or
--   neither enqueued nor running.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
       ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
       -- Note the space after the full stop, separating the two sentences.
       (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                           ++ sJid ++ " queued multiple times")
       (_, True) -> (q, Ok Nothing)
       _ -> (q, Bad $ sJid ++ " not found in queue")
-- | Try to remove a job from the list of enqueued jobs.
--
-- Returns @Ok True@ if the job was removed, @Ok False@ if it was not removed
-- because it is running, and @Bad@ if 'rmJob' reports an error.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  removed <- atomicModifyIORef (jqJobs state) (rmJob jid)
  let outcome = isJust `fmap` removed
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
             ++ " is " ++ show outcome
  return outcome
-- | Change the priority of a job that is still enqueued.
--
-- The job is taken out of the queue, written to disk with its new priority
-- and enqueued again; the updated job is returned. For a job that is running
-- already, nothing is changed and 'Nothing' is returned.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  removed <- mkResultT $ atomicModifyIORef (jqJobs state) (rmJob jid)
  case removed of
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      mkResultT $ writeJobToDisk qDir job'
      liftIO $ enqueueNewJobs state [job']
      return $ Just job'
    Nothing -> return Nothing