Safe Haskell | None |
---|
Implementation of the job queue.
Synopsis
- noTimestamp :: Timestamp
- fromClockTime :: ClockTime -> Timestamp
- currentTimestamp :: IO Timestamp
- advanceTimestamp :: Int -> Timestamp -> Timestamp
- toMetaOpCode :: InputOpCode -> [MetaOpCode]
- invalidOp :: String
- extractOpSummary :: InputOpCode -> String
- queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
- queuedJobFromOpCodes :: MonadFail m => JobId -> [MetaOpCode] -> m QueuedJob
- setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
- reasonTrailTimestamp :: Timestamp -> Integer
- extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode -> InputOpCode
- extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode -> QueuedOpCode
- extendJobReasonTrail :: QueuedJob -> QueuedJob
- getJobDependencies :: QueuedJob -> [JobId]
- changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
- cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
- changeJobPriority :: Int -> QueuedJob -> QueuedJob
- cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
- failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode
- failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
- jobFilePrefix :: String
- jobFileName :: JobId -> FilePath
- parseJobFileId :: MonadFail m => FilePath -> m JobId
- liveJobFile :: FilePath -> JobId -> FilePath
- archivedJobFile :: FilePath -> JobId -> FilePath
- opStatusToJob :: OpStatus -> JobStatus
- calcJobStatus :: QueuedJob -> JobStatus
- jobStarted :: QueuedJob -> Bool
- jobFinalized :: QueuedJob -> Bool
- jobArchivable :: Timestamp -> QueuedJob -> Bool
- opStatusFinalized :: OpStatus -> Bool
- calcJobPriority :: QueuedJob -> Int
- ignoreIOError :: a -> Bool -> String -> IOError -> IO a
- allArchiveDirs :: FilePath -> IO [FilePath]
- determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
- getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
- sortJobIDs :: [JobId] -> [JobId]
- getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
- readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
- noSuchJob :: Result (QueuedJob, Bool)
- loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
- writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
- replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
- replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
- writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())]
- readSerialFromDisk :: IO (Result JobId)
- allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
- allocateJobId :: [Node] -> Lock -> IO (Result JobId)
- isQueueOpen :: IO Bool
- startJobs :: Livelock -> Lock -> [QueuedJob] -> IO [ErrorResult QueuedJob]
- isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool
- waitUntilJobExited :: Livelock -> QueuedJob -> Int -> ResultG (Bool, String)
- waitForJobCancelation :: JobId -> Int -> ResultG (Bool, String)
- cancelJob :: Bool -> Livelock -> JobId -> IO (ErrorResult (Bool, String))
- tellJobPriority :: Livelock -> JobId -> Int -> IO (ErrorResult (Bool, String))
- notifyJob :: ProcessID -> IO (ErrorResult ())
- queueDirPermissions :: FilePermissions
- archiveSomeJobsUntil :: ([JobId] -> IO ()) -> FilePath -> ClockTime -> Timestamp -> Int -> [JobId] -> [JobId] -> IO (Int, Int)
- archiveJobs :: ConfigData -> Int -> Int -> [JobId] -> IO (Int, Int)
Data types
noTimestamp :: Timestamp Source #
Timestamp value used to denote a missing timestamp.
fromClockTime :: ClockTime -> Timestamp Source #
Obtain a Timestamp from a given clock time.
currentTimestamp :: IO Timestamp Source #
Get the current time in the job-queue timestamp format.
advanceTimestamp :: Int -> Timestamp -> Timestamp Source #
From a given timestamp, obtain the timestamp of the time that is the given number of seconds later.
toMetaOpCode :: InputOpCode -> [MetaOpCode] Source #
From an InputOpCode obtain the MetaOpCode, if any.
extractOpSummary :: InputOpCode -> String Source #
Tries to extract the opcode summary from an InputOpCode. This duplicates some functionality from the opSummary function in Ganeti.OpCodes.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode Source #
Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedJobFromOpCodes :: MonadFail m => JobId -> [MetaOpCode] -> m QueuedJob Source #
From a job-id and a list of op-codes create a job. This is the pure part of job creation, as allocating a new job id lives in IO.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob Source #
Attach a received timestamp to a Queued Job.
reasonTrailTimestamp :: Timestamp -> Integer Source #
Build a timestamp in the format expected by the reason trail (nanoseconds) starting from a JQueue Timestamp.
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode -> InputOpCode Source #
Append an element to the reason trail of an input opcode.
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode -> QueuedOpCode Source #
Append an element to the reason trail of a queued opcode.
extendJobReasonTrail :: QueuedJob -> QueuedJob Source #
Append an element to the reason trail of all the OpCodes of a queued job.
getJobDependencies :: QueuedJob -> [JobId] Source #
From a queued job obtain the list of jobs it depends on.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode Source #
Change the priority of a QueuedOpCode, if it is not already finalized.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode Source #
Set the state of a QueuedOpCode to canceled.
changeJobPriority :: Int -> QueuedJob -> QueuedJob Source #
Change the priority of a job, i.e., change the priority of the non-finalized opcodes.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob Source #
Transform a QueuedJob that has not been started into its canceled form.
failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode Source #
Set the state of a QueuedOpCode to failed and set the Op result using the given reason message.
failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob Source #
Transform a QueuedJob that has not been started into its failed form.
jobFilePrefix :: String Source #
Job file prefix.
jobFileName :: JobId -> FilePath Source #
Computes the filename for a given job ID.
parseJobFileId :: MonadFail m => FilePath -> m JobId Source #
Parses a job ID from a file name.
liveJobFile :: FilePath -> JobId -> FilePath Source #
Computes the full path to a live job.
archivedJobFile :: FilePath -> JobId -> FilePath Source #
Computes the full path to an archived job. BROKEN.
opStatusToJob :: OpStatus -> JobStatus Source #
Map from opcode status to job status.
calcJobStatus :: QueuedJob -> JobStatus Source #
Computes a queued job's status.
jobStarted :: QueuedJob -> Bool Source #
Determine if a job has started.
jobFinalized :: QueuedJob -> Bool Source #
Determine if a job is finalized.
jobArchivable :: Timestamp -> QueuedJob -> Bool Source #
Determine if a job is finalized and its timestamp is before a given time.
opStatusFinalized :: OpStatus -> Bool Source #
Determine whether an opcode status is finalized.
calcJobPriority :: QueuedJob -> Int Source #
Compute a job's priority.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a Source #
Log but ignore an IOError.
allArchiveDirs :: FilePath -> IO [FilePath] Source #
Compute the list of existing archive directories. Note that I/O exceptions are swallowed and ignored.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath] Source #
Build list of directories containing job files. Note: compared to the Python version, this doesn't ignore a potential lost+found file.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId]) Source #
Computes the list of all jobs in the given directories.
sortJobIDs :: [JobId] -> [JobId] Source #
Sorts a list of job IDs.
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId] Source #
Computes the list of jobs in a given directory.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool)) Source #
Reads the job data from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) Source #
Loads a job from disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ()) Source #
Write a job to disk.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())] Source #
Replicate a job to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO () Source #
Replicate many jobs to all master candidates.
writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())] Source #
Writes a job to a file and replicates it to master candidates.
readSerialFromDisk :: IO (Result JobId) Source #
Read the job serial number from disk.
allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId]) Source #
Allocate new job IDs. To avoid races while accessing the serial file, the threads synchronize on a lock, which is passed in as a Lock.
isQueueOpen :: IO Bool Source #
Decide whether the job queue is open.
startJobs
:: Livelock | Luxi's livelock path |
-> Lock | lock for forking new processes |
-> [QueuedJob] | the list of jobs to start |
-> IO [ErrorResult QueuedJob] |
Start enqueued jobs by executing the Python code.
isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool Source #
Try to prove that a queued job is dead. This function needs to know the livelock of the caller (i.e., luxid) to avoid considering a job dead that is in the process of forking off.
waitUntilJobExited
:: Livelock | LuxiD's own livelock |
-> QueuedJob | the job to wait for |
-> Int | timeout in milliseconds |
-> ResultG (Bool, String) |
Waits for a job's process to exit.
waitForJobCancelation :: JobId -> Int -> ResultG (Bool, String) Source #
Waits for a job that has been ordered to cancel to react, and returns whether it was canceled together with a user-facing description of the reason.
cancelJob
:: Bool | if True, use sigKILL instead of sigTERM |
-> Livelock | Luxi's livelock path |
-> JobId | the job to cancel |
-> IO (ErrorResult (Bool, String)) |
Try to cancel a job that has already been handed over to execution, by terminating the process.
tellJobPriority
:: Livelock | Luxi's livelock path |
-> JobId | the job to inform |
-> Int | the new priority |
-> IO (ErrorResult (Bool, String)) |
Inform a job that it is requested to change its priority. This is done by writing the new priority to a file and sending SIGUSR1.
notifyJob :: ProcessID -> IO (ErrorResult ()) Source #
Notify a job that something relevant happened, e.g., a lock became available. We do this by sending sigHUP to the process.
queueDirPermissions :: FilePermissions Source #
Permissions for the archive directories.
archiveSomeJobsUntil
:: ([JobId] -> IO ()) | replication function |
-> FilePath | queue root directory |
-> ClockTime | end time |
-> Timestamp | cut-off time for archiving jobs |
-> Int | number of jobs already archived |
-> [JobId] | Additional jobs to replicate |
-> [JobId] | List of job-ids still to consider |
-> IO (Int, Int) |
Try, at most until the given end time, to archive some of the given jobs, if they are older than the specified cut-off time; also replicate the archival of the additional jobs. Return a pair of the number of jobs archived and the number of jobs remaining in the queue, assuming the given numbers for the jobs not considered here.
archiveJobs
:: ConfigData | cluster configuration |
-> Int | time the job has to be in the past in order to be archived |
-> Int | timeout |
-> [JobId] | jobs to consider |
-> IO (Int, Int) |
Archive jobs older than the given time, but do not exceed the timeout for carrying out this task.