ganeti
Safe Haskell: None

Ganeti.JQScheduler

Description

Implementation of a reader for the job queue.

Documentation

data JQStatus Source #

Representation of the job queue

We keep two lists of jobs (together with information about the last fstat result observed): the jobs that are enqueued, but not yet handed over for execution, and the jobs already handed over for execution. They are kept together in a single IORef, so that we can atomically update both, in particular when scheduling jobs to be handed over for execution.
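The value of keeping both lists behind one IORef can be illustrated with a minimal sketch. The `Job` and `Queue` types and the `handOver` function below are hypothetical simplifications, not the module's actual definitions; the only library call relied on is `atomicModifyIORef'` from `Data.IORef`.

```haskell
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')

-- Hypothetical stand-in for the real job type.
type Job = Int

-- Simplified queue: both parts live in one value, so a single
-- atomic update can change them together.
data Queue = Queue
  { qEnqueued :: [Job]  -- waiting, not yet handed over for execution
  , qRunning  :: [Job]  -- already handed over for execution
  } deriving (Eq, Show)

-- Move up to n jobs from the enqueued part to the running part in
-- one atomic step, and return the jobs that were moved.
handOver :: Int -> IORef Queue -> IO [Job]
handOver n ref = atomicModifyIORef' ref $ \q ->
  let (chosen, rest) = splitAt n (qEnqueued q)
  in (q { qEnqueued = rest, qRunning = qRunning q ++ chosen }, chosen)
```

Because the removal from one list and the addition to the other happen in the same update, no other thread can observe a state in which a job is in neither list or in both.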

Constructors

JQStatus 

Fields

onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue Source #

Apply a function on the running jobs.

onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue Source #

Apply a function on the queued jobs.

unreadJob :: QueuedJob -> JobWithStat Source #

Obtain a JobWithStat from a QueuedJob.

watchInterval :: Int Source #

Reload interval, in microseconds, for polling the running jobs for updates.

getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a Source #

Read a cluster parameter from the configuration, using a default if the configuration is not available.
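The fallback behaviour can be sketched as follows, assuming the configuration is held in an IORef and may be absent. The `Cluster` record, its `maxRunningJobs` field, and `readOrDefault` are hypothetical; the real function takes a JQStatus rather than a bare IORef.

```haskell
import Data.IORef (IORef, newIORef, readIORef)

-- Hypothetical, heavily simplified cluster configuration.
newtype Cluster = Cluster { maxRunningJobs :: Int }

-- An unavailable configuration is modelled here as Nothing.
-- Apply the accessor if a configuration is present, otherwise
-- return the supplied default.
readOrDefault :: (Cluster -> a) -> a -> IORef (Maybe Cluster) -> IO a
readOrDefault field def ref = maybe def field <$> readIORef ref
```

Under these assumptions, `readOrDefault maxRunningJobs 1 ref` yields the configured limit when a configuration is present and 1 otherwise.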

getMaxRunningJobs :: JQStatus -> IO Int Source #

Get the maximal number of jobs to be run simultaneously from the configuration. If the configuration is not available, be conservative and use the smallest possible value, i.e., 1.

getMaxTrackedJobs :: JQStatus -> IO Int Source #

Get the maximal number of jobs to be tracked simultaneously from the configuration. If the configuration is not available, be conservative and use the smallest possible value, i.e., 1.

getRQL :: JQStatus -> IO Int Source #

Get the number of jobs currently running.

modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () Source #

Wrapper function to atomically update the jobs in the queue status.

readJobStatus :: JobWithStat -> IO (Maybe JobWithStat) Source #

Reread a job from disk, if the file has changed.

updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat] Source #

Update a job in the job queue, if it is still there. This is the pure function for inserting a previously read change into the queue. As the change contains its timestamp, we don't have to worry about a later read change overwriting a newer read state: if this happens, the fstat value will be outdated, so the next poller run will fix it.

updateJob :: JQStatus -> JobWithStat -> IO () Source #

Update a single job by reading it from disk, if necessary.

moveJob Source #

Arguments

:: Lens' Queue [JobWithStat]

from queue

-> Lens' Queue [JobWithStat]

to queue

-> JobId 
-> Queue 
-> (Queue, Maybe JobWithStat) 

Move a job from one part of the queue to another. Return the job that was moved, or Nothing if it wasn't found in the queue.
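A minimal sketch of this operation follows. To stay within the base library, it replaces `Lens' Queue [JobWithStat]` with a hypothetical getter/setter record named `Part`, and uses plain job ids in place of JobWithStat; `moveJobSketch` is an illustration and not the module's implementation.

```haskell
import Data.List (partition)

-- Hypothetical stand-ins for the real JobId and Queue types.
type JobId = Int

data Queue = Queue
  { qEnqueued :: [JobId]
  , qRunning  :: [JobId]
  } deriving (Eq, Show)

-- A getter/setter pair playing the role of a Lens' Queue [JobId].
data Part = Part
  { getPart :: Queue -> [JobId]
  , setPart :: [JobId] -> Queue -> Queue
  }

enqueuedPart, runningPart :: Part
enqueuedPart = Part qEnqueued (\js q -> q { qEnqueued = js })
runningPart  = Part qRunning  (\js q -> q { qRunning = js })

-- Remove the job from the source part and prepend it to the target
-- part; leave the queue untouched if the job is not in the source.
moveJobSketch :: Part -> Part -> JobId -> Queue -> (Queue, Maybe JobId)
moveJobSketch from to jid q =
  case partition (== jid) (getPart from q) of
    (job : _, rest) ->
      let removed = setPart from rest q
          added   = setPart to (job : getPart to removed) removed
      in (added, Just job)
    ([], _) -> (q, Nothing)
```

Passing the source and target parts as arguments lets one function serve every direction of movement between the parts of the queue.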

moveJobAtomic Source #

Arguments

:: Lens' Queue [JobWithStat]

from queue

-> Lens' Queue [JobWithStat]

to queue

-> JobId 
-> JQStatus 
-> IO (Maybe JobWithStat) 

Atomically move a job from one part of the queue to another. Return the job that was moved, or Nothing if it wasn't found in the queue.

manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a) Source #

Manipulate a running job by atomically moving it from qRunning into qManipulated, running a given IO action and then atomically returning it back.

Returns the result of the IO action, or Nothing, if the job wasn't found in the queue.
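The move-out, act, move-back pattern can be sketched with `bracket` from `Control.Exception`. The types and the helper names `takeOut`, `putBack` and `withRunningJob` are hypothetical, and putting the job back when the action throws is an assumption of this sketch rather than something stated above.

```haskell
import Control.Exception (bracket)
import Control.Monad (when)
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')

-- Hypothetical stand-ins for the real types.
type JobId = Int

data Queue = Queue
  { qRunning     :: [JobId]
  , qManipulated :: [JobId]
  } deriving (Eq, Show)

-- Atomically move the job from qRunning to qManipulated.
-- Returns False if the job is not running.
takeOut :: IORef Queue -> JobId -> IO Bool
takeOut ref jid = atomicModifyIORef' ref $ \q ->
  if jid `elem` qRunning q
    then ( q { qRunning = filter (/= jid) (qRunning q)
             , qManipulated = jid : qManipulated q }
         , True )
    else (q, False)

-- Atomically move the job back from qManipulated to qRunning.
putBack :: IORef Queue -> JobId -> IO ()
putBack ref jid = atomicModifyIORef' ref $ \q ->
  ( q { qManipulated = filter (/= jid) (qManipulated q)
      , qRunning = jid : qRunning q }
  , () )

-- Run the action only while the job is parked in qManipulated.
withRunningJob :: IORef Queue -> JobId -> IO a -> IO (Maybe a)
withRunningJob ref jid act =
  bracket (takeOut ref jid)
          (\found -> when found (putBack ref jid))
          (\found -> if found then Just <$> act else pure Nothing)
```

While the action runs, the job is in neither the queued nor the running part, so code that only inspects those parts does not touch it.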

sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat]) Source #

Sort out the finished jobs from the monitored part of the queue. This is the pure part, splitting the queue into a remaining queue and the jobs that were removed.
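The pure split can be sketched with `partition` from `Data.List`. The `Status` and `Job` types and the choice of which statuses count as finished are assumptions made for illustration only.

```haskell
import Data.List (partition)

-- Hypothetical job statuses; the real set of statuses is larger.
data Status = Queued | Running | Success | Error | Canceled
  deriving (Eq, Show)

data Job = Job { jobId :: Int, jobStatus :: Status }
  deriving (Eq, Show)

-- Assumption: these three statuses are the finalized ones.
isFinished :: Job -> Bool
isFinished j = jobStatus j `elem` [Success, Error, Canceled]

-- Split the monitored jobs into those that remain and those that
-- were removed because they are finished.
splitFinished :: [Job] -> ([Job], [Job])
splitFinished monitored = (remaining, finished)
  where (finished, remaining) = partition isFinished monitored
```

Keeping this step pure means it can be tested without touching the file system, while the IO wrapper only has to apply it atomically and act on the removed jobs.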

cleanupFinishedJobs :: JQStatus -> IO () Source #

Actually clean up the finished jobs. This is the IO wrapper around the pure sortoutFinishedJobs.

jobWatcher :: JQStatus -> JobWithStat -> Event -> IO () Source #

Watcher task for a job, to update it on file changes. It also reinstantiates itself upon receiving an Ignored event.

attachWatcher :: JQStatus -> JobWithStat -> IO () Source #

Attach the job watcher to a running job.

jobEligible :: Queue -> JobWithStat -> Bool Source #

For a queued job, determine whether it is eligible to run, i.e., whether none of the jobs it depends on is still enqueued or running.
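The eligibility check can be sketched as follows, assuming each job carries the ids of the jobs it depends on. The `Job` and `Queue` types and the `dependsOn` field are hypothetical simplifications.

```haskell
-- Hypothetical stand-ins for the real types.
type JobId = Int

data Job = Job
  { jobId     :: JobId
  , dependsOn :: [JobId]  -- ids of the jobs this job waits for
  } deriving (Eq, Show)

data Queue = Queue
  { qEnqueued :: [Job]
  , qRunning  :: [Job]
  }

-- A job is eligible if none of its dependencies is still pending,
-- i.e., present in either part of the queue.
eligible :: Queue -> Job -> Bool
eligible q job = not (any (`elem` pending) (dependsOn job))
  where pending = map jobId (qEnqueued q ++ qRunning q)
```

In this sketch a dependency that no longer appears in the queue is treated as no longer blocking.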

selectJobsToRun Source #

Arguments

:: Int

How many jobs are allowed to run at the same time.

-> Set FilterRule

Filter rules to respect for scheduling

-> Queue 
-> (Queue, [JobWithStat]) 

Decide on which jobs to schedule next for execution. This is the pure function doing the scheduling.
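A minimal sketch of such a pure selection step follows. It ignores filter rules and job dependencies entirely, and it assumes that numerically smaller priority values are scheduled first; the types and `selectSketch` are hypothetical.

```haskell
import Data.List (sortOn)

-- Hypothetical stand-ins for the real types.
data Job = Job { jobId :: Int, priority :: Int }
  deriving (Eq, Show)

data Queue = Queue
  { qEnqueued :: [Job]
  , qRunning  :: [Job]
  } deriving (Eq, Show)

-- Fill the free slots (limit minus running jobs) with the enqueued
-- jobs of best priority; return the new queue and the chosen jobs.
selectSketch :: Int -> Queue -> (Queue, [Job])
selectSketch limit q =
  let free = max 0 (limit - length (qRunning q))
      -- sortOn is stable, so jobs of equal priority keep their order.
      (chosen, rest) = splitAt free (sortOn priority (qEnqueued q))
  in (q { qEnqueued = rest, qRunning = qRunning q ++ chosen }, chosen)
```

Returning both the updated queue and the chosen jobs lets an IO wrapper apply the queue update atomically and then start exactly the jobs that were selected.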

logFailedJobs :: MonadLog m => [(JobWithStat, GanetiException)] -> m (Set JobId) Source #

Logs errors of failed jobs and returns the set of job IDs.

failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)] -> IO () Source #

Fail jobs that were previously selected for execution but couldn't be started.

cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO () Source #

Checks if any jobs match a REJECT filter rule, and cancels them.

scheduleSomeJobs :: JQStatus -> IO () Source #

Schedule jobs to be run. This is the IO wrapper around the pure selectJobsToRun.

showQueue :: Queue -> String Source #

Format the job queue status in a compact, human-readable way.

checkForDeath :: JQStatus -> JobWithStat -> IO Bool Source #

Check if a job died, and clean up if so. Return True, if the job was found dead.

cleanupIfDead :: JQStatus -> JobId -> IO Bool Source #

Trigger death detection for the job with the given job id. Return True, if the job is dead.

updateStatusAndScheduleSomeJobs :: JQStatus -> IO () Source #

Force the queue to check the state of all jobs.

onTimeWatcher :: JQStatus -> IO () Source #

Time-based watcher for updating the job queue.

readJobFromDisk :: JobId -> IO (Result JobWithStat) Source #

Read a single, non-archived, job, specified by its id, from disk.

readJobsFromDisk :: IO [JobWithStat] Source #

Read all non-finalized jobs from disk.

initJQScheduler :: JQStatus -> IO () Source #

Set up the job scheduler. This will also start the monitoring of changes to the running jobs.

enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO () Source #

Enqueue new jobs. This will guarantee that the jobs will be executed eventually.

rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob)) Source #

Pure function for removing a queued job from the job queue by atomicModifyIORef. The answer is Just the job if the job could be removed before being handed over to execution, Nothing if it had already been started, and a Bad result if the job is not found in the queue.
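The three possible answers can be sketched as follows. `Either String` stands in for the Result type, plain job ids stand in for QueuedJob, and `removeSketch` is a hypothetical name.

```haskell
import Data.List (partition)

-- Hypothetical stand-ins for the real types.
type JobId = Int

data Queue = Queue
  { qEnqueued :: [JobId]
  , qRunning  :: [JobId]
  } deriving (Eq, Show)

-- Right (Just job): removed before being handed over for execution.
-- Right Nothing:    the job has already been started.
-- Left msg:         the job is not known to the queue.
removeSketch :: JobId -> Queue -> (Queue, Either String (Maybe JobId))
removeSketch jid q =
  case partition (== jid) (qEnqueued q) of
    (job : _, rest) -> (q { qEnqueued = rest }, Right (Just job))
    ([], _)
      | jid `elem` qRunning q -> (q, Right Nothing)
      | otherwise -> (q, Left ("Job " ++ show jid ++ " unknown to the queue"))
```

The function returns the pair of new state and answer, which is the shape `atomicModifyIORef` expects from its argument.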

dequeueJob :: JQStatus -> JobId -> IO (Result Bool) Source #

Try to remove a queued job from the job queue. Return True, if the job could be removed from the queue before being handed over to execution, False if the job already started, and a Bad result if the job is unknown.

setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob)) Source #

Change the priority of a queued job (once the job is handed over to execution, the job itself needs to be informed). To avoid the job being started unmodified, it is temporarily unqueued during the change. Return the modified job, if the job's priority was successfully modified, Nothing, if the job already started, and a Bad value, if the job is unknown.

configChangeNeedsRescheduling :: ConfigData -> ConfigData -> Bool Source #

Given old and new configs, determines if the changes between them should trigger the scheduler to run.