Safe Haskell | Safe-Inferred |
---|
Implementation of the job queue.
- noTimestamp :: Timestamp
- fromClockTime :: ClockTime -> Timestamp
- currentTimestamp :: IO Timestamp
- advanceTimestamp :: Int -> Timestamp -> Timestamp
- toMetaOpCode :: InputOpCode -> [MetaOpCode]
- invalidOp :: String
- extractOpSummary :: InputOpCode -> String
- queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
- queuedJobFromOpCodes :: Monad m => JobId -> [MetaOpCode] -> m QueuedJob
- setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
- reasonTrailTimestamp :: Timestamp -> Integer
- extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode -> InputOpCode
- extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode -> QueuedOpCode
- extendJobReasonTrail :: QueuedJob -> QueuedJob
- getJobDependencies :: QueuedJob -> [JobId]
- changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
- cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
- changeJobPriority :: Int -> QueuedJob -> QueuedJob
- cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
- failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode
- failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
- jobFilePrefix :: String
- jobFileName :: JobId -> FilePath
- parseJobFileId :: Monad m => FilePath -> m JobId
- liveJobFile :: FilePath -> JobId -> FilePath
- archivedJobFile :: FilePath -> JobId -> FilePath
- opStatusToJob :: OpStatus -> JobStatus
- calcJobStatus :: QueuedJob -> JobStatus
- jobStarted :: QueuedJob -> Bool
- jobFinalized :: QueuedJob -> Bool
- jobArchivable :: Timestamp -> QueuedJob -> Bool
- opStatusFinalized :: OpStatus -> Bool
- calcJobPriority :: QueuedJob -> Int
- ignoreIOError :: a -> Bool -> String -> IOError -> IO a
- allArchiveDirs :: FilePath -> IO [FilePath]
- determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
- getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
- sortJobIDs :: [JobId] -> [JobId]
- getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
- readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
- noSuchJob :: Result (QueuedJob, Bool)
- loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
- writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
- replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
- replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
- writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())]
- readSerialFromDisk :: IO (Result JobId)
- allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
- allocateJobId :: [Node] -> Lock -> IO (Result JobId)
- isQueueOpen :: IO Bool
- startJobs :: Livelock -> Lock -> [QueuedJob] -> IO [ErrorResult QueuedJob]
- isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool
- waitForJob :: JobId -> Int -> ResultG (Bool, String)
- cancelJob :: Bool -> Livelock -> JobId -> IO (ErrorResult (Bool, String))
- tellJobPriority :: Livelock -> JobId -> Int -> IO (ErrorResult (Bool, String))
- notifyJob :: ProcessID -> IO (ErrorResult ())
- queueDirPermissions :: FilePermissions
- archiveSomeJobsUntil :: ([JobId] -> IO ()) -> FilePath -> ClockTime -> Timestamp -> Int -> [JobId] -> [JobId] -> IO (Int, Int)
- archiveJobs :: ConfigData -> Int -> Int -> [JobId] -> IO (Int, Int)
Data types
noTimestamp :: Timestamp
Missing timestamp type.
fromClockTime :: ClockTime -> Timestamp
Obtain a Timestamp from a given clock time.
currentTimestamp :: IO Timestamp
Get the current time in the job-queue timestamp format.
advanceTimestamp :: Int -> Timestamp -> Timestamp
From a given timestamp, obtain the timestamp of the time that is the given number of seconds later.
toMetaOpCode :: InputOpCode -> [MetaOpCode]
extractOpSummary :: InputOpCode -> String
Tries to extract the opcode summary from an InputOpCode. This duplicates some functionality from the opSummary function in Ganeti.OpCodes.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
Convenience function to obtain a QueuedOpCode from a MetaOpCode.
queuedJobFromOpCodes :: Monad m => JobId -> [MetaOpCode] -> m QueuedJob
From a job-id and a list of op-codes create a job. This is the pure part of job creation, as allocating a new job id lives in IO.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
Attach a received timestamp to a Queued Job.
reasonTrailTimestamp :: Timestamp -> Integer
Build a timestamp in the format expected by the reason trail (nanoseconds) starting from a JQueue Timestamp.
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode -> InputOpCode
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode -> QueuedOpCode
extendJobReasonTrail :: QueuedJob -> QueuedJob
Append an element to the reason trail of all the OpCodes of a queued job.
getJobDependencies :: QueuedJob -> [JobId]
From a queued job obtain the list of jobs it depends on.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
Change the priority of a QueuedOpCode, if it is not already finalized.
changeJobPriority :: Int -> QueuedJob -> QueuedJob
Change the priority of a job, i.e., change the priority of the non-finalized opcodes.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
Transform a QueuedJob that has not been started into its canceled form.
failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode
failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
Transform a QueuedJob that has not been started into its failed form.
jobFilePrefix :: String
jobFileName :: JobId -> FilePath
Computes the filename for a given job ID.
parseJobFileId :: Monad m => FilePath -> m JobId
liveJobFile :: FilePath -> JobId -> FilePath
Computes the full path to a live job.
archivedJobFile :: FilePath -> JobId -> FilePath
Computes the full path to an archived job. BROKEN.
calcJobStatus :: QueuedJob -> JobStatus
Computes a queued job's status.
jobStarted :: QueuedJob -> Bool
Determine if a job has started.
jobFinalized :: QueuedJob -> Bool
Determine if a job is finalized.
jobArchivable :: Timestamp -> QueuedJob -> Bool
Determine if a job is finalized and its timestamp is before a given time.
opStatusFinalized :: OpStatus -> Bool
Determine whether an opcode status is finalized.
calcJobPriority :: QueuedJob -> Int
Compute a job's priority.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
allArchiveDirs :: FilePath -> IO [FilePath]
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
Build list of directories containing job files. Note: compared to the Python version, this doesn't ignore a potential lost+found file.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
Computes the list of all jobs in the given directories.
sortJobIDs :: [JobId] -> [JobId]
Sorts a list of job IDs.
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
Loads a job from disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
Write a job to disk.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
Replicate many jobs to all master candidates.
writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())]
Writes a job to a file and replicates it to master candidates.
readSerialFromDisk :: IO (Result JobId)
Read the job serial number from disk.
allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
Allocate new job ids. To avoid races while accessing the serial file, the threads synchronize over a lock, as usual provided by a Lock.
isQueueOpen :: IO Bool
Decide if the job queue is open.
startJobs
:: Livelock | Luxi's livelock path |
-> Lock | lock for forking new processes |
-> [QueuedJob] | the list of jobs to start |
-> IO [ErrorResult QueuedJob] |
Start enqueued jobs by executing the Python code.
isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool
waitForJob :: JobId -> Int -> ResultG (Bool, String)
cancelJob
:: Bool | if True, use sigKILL instead of sigTERM |
-> Livelock | Luxi's livelock path |
-> JobId | the job to cancel |
-> IO (ErrorResult (Bool, String)) |
Try to cancel a job that has already been handed over to execution, by terminating the process.
tellJobPriority
:: Livelock | Luxi's livelock path |
-> JobId | the job to inform |
-> Int | the new priority |
-> IO (ErrorResult (Bool, String)) |
Inform a job that it is requested to change its priority. This is done by writing the new priority to a file and sending SIGUSR1.
notifyJob :: ProcessID -> IO (ErrorResult ())
Notify a job that something relevant happened, e.g., a lock became available. We do this by sending sigHUP to the process.
queueDirPermissions :: FilePermissions
Permissions for the archive directories.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -> FilePath -> ClockTime -> Timestamp -> Int -> [JobId] -> [JobId] -> IO (Int, Int)
archiveJobs
:: ConfigData | cluster configuration |
-> Int | time the job has to be in the past in order to be archived |
-> Int | timeout |
-> [JobId] | jobs to consider |
-> IO (Int, Int) |
Archive jobs older than the given time, but do not exceed the timeout for carrying out this task.