Safe Haskell | Safe-Infered |
---|
Implementation of a reader for the job queue.
- data JQStatus = JQStatus {
- jqJobs :: IORef Queue
- jqConfig :: IORef (Result ConfigData)
- jqLivelock :: Livelock
- jqForkLock :: Lock
- emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
- onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
- onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
- unreadJob :: QueuedJob -> JobWithStat
- watchInterval :: Int
- getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
- getMaxRunningJobs :: JQStatus -> IO Int
- getMaxTrackedJobs :: JQStatus -> IO Int
- getRQL :: JQStatus -> IO Int
- modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
- readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
- updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
- updateJob :: JQStatus -> JobWithStat -> IO ()
- moveJob :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> Queue -> (Queue, Maybe JobWithStat)
- moveJobAtomic :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> JQStatus -> IO (Maybe JobWithStat)
- manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
- sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
- cleanupFinishedJobs :: JQStatus -> IO ()
- jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
- attachWatcher :: JQStatus -> JobWithStat -> IO ()
- jobEligible :: Queue -> JobWithStat -> Bool
- selectJobsToRun :: Int -> Set FilterRule -> Queue -> (Queue, [JobWithStat])
- logFailedJobs :: MonadLog m => [(JobWithStat, GanetiException)] -> m (Set JobId)
- failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)] -> IO ()
- cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()
- scheduleSomeJobs :: JQStatus -> IO ()
- showQueue :: Queue -> String
- checkForDeath :: JQStatus -> JobWithStat -> IO Bool
- cleanupIfDead :: JQStatus -> JobId -> IO Bool
- onTimeWatcher :: JQStatus -> IO ()
- readJobFromDisk :: JobId -> IO (Result JobWithStat)
- readJobsFromDisk :: IO [JobWithStat]
- initJQScheduler :: JQStatus -> IO ()
- enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
- rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
- dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
- setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
- configChangeNeedsRescheduling :: ConfigData -> ConfigData -> Bool
Documentation
Representation of the job queue
We keep two lists of jobs (together with information about the last fstat result observed): the jobs that are enqueued, but not yet handed over for execution, and the jobs already handed over for execution. They are kept together in a single IORef, so that we can atomically update both, in particular when scheduling jobs to be handed over for execution.
JQStatus | |
|
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatusSource
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> QueueSource
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> QueueSource
watchInterval :: IntSource
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO aSource
getMaxRunningJobs :: JQStatus -> IO IntSource
getMaxTrackedJobs :: JQStatus -> IO IntSource
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()Source
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)Source
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]Source
updateJob :: JQStatus -> JobWithStat -> IO ()Source
moveJob :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> Queue -> (Queue, Maybe JobWithStat)Source
moveJobAtomic :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> JQStatus -> IO (Maybe JobWithStat)Source
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)Source
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])Source
cleanupFinishedJobs :: JQStatus -> IO ()Source
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()Source
attachWatcher :: JQStatus -> JobWithStat -> IO ()Source
jobEligible :: Queue -> JobWithStat -> BoolSource
:: Int | How many jobs are allowed to run at the same time. |
-> Set FilterRule | Filter rules to respect for scheduling |
-> Queue | |
-> (Queue, [JobWithStat]) |
Decide on which jobs to schedule next for execution. This is the pure function doing the scheduling.
logFailedJobs :: MonadLog m => [(JobWithStat, GanetiException)] -> m (Set JobId)Source
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)] -> IO ()Source
cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()Source
scheduleSomeJobs :: JQStatus -> IO ()Source
Schedule jobs to be run. This is the IO wrapper around the
pure selectJobsToRun
.
checkForDeath :: JQStatus -> JobWithStat -> IO BoolSource
cleanupIfDead :: JQStatus -> JobId -> IO BoolSource
Trigger job detection for the job with the given job id. Return True, if the job is dead.
onTimeWatcher :: JQStatus -> IO ()Source
readJobFromDisk :: JobId -> IO (Result JobWithStat)Source
readJobsFromDisk :: IO [JobWithStat]Source
initJQScheduler :: JQStatus -> IO ()Source
Set up the job scheduler. This will also start the monitoring of changes to the running jobs.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()Source
Enqueue new jobs. This will guarantee that the jobs will be executed eventually.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)Source
Try to remove a queued job from the job queue. Return True, if the job could be removed from the queue before being handed over to execution, False if the job already started, and a Bad result if the job is unknown.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))Source
Change the priority of a queued job (once the job is handed over to execution, the job itself needs to be informed). To avoid the job being started unmodified, it is temporarily unqueued during the change. Return the modified job, if the job's priority was sucessfully modified, Nothing, if the job already started, and a Bad value, if the job is unkown.
configChangeNeedsRescheduling :: ConfigData -> ConfigData -> BoolSource
Given old and new configs, determines if the changes between them should trigger the scheduler to run.