Safe Haskell: None




Implementation of the job queue.


Data types

noTimestamp :: Timestamp Source #

Missing timestamp type.

fromClockTime :: ClockTime -> Timestamp Source #

Obtain a Timestamp from a given clock time.

currentTimestamp :: IO Timestamp Source #

Get the current time in the job-queue timestamp format.

advanceTimestamp :: Int -> Timestamp -> Timestamp Source #

From a given timestamp, obtain the timestamp of the time that is the given number of seconds later.

extractOpSummary :: InputOpCode -> String Source #

Tries to extract the opcode summary from an InputOpCode. This duplicates some functionality from the opSummary function in Ganeti.OpCodes.

queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode Source #

Convenience function to obtain a QueuedOpCode from a MetaOpCode

queuedJobFromOpCodes :: MonadFail m => JobId -> [MetaOpCode] -> m QueuedJob Source #

From a job-id and a list of op-codes create a job. This is the pure part of job creation, as allocating a new job id lives in IO.

setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob Source #

Attach a received timestamp to a Queued Job.

reasonTrailTimestamp :: Timestamp -> Integer Source #

Build a timestamp in the format expected by the reason trail (nanoseconds) starting from a JQueue Timestamp.

extendJobReasonTrail :: QueuedJob -> QueuedJob Source #

Append an element to the reason trail of all the OpCodes of a queued job.

getJobDependencies :: QueuedJob -> [JobId] Source #

From a queued job obtain the list of jobs it depends on.

changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode Source #

Change the priority of a QueuedOpCode, if it is not already finalized.

changeJobPriority :: Int -> QueuedJob -> QueuedJob Source #

Change the priority of a job, i.e., change the priority of the non-finalized opcodes.

cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob Source #

Transform a QueuedJob that has not been started into its canceled form.

failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob Source #

Transform a QueuedJob that has not been started into its failed form.

jobFileName :: JobId -> FilePath Source #

Computes the filename for a given job ID.

liveJobFile :: FilePath -> JobId -> FilePath Source #

Computes the full path to a live job.

archivedJobFile :: FilePath -> JobId -> FilePath Source #

Computes the full path to an archived job. BROKEN.

calcJobStatus :: QueuedJob -> JobStatus Source #

Computes a queued job's status.

jobStarted :: QueuedJob -> Bool Source #

Determine if a job has started.

jobFinalized :: QueuedJob -> Bool Source #

Determine if a job is finalized.

jobArchivable :: Timestamp -> QueuedJob -> Bool Source #

Determine if a job is finalized and its timestamp is before a given time.

opStatusFinalized :: OpStatus -> Bool Source #

Determine whether an opcode status is finalized.

calcJobPriority :: QueuedJob -> Int Source #

Compute a job's priority.

determineJobDirectories :: FilePath -> Bool -> IO [FilePath] Source #

Build list of directories containing job files. Note: compared to the Python version, this doesn't ignore a potential lost+found file.

getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId]) Source #

Computes the list of all jobs in the given directories.

sortJobIDs :: [JobId] -> [JobId] Source #

Sorts a list of job IDs.

noSuchJob :: Result (QueuedJob, Bool) Source #

Failed to load job error.

loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) Source #

Loads a job from disk.

writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ()) Source #

Write a job to disk.

replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO () Source #

Replicate many jobs to all master candidates.

writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())] Source #

Writes a job to a file and replicates it to master candidates.

readSerialFromDisk :: IO (Result JobId) Source #

Read the job serial number from disk.

allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId]) Source #

Allocate new job ids. To avoid races while accessing the serial file, the threads synchronize over a lock, as usual provided by a Lock.

allocateJobId :: [Node] -> Lock -> IO (Result JobId) Source #

Allocate one new job id.

isQueueOpen :: IO Bool Source #

Decide if the job queue is open.

startJobs Source #


:: Livelock

Luxi's livelock path

-> Lock

lock for forking new processes

-> [QueuedJob]

the list of jobs to start

-> IO [ErrorResult QueuedJob] 

Start enqueued jobs by executing the Python code.

waitUntilJobExited Source #


:: Livelock

LuxiD's own livelock

-> QueuedJob

the job to wait for

-> Int

timeout in milliseconds

-> ResultG (Bool, String) 

Waits for a job's process to exit.

cancelJob Source #


:: Bool

if True, use sigKILL instead of sigTERM

-> Livelock

Luxi's livelock path

-> JobId

the job to cancel

-> IO (ErrorResult (Bool, String)) 

Try to cancel a job that has already been handed over to execution, by terminating the process.

tellJobPriority Source #


:: Livelock

Luxi's livelock path

-> JobId

the job to inform

-> Int

the new priority

-> IO (ErrorResult (Bool, String)) 

Inform a job that it is requested to change its priority. This is done by writing the new priority to a file and sending SIGUSR1.

notifyJob :: ProcessID -> IO (ErrorResult ()) Source #

Notify a job that something relevant happened, e.g., a lock became available. We do this by sending sigHUP to the process.

queueDirPermissions :: FilePermissions Source #

Permissions for the archive directories.

archiveJobs Source #


:: ConfigData

cluster configuration

-> Int

time the job has to be in the past in order to be archived

-> Int

timeout for carrying out the archiving task
-> [JobId]

jobs to consider

-> IO (Int, Int) 

Archive jobs older than the given time, but do not exceed the timeout for carrying out this task.