Safe Haskell: None




Implementation of the job queue.


Data types

noTimestamp :: Timestamp Source #

Missing timestamp type.

fromClockTime :: ClockTime -> Timestamp Source #

Obtain a Timestamp from a given clock time.

currentTimestamp :: IO Timestamp Source #

Get the current time in the job-queue timestamp format.

advanceTimestamp :: Int -> Timestamp -> Timestamp Source #

From a given timestamp, obtain the timestamp of the time that is the given number of seconds later.

toMetaOpCode :: InputOpCode -> [MetaOpCode] Source #

From an InputOpCode obtain the MetaOpCode, if any.

invalidOp :: String Source #

Invalid opcode summary.

extractOpSummary :: InputOpCode -> String Source #

Tries to extract the opcode summary from an InputOpCode. This duplicates some functionality from the opSummary function in Ganeti.OpCodes.

queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode Source #

Convenience function to obtain a QueuedOpCode from a MetaOpCode.

queuedJobFromOpCodes :: MonadFail m => JobId -> [MetaOpCode] -> m QueuedJob Source #

From a job-id and a list of op-codes create a job. This is the pure part of job creation, as allocating a new job id lives in IO.

setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob Source #

Attach a received timestamp to a Queued Job.

reasonTrailTimestamp :: Timestamp -> Integer Source #

Build a timestamp in the format expected by the reason trail (nanoseconds) starting from a JQueue Timestamp.

extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode -> InputOpCode Source #

Append an element to the reason trail of an input opcode.

extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode -> QueuedOpCode Source #

Append an element to the reason trail of a queued opcode.

extendJobReasonTrail :: QueuedJob -> QueuedJob Source #

Append an element to the reason trail of all the OpCodes of a queued job.

getJobDependencies :: QueuedJob -> [JobId] Source #

From a queued job obtain the list of jobs it depends on.

changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode Source #

Change the priority of a QueuedOpCode, if it is not already finalized.

cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode Source #

Set the state of a QueuedOpCode to canceled.

changeJobPriority :: Int -> QueuedJob -> QueuedJob Source #

Change the priority of a job, i.e., change the priority of the non-finalized opcodes.

cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob Source #

Transform a QueuedJob that has not been started into its canceled form.

failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode Source #

Set the state of a QueuedOpCode to failed and set the Op result using the given reason message.

failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob Source #

Transform a QueuedJob that has not been started into its failed form.

jobFilePrefix :: String Source #

Job file prefix.

jobFileName :: JobId -> FilePath Source #

Computes the filename for a given job ID.

parseJobFileId :: MonadFail m => FilePath -> m JobId Source #

Parses a job ID from a file name.

liveJobFile :: FilePath -> JobId -> FilePath Source #

Computes the full path to a live job.

archivedJobFile :: FilePath -> JobId -> FilePath Source #

Computes the full path to an archived job. BROKEN.

opStatusToJob :: OpStatus -> JobStatus Source #

Map from opcode status to job status.

calcJobStatus :: QueuedJob -> JobStatus Source #

Computes a queued job's status.

jobStarted :: QueuedJob -> Bool Source #

Determine if a job has started.

jobFinalized :: QueuedJob -> Bool Source #

Determine if a job is finalized.

jobArchivable :: Timestamp -> QueuedJob -> Bool Source #

Determine if a job is finalized and its timestamp is before a given time.

opStatusFinalized :: OpStatus -> Bool Source #

Determine whether an opcode status is finalized.

calcJobPriority :: QueuedJob -> Int Source #

Compute a job's priority.

ignoreIOError :: a -> Bool -> String -> IOError -> IO a Source #

Log but ignore an IOError.

allArchiveDirs :: FilePath -> IO [FilePath] Source #

Compute the list of existing archive directories. Note that I/O exceptions are swallowed and ignored.

determineJobDirectories :: FilePath -> Bool -> IO [FilePath] Source #

Build list of directories containing job files. Note: compared to the Python version, this doesn't ignore a potential lost+found file.

getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId]) Source #

Computes the list of all jobs in the given directories.

sortJobIDs :: [JobId] -> [JobId] Source #

Sorts a list of job IDs.

getDirJobIDs :: FilePath -> ResultT IOError IO [JobId] Source #

Computes the list of jobs in a given directory.

readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool)) Source #

Reads the job data from disk.

noSuchJob :: Result (QueuedJob, Bool) Source #

Failed to load job error.

loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) Source #

Loads a job from disk.

writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ()) Source #

Write a job to disk.

replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())] Source #

Replicate a job to all master candidates.

replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO () Source #

Replicate many jobs to all master candidates.

writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())] Source #

Writes a job to a file and replicates it to master candidates.

readSerialFromDisk :: IO (Result JobId) Source #

Read the job serial number from disk.

allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId]) Source #

Allocate new job ids. To avoid races while accessing the serial file, the threads synchronize over a lock, as usual provided by a Lock.

allocateJobId :: [Node] -> Lock -> IO (Result JobId) Source #

Allocate one new job id.

isQueueOpen :: IO Bool Source #

Decide if the job queue is open.

startJobs Source #


:: Livelock

Luxi's livelock path

-> Lock

lock for forking new processes

-> [QueuedJob]

the list of jobs to start

-> IO [ErrorResult QueuedJob] 

Start enqueued jobs by executing the Python code.

isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool Source #

Try to prove that a queued job is dead. This function needs to know the livelock of the caller (i.e., luxid) to avoid considering a job dead that is in the process of forking off.

waitUntilJobExited Source #


:: Livelock

LuxiD's own livelock

-> QueuedJob

the job to wait for

-> Int

timeout in milliseconds

-> ResultG (Bool, String) 

Waits for a job's process to exit.

waitForJobCancelation :: JobId -> Int -> ResultG (Bool, String) Source #

Waits for a job ordered to cancel to react, and returns whether it was canceled, and a user-intended description of the reason.

cancelJob Source #


:: Bool

if True, use sigKILL instead of sigTERM

-> Livelock

Luxi's livelock path

-> JobId

the job to cancel

-> IO (ErrorResult (Bool, String)) 

Try to cancel a job that has already been handed over to execution, by terminating the process.

tellJobPriority Source #


:: Livelock

Luxi's livelock path

-> JobId

the job to inform

-> Int

the new priority

-> IO (ErrorResult (Bool, String)) 

Inform a job that it is requested to change its priority. This is done by writing the new priority to a file and sending SIGUSR1.

notifyJob :: ProcessID -> IO (ErrorResult ()) Source #

Notify a job that something relevant happened, e.g., a lock became available. We do this by sending sigHUP to the process.

queueDirPermissions :: FilePermissions Source #

Permissions for the archive directories.

archiveSomeJobsUntil Source #


:: ([JobId] -> IO ())

replication function

-> FilePath

queue root directory

-> ClockTime


-> Timestamp

cut-off time for archiving jobs

-> Int

number of jobs already archived

-> [JobId]

Additional jobs to replicate

-> [JobId]

List of job-ids still to consider

-> IO (Int, Int) 

Try, at most until the given endtime, to archive some of the given jobs, if they are older than the specified cut-off time; also replicate archival of the additional jobs. Return the pair of the number of jobs archived, and the number of jobs remaining in the queue, assuming the given numbers about the jobs not considered.

archiveJobs Source #


:: ConfigData

cluster configuration

-> Int

time the job has to be in the past in order to be archived

-> Int


-> [JobId]

jobs to consider

-> IO (Int, Int) 

Archive jobs older than the given time, but do not exceed the timeout for carrying out this task.