{-| Scheduler for the job queue.

Keeps an in-memory view of the enqueued and the running jobs, starts
enqueued jobs as running slots become available, and follows the job
files on disk (via inotify and a periodic timer) to notice finished jobs.

-}
module Ganeti.JQScheduler
( JQStatus
, emptyJQStatus
, initJQScheduler
, enqueueNewJobs
, dequeueJob
, setJobPriority
) where
import Control.Arrow
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Function (on)
import Data.List
import Data.Maybe
import Data.IORef
import System.INotify
import Ganeti.BasicTypes
import Ganeti.Constants as C
import Ganeti.JQueue as JQ
import Ganeti.Logging
import Ganeti.Objects
import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
-- | A job together with the bookkeeping the scheduler keeps for it.
--
-- The field order matters: the constructor is also applied positionally
-- elsewhere in this module.
data JobWithStat = JobWithStat
  { jINotify :: Maybe INotify
    -- ^ inotify handle watching the job file, once a watcher is attached
  , jStat :: FStat
    -- ^ file status recorded when the job file was last (re)read
  , jJob :: QueuedJob
    -- ^ the job itself
  }
-- | The scheduler's view of the job queue.
data Queue = Queue
  { qEnqueued :: [JobWithStat]
    -- ^ jobs waiting to be started
  , qRunning :: [JobWithStat]
    -- ^ jobs that have been selected for running
  }
-- | Mutable state of the scheduler.
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ the enqueued and running jobs
  , jqConfig :: IORef (Result ConfigData)
    -- ^ the configuration; read to find the maximal number of running jobs
  }
-- | Create a scheduler state without any enqueued or running jobs that
-- uses the given configuration reference.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [] }
  jobsRef <- newIORef noJobs
  return $ JQStatus { jqJobs = jobsRef, jqConfig = config }
-- | Lift a transformation of a job list to one acting on the running
-- jobs of a 'Queue'; the enqueued jobs are left alone.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue@(Queue { qRunning = running }) =
  queue { qRunning = f running }
-- | Lift a transformation of a job list to one acting on the enqueued
-- jobs of a 'Queue'; the running jobs are left alone.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue@(Queue { qEnqueued = waiting }) =
  queue { qEnqueued = f waiting }
-- | Wrap a job that has not been read from disk by the scheduler yet:
-- no inotify handle and the null file status.
unreadJob :: QueuedJob -> JobWithStat
unreadJob = JobWithStat Nothing nullFStat
-- | Delay between two runs of the time-based watcher, as handed to
-- 'threadDelay' (i.e. in microseconds). The configured poll interval is
-- scaled by one million, so it is presumably given in seconds.
watchInterval :: Int
watchInterval = scale * C.luxidJobqueuePollInterval
  where
    scale = 1000000
-- | Look up the maximal number of jobs allowed to run at the same time.
-- Falls back to 1 if the configuration reference holds a failure.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstat = do
  cfg <- readIORef $ jqConfig qstat
  return $ genericResult (const 1) (clusterMaxRunningJobs . configCluster) cfg
