Safe Haskell | None |
---|
Implementation of a reader for the job queue.
- data JobWithStat = JobWithStat {}
- jJobL :: Lens' JobWithStat QueuedJob
- data Queue = Queue {
- qEnqueued :: [JobWithStat]
- qRunning :: [JobWithStat]
- qManipulated :: [JobWithStat]
- qRunningL :: Lens' Queue [JobWithStat]
- qManipulatedL :: Lens' Queue [JobWithStat]
- qEnqueuedL :: Lens' Queue [JobWithStat]
- data JQStatus = JQStatus {
- jqJobs :: IORef Queue
- jqConfig :: IORef (Result ConfigData)
- jqLivelock :: Livelock
- jqForkLock :: Lock
- emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
- onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
- onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
- unreadJob :: QueuedJob -> JobWithStat
- watchInterval :: Int
- getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
- getMaxRunningJobs :: JQStatus -> IO Int
- getMaxTrackedJobs :: JQStatus -> IO Int
- getRQL :: JQStatus -> IO Int
- modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
- readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
- updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
- updateJob :: JQStatus -> JobWithStat -> IO ()
- moveJob :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> Queue -> (Queue, Maybe JobWithStat)
- moveJobAtomic :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> JQStatus -> IO (Maybe JobWithStat)
- manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
- sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
- cleanupFinishedJobs :: JQStatus -> IO ()
- jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
- attachWatcher :: JQStatus -> JobWithStat -> IO ()
- jobEligible :: Queue -> JobWithStat -> Bool
- selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
- logFailedJobs :: MonadLog m => [(JobWithStat, GanetiException)] -> m (Set JobId)
- requeueJobs :: JQStatus -> [(JobWithStat, GanetiException)] -> IO ()
- failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)] -> IO ()
- scheduleSomeJobs :: JQStatus -> IO ()
- showQueue :: Queue -> String
- checkForDeath :: JQStatus -> JobWithStat -> IO ()
- onTimeWatcher :: JQStatus -> IO ()
- readJobFromDisk :: JobId -> IO (Result JobWithStat)
- readJobsFromDisk :: IO [JobWithStat]
- initJQScheduler :: JQStatus -> IO ()
- enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
- rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
- dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
- setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
Documentation
data JobWithStat Source
jJobL :: Lens' JobWithStat QueuedJob Source
Queue | |
|
qRunningL :: Lens' Queue [JobWithStat] Source
qManipulatedL :: Lens' Queue [JobWithStat] Source
qEnqueuedL :: Lens' Queue [JobWithStat] Source
Representation of the job queue
We keep two lists of jobs (together with information about the last fstat result observed): the jobs that are enqueued, but not yet handed over for execution, and the jobs already handed over for execution. They are kept together in a single IORef, so that we can atomically update both, in particular when scheduling jobs to be handed over for execution.
JQStatus | |
|
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus Source
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue Source
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue Source
watchInterval :: Int Source
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a Source
getMaxRunningJobs :: JQStatus -> IO Int Source
getMaxTrackedJobs :: JQStatus -> IO Int Source
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () Source
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat) Source
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat] Source
updateJob :: JQStatus -> JobWithStat -> IO () Source
moveJob :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> Queue -> (Queue, Maybe JobWithStat) Source
moveJobAtomic :: Lens' Queue [JobWithStat] -> Lens' Queue [JobWithStat] -> JobId -> JQStatus -> IO (Maybe JobWithStat) Source
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a) Source
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat]) Source
cleanupFinishedJobs :: JQStatus -> IO () Source
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO () Source
attachWatcher :: JQStatus -> JobWithStat -> IO () Source
jobEligible :: Queue -> JobWithStat -> Bool Source
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat]) Source
logFailedJobs :: MonadLog m => [(JobWithStat, GanetiException)] -> m (Set JobId) Source
requeueJobs :: JQStatus -> [(JobWithStat, GanetiException)] -> IO () Source
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)] -> IO () Source
scheduleSomeJobs :: JQStatus -> IO () Source
checkForDeath :: JQStatus -> JobWithStat -> IO () Source
onTimeWatcher :: JQStatus -> IO () Source
readJobFromDisk :: JobId -> IO (Result JobWithStat) Source
readJobsFromDisk :: IO [JobWithStat] Source
initJQScheduler :: JQStatus -> IO () Source
Set up the job scheduler. This will also start the monitoring of changes to the running jobs.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO () Source
Enqueue new jobs. This will guarantee that the jobs will be executed eventually.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool) Source
Try to remove a queued job from the job queue. Return True, if the job could be removed from the queue before being handed over to execution, False if the job already started, and a Bad result if the job is unknown.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob)) Source
Change the priority of a queued job (once the job is handed over to execution, the job itself needs to be informed). To avoid the job being started unmodified, it is temporarily unqueued during the change. Return the modified job if the job's priority was successfully modified, Nothing if the job already started, and a Bad value if the job is unknown.