Safe Haskell: Safe-Inferred




Implementation of the job queue.


Data types

noTimestamp :: TimestampSource

Missing timestamp type.

fromClockTime :: ClockTime -> TimestampSource

Obtain a Timestamp from a given clock time.

currentTimestamp :: IO TimestampSource

Get the current time in the job-queue timestamp format.

advanceTimestamp :: Int -> Timestamp -> TimestampSource

From a given timestamp, obtain the timestamp of the time that is the given number of seconds later.

invalidOp :: StringSource

extractOpSummary :: InputOpCode -> StringSource

Tries to extract the opcode summary from an InputOpCode. This duplicates some functionality from the opSummary function in Ganeti.OpCodes.

queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCodeSource

Convenience function to obtain a QueuedOpCode from a MetaOpCode.

queuedJobFromOpCodes :: Monad m => JobId -> [MetaOpCode] -> m QueuedJobSource

From a job-id and a list of op-codes create a job. This is the pure part of job creation, as allocating a new job id lives in IO.

setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJobSource

Attach a received timestamp to a Queued Job.

reasonTrailTimestamp :: Timestamp -> IntegerSource

Build a timestamp in the format expected by the reason trail (nanoseconds) starting from a JQueue Timestamp.

extendJobReasonTrail :: QueuedJob -> QueuedJobSource

Append an element to the reason trail of all the OpCodes of a queued job.

getJobDependencies :: QueuedJob -> [JobId]Source

From a queued job obtain the list of jobs it depends on.

changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCodeSource

Change the priority of a QueuedOpCode, if it is not already finalized.

changeJobPriority :: Int -> QueuedJob -> QueuedJobSource

Change the priority of a job, i.e., change the priority of the non-finalized opcodes.

cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJobSource

Transform a QueuedJob that has not been started into its canceled form.

failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJobSource

Transform a QueuedJob that has not been started into its failed form.

jobFileName :: JobId -> FilePathSource

Computes the filename for a given job ID.

parseJobFileId :: Monad m => FilePath -> m JobIdSource

liveJobFile :: FilePath -> JobId -> FilePathSource

Computes the full path to a live job.

archivedJobFile :: FilePath -> JobId -> FilePathSource

Computes the full path to an archived job. BROKEN.

calcJobStatus :: QueuedJob -> JobStatusSource

Computes a queued job's status.

jobStarted :: QueuedJob -> BoolSource

Determine if a job has started.

jobFinalized :: QueuedJob -> BoolSource

Determine if a job is finalized.

jobArchivable :: Timestamp -> QueuedJob -> BoolSource

Determine if a job is finalized and its timestamp is before a given time.

opStatusFinalized :: OpStatus -> BoolSource

Determine whether an opcode status is finalized.

calcJobPriority :: QueuedJob -> IntSource

Compute a job's priority.

ignoreIOError :: a -> Bool -> String -> IOError -> IO aSource

allArchiveDirs :: FilePath -> IO [FilePath]Source

determineJobDirectories :: FilePath -> Bool -> IO [FilePath]Source

Build list of directories containing job files. Note: compared to the Python version, this doesn't ignore a potential lost+found file.

getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])Source

Computes the list of all jobs in the given directories.

sortJobIDs :: [JobId] -> [JobId]Source

Sorts a list of job IDs.

getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]Source

readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))Source

noSuchJob :: Result (QueuedJob, Bool)Source

Failed to load job error.

loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))Source

Loads a job from disk.

writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())Source

Write a job to disk.

replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]Source

replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()Source

Replicate many jobs to all master candidates.

writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())]Source

Writes a job to a file and replicates it to master candidates.

readSerialFromDisk :: IO (Result JobId)Source

Read the job serial number from disk.

allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])Source

Allocate new job ids. To avoid races while accessing the serial file, the threads synchronize over a lock, as usual provided by a Lock.

allocateJobId :: [Node] -> Lock -> IO (Result JobId)Source

Allocate one new job id.

isQueueOpen :: IO BoolSource

Decide if the job queue is open.



:: Livelock

Luxi's livelock path

-> Lock

lock for forking new processes

-> [QueuedJob]

the list of jobs to start

-> IO [ErrorResult QueuedJob] 

Start enqueued jobs by executing the Python code.

isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m BoolSource

waitForJob :: JobId -> Int -> ResultG (Bool, String)Source



:: Bool

if True, use sigKILL instead of sigTERM

-> Livelock

Luxi's livelock path

-> JobId

the job to cancel

-> IO (ErrorResult (Bool, String)) 

Try to cancel a job that has already been handed over to execution, by terminating the process.



:: Livelock

Luxi's livelock path

-> JobId

the job to inform

-> Int

the new priority

-> IO (ErrorResult (Bool, String)) 

Inform a job that it is requested to change its priority. This is done by writing the new priority to a file and sending SIGUSR1.

notifyJob :: ProcessID -> IO (ErrorResult ())Source

Notify a job that something relevant happened, e.g., a lock became available. We do this by sending sigHUP to the process.

queueDirPermissions :: FilePermissionsSource

Permissions for the archive directories.

archiveSomeJobsUntil :: ([JobId] -> IO ()) -> FilePath -> ClockTime -> Timestamp -> Int -> [JobId] -> [JobId] -> IO (Int, Int)Source



:: ConfigData

cluster configuration

-> Int

time the job has to be in the past in order to be archived

-> Int


-> [JobId]

jobs to consider

-> IO (Int, Int) 

Archive jobs older than the given time, but do not exceed the timeout for carrying out this task.