-- | Atomically apply a pure transformation to the job queue.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
-- | Reread a job from disk if its file changed since it was last read.
--
-- Returns 'Nothing' if the file is unchanged or if rereading it failed
-- (the failure is logged as a warning); otherwise returns the job with
-- the fresh contents and file status, keeping its inotify handle.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job}) = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try (needsReload fstat fpath)
                   :: IO (Either IOError (Maybe FStat))
  -- An I/O error while checking is treated as "changed", with the null
  -- file status, so that a reread is attempted.
  let changed = either (const $ Just nullFStat) id changedResult
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      let jids = show $ fromJobId jid
      logDebug $ "Rereading job " ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug
            $ "Read job " ++ jids ++ ", status is " ++ show (calcJobStatus job')
          return . Just $ jWS {jStat=fstat', jJob=job'}
-- | Replace every entry of the list that has the same job id as the given
-- job by that job. Lists not containing the job id are returned unchanged.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replaceIfSame
  where
    jid = qjId $ jJob job'
    replaceIfSame job
      | qjId (jJob job) == jid = job'
      | otherwise = job
-- | Refresh a single job from disk and record the new state among the
-- running jobs.
--
-- If the reread job is finalized, or if 'readJobStatus' yielded nothing,
-- a separate thread is forked that cleans up finished jobs and schedules
-- new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  jb' <- readJobStatus jb
  case jb' of
    Just fresh -> modifyJobs state . onRunningJobs $ updateJobStatus fresh
    Nothing -> return ()
  let needsCleanup = maybe True (jobFinalized . jJob) jb'
  when needsCleanup $ do
    _ <- forkIO $ do
      logDebug "Scheduler noticed a job to have finished."
      cleanupFinishedJobs state
      scheduleSomeJobs state
    return ()
-- | Split the finalized jobs off the running jobs. Returns the queue
-- without them, together with the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where
    (finished, stillRunning) =
      partition (jobFinalized . jJob) (qRunning queue)
-- | Remove the finalized jobs from the running jobs, log them (if any)
-- and kill the inotify handles attached to them.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe jws =
        let job = jJob jws
        in show (fromJobId $ qjId job, calcJobStatus job)
  unless (null finished) $
    logInfo ("Finished jobs: " ++ commaJoin (map describe finished))
  mapM_ killINotify $ mapMaybe jINotify finished
-- | inotify callback for a job file: logs the event and refreshes the
-- job via 'updateJob'.
--
-- On an 'Ignored' event, if the job carries an inotify handle, the watch
-- on the job file is added again with this same callback before the job
-- is refreshed.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Pattern match on the handle instead of isJust/fromJust, so that no
  -- partial function is involved.
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
      return ()
    _ -> return ()
  updateJob state jWS
-- | Attach an inotify watcher to the file of a job, unless the job
-- already carries an inotify handle. The handle is then recorded in the
-- job's entry among the running jobs.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS
  | isJust (jINotify jWS) = return ()
  | otherwise = do
      inotify <- initINotify
      qdir <- queueDir
      let watched = jWS { jINotify = Just inotify }
          fpath = liveJobFile qdir (qjId (jJob jWS))
      logDebug $ "Attaching queue watcher for " ++ fpath
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state watched)
      modifyJobs state (onRunningJobs (updateJobStatus watched))
-- | Decide which enqueued jobs to start, given the maximal number of
-- jobs that may run at the same time.
--
-- Takes jobs from the front of the enqueued list until the running list
-- reaches @count@ entries. Returns the updated queue, with the chosen
-- jobs moved to the end of the running list, and the chosen jobs.
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
selectJobsToRun count queue =
  let -- Number of free slots; if it is not positive, 'splitAt' selects
      -- nothing and the queue stays as it is.
      n = count - length (qRunning queue)
      (chosen, remain) = splitAt n (qEnqueued queue)
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
-- | Put jobs that could not be started back at the front of the enqueued
-- jobs and remove them from the running jobs. The error that prevented
-- the start is logged as a warning.
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map (qjId . jJob) jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  -- Both lists are changed in a single atomic update, so no other thread
  -- can observe a queue in which the jobs are in neither list.
  modifyJobs qstate (onQueuedJobs (jobs ++) . onRunningJobs rmJobs)
-- | Start as many enqueued jobs as the running-jobs limit allows.
--
-- The chosen jobs are moved to the running list, get a watcher attached
-- and are then started; if starting raises an 'IOError', they are put
-- back via 'requeueJobs'.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  count <- getMaxRunningJobs qstate
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
  let jobs = map jJob chosen
      jidList = commaJoin $ map (show . fromJobId . qjId) jobs
  unless (null chosen) $ logInfo ("Starting jobs: " ++ jidList)
  mapM_ (attachWatcher qstate) chosen
  result <- try $ JQ.startJobs jobs
  case result of
    Left err -> requeueJobs qstate chosen err
    Right done -> return done
-- | Render the ids of the waiting and of the running jobs for logging.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ idsOf (qEnqueued queue)
    ++ "; running jobs: " ++ idsOf (qRunning queue)
  where
    idsOf = show . map (fromJobId . qjId . jJob)
-- | Time-based watcher; never returns. After every 'watchInterval' it
-- refreshes all running jobs, cleans up the finished ones, logs the
-- queue and schedules new jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever tick
  where
    tick = do
      threadDelay watchInterval
      logDebug "Job queue watcher timer fired"
      running <- fmap qRunning . readIORef $ jqJobs qstate
      mapM_ (updateJob qstate) running
      cleanupFinishedJobs qstate
      -- Read the queue again: the steps above may have changed it.
      readIORef (jqJobs qstate) >>= logInfo . showQueue
      scheduleSomeJobs qstate
-- | Load a single job from disk, together with the current status of its
-- file. If the file status cannot be obtained because of an 'IOError',
-- the null file status is used instead. The job has no inotify handle.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  tryFstat <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = case tryFstat of
        Left _ -> nullFStat
        Right st -> st
      wrap loaded =
        JobWithStat { jINotify = Nothing, jStat = fstat, jJob = fst loaded }
  loadResult <- JQ.loadJobFromDisk qdir False jid
  return $ fmap wrap loadResult
-- | Load all jobs found in the queue directory. If the job ids cannot be
-- obtained, no jobs are loaded; jobs that fail to load are dropped.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  let jids = case eitherJids of
        Left _ -> []
        Right found -> JQ.sortJobIDs found
  logInfo $ "Non-archived jobs on disk: "
              ++ commaJoin (map (show . fromJobId) jids)
  loaded <- mapM readJobFromDisk jids
  return (justOk loaded)
-- | Initialise the scheduler from the jobs on disk.
--
-- Jobs that are not finalized are added to the running or to the
-- enqueued jobs, depending on whether they were started already. Then
-- jobs are scheduled and the time-based watcher is forked.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let unfinished = filter (not . jobFinalized . jJob) alljobs
      (running, queued) = partition (jobStarted . jJob) unfinished
      addLoaded = onQueuedJobs (++ queued) . onRunningJobs (++ running)
  modifyJobs qstate addLoaded
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  forkIO (onTimeWatcher qstate) >> return ()
-- | Add new jobs to the enqueued jobs and try to schedule jobs.
--
-- Each job is inserted by job id with 'insertBy', so an enqueued list
-- that is ordered by job id stays ordered.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      -- Strict left fold: do not pile up one thunk per inserted job.
      addJobs oldjobs = foldl' (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
-- | Pure function removing a job from the enqueued jobs.
--
-- The second component of the result is
--
-- * @Ok (Just job)@ if the job was enqueued exactly once; only in this
--   case is the queue changed,
--
-- * @Ok Nothing@ if the job is not enqueued, but is among the running jobs,
--
-- * @Bad@ if the job is enqueued more than once, or is in neither list.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
    ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
    -- The single-element case is handled above, so this is two or more.
    (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                            ++ sJid ++ " queued multiple times")
    (_, True) -> (q, Ok Nothing)
    _ -> (q, Bad $ sJid ++ " not found in queue")
-- | Try to remove a job from the enqueued jobs.
--
-- Returns @Ok True@ if the job was dequeued, @Ok False@ if it could not
-- be dequeued because it is among the running jobs, and @Bad@ if the job
-- is not known to the scheduler or is enqueued more than once.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  result <- atomicModifyIORef (jqJobs state) $ rmJob jid
  let result' = fmap isJust result
  logDebug $ "Result of dequeuing job " ++ show (fromJobId jid)
               ++ " is " ++ show result'
  return result'
-- | Change the priority of a job that has not been started yet.
--
-- The job is taken out of the enqueued jobs, written to disk with the new
-- priority and enqueued again. Returns @Ok (Just job)@ with the modified
-- job on success, @Ok Nothing@ if the job is among the running jobs (it
-- is then left untouched), and @Bad@ if the job cannot be found, is
-- enqueued more than once, or cannot be written to disk.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
  case maybeJob of
    Nothing -> return Nothing
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      written <- liftIO $ writeJobToDisk qDir job'
      -- The job has already been taken out of the queue, so it has to go
      -- back in whatever happens: the modified job if it was written,
      -- the unmodified one otherwise. Without this, a failed write would
      -- silently drop the job from the scheduler.
      liftIO $
        enqueueNewJobs state [genericResult (const job) (const job') written]
      -- Only now propagate a write failure to the caller.
      mkResultT $ return written
      return $ Just job'