Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Ganeti.JQueue
Description
Implementation of the job queue.
Synopsis
- queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
- queuedJobFromOpCodes :: MonadFail m => JobId -> [MetaOpCode] -> m QueuedJob
- changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
- changeJobPriority :: Int -> QueuedJob -> QueuedJob
- cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
- failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
- fromClockTime :: ClockTime -> Timestamp
- noTimestamp :: Timestamp
- currentTimestamp :: IO Timestamp
- advanceTimestamp :: Int -> Timestamp -> Timestamp
- reasonTrailTimestamp :: Timestamp -> Integer
- setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
- extendJobReasonTrail :: QueuedJob -> QueuedJob
- getJobDependencies :: QueuedJob -> [JobId]
- opStatusFinalized :: OpStatus -> Bool
- extractOpSummary :: InputOpCode -> String
- calcJobStatus :: QueuedJob -> JobStatus
- jobStarted :: QueuedJob -> Bool
- jobFinalized :: QueuedJob -> Bool
- jobArchivable :: Timestamp -> QueuedJob -> Bool
- calcJobPriority :: QueuedJob -> Int
- jobFileName :: JobId -> FilePath
- liveJobFile :: FilePath -> JobId -> FilePath
- archivedJobFile :: FilePath -> JobId -> FilePath
- determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
- getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
- sortJobIDs :: [JobId] -> [JobId]
- loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
- noSuchJob :: Result (QueuedJob, Bool)
- readSerialFromDisk :: IO (Result JobId)
- allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
- allocateJobId :: [Node] -> Lock -> IO (Result JobId)
- writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
- replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
- writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())]
- isQueueOpen :: IO Bool
- startJobs :: Livelock -> Lock -> [QueuedJob] -> IO [ErrorResult QueuedJob]
- cancelJob :: Bool -> Livelock -> JobId -> IO (ErrorResult (Bool, String))
- tellJobPriority :: Livelock -> JobId -> Int -> IO (ErrorResult (Bool, String))
- notifyJob :: ProcessID -> IO (ErrorResult ())
- waitUntilJobExited :: Livelock -> QueuedJob -> Int -> ResultG (Bool, String)
- queueDirPermissions :: FilePermissions
- archiveJobs :: ConfigData -> Int -> Int -> [JobId] -> IO (Int, Int)
- type Timestamp = (Int, Int)
- data InputOpCode
- = ValidOpCode MetaOpCode
- | InvalidOpCode JSValue
- data QueuedOpCode = QueuedOpCode {
- qoInput :: InputOpCode
- qoStatus :: OpStatus
- qoResult :: JSValue
- qoLog :: [(Int, Timestamp, ELogType, JSValue)]
- qoPriority :: Int
- qoStartTimestamp :: (Maybe Timestamp)
- qoExecTimestamp :: (Maybe Timestamp)
- qoEndTimestamp :: (Maybe Timestamp)
- data QueuedJob = QueuedJob {
- qjId :: JobId
- qjOps :: [QueuedOpCode]
- qjReceivedTimestamp :: (Maybe Timestamp)
- qjStartTimestamp :: (Maybe Timestamp)
- qjEndTimestamp :: (Maybe Timestamp)
- qjLivelock :: (Maybe FilePath)
- qjProcessId :: (Maybe ProcessID)
Documentation
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode #
Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedJobFromOpCodes :: MonadFail m => JobId -> [MetaOpCode] -> m QueuedJob #
From a job-id and a list of op-codes create a job. This is the pure part of job creation, as allocating a new job id lives in IO.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode #
Change the priority of a QueuedOpCode, if it is not already finalized.
changeJobPriority :: Int -> QueuedJob -> QueuedJob #
Change the priority of a job, i.e., change the priority of the non-finalized opcodes.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob #
Transform a QueuedJob that has not been started into its canceled form.
failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob #
Transform a QueuedJob that has not been started into its failed form.
fromClockTime :: ClockTime -> Timestamp #
Obtain a Timestamp from a given clock time
Missing timestamp type.
currentTimestamp :: IO Timestamp #
Get the current time in the job-queue timestamp format.
advanceTimestamp :: Int -> Timestamp -> Timestamp #
From a given timestamp, obtain the timestamp of the time that is the given number of seconds later.
reasonTrailTimestamp :: Timestamp -> Integer #
Build a timestamp in the format expected by the reason trail (nanoseconds) starting from a JQueue Timestamp.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob #
Attach a received timestamp to a Queued Job.
extendJobReasonTrail :: QueuedJob -> QueuedJob #
Append an element to the reason trail of all the OpCodes of a queued job.
getJobDependencies :: QueuedJob -> [JobId] #
From a queued job obtain the list of jobs it depends on.
opStatusFinalized :: OpStatus -> Bool #
Determine whether an opcode status is finalized.
extractOpSummary :: InputOpCode -> String #
Tries to extract the opcode summary from an InputOpCode
. This
duplicates some functionality from the opSummary
function in
Ganeti.OpCodes.
calcJobStatus :: QueuedJob -> JobStatus #
Computes a queued job's status.
jobStarted :: QueuedJob -> Bool #
Determine if a job has started
jobFinalized :: QueuedJob -> Bool #
Determine if a job is finalised.
jobArchivable :: Timestamp -> QueuedJob -> Bool #
Determine if a job is finalized and its timestamp is before a given time.
calcJobPriority :: QueuedJob -> Int #
Compute a job's priority.
jobFileName :: JobId -> FilePath #
Computes the filename for a given job ID.
liveJobFile :: FilePath -> JobId -> FilePath #
Computes the full path to a live job.
archivedJobFile :: FilePath -> JobId -> FilePath #
Computes the full path to an archives job. BROKEN.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath] #
Build list of directories containing job files. Note: compared to the Python version, this doesn't ignore a potential lost+found file.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId]) #
Computes the list of all jobs in the given directories.
sortJobIDs :: [JobId] -> [JobId] #
Sorts the a list of job IDs.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) #
Loads a job from disk.
readSerialFromDisk :: IO (Result JobId) #
Read the job serial number from disk.
allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId]) #
Allocate new job ids. To avoid races while accessing the serial file, the threads synchronize over a lock, as usual provided by a Lock.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ()) #
Write a job to disk.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO () #
Replicate many jobs to all master candidates.
writeAndReplicateJob :: Error e => ConfigData -> FilePath -> QueuedJob -> ResultT e IO [(Node, ERpcError ())] #
Writes a job to a file and replicates it to master candidates.
isQueueOpen :: IO Bool #
Decide if job queue is open
Arguments
:: Livelock | Luxi's livelock path |
-> Lock | lock for forking new processes |
-> [QueuedJob] | the list of jobs to start |
-> IO [ErrorResult QueuedJob] |
Start enqueued jobs by executing the Python code.
Arguments
:: Bool | if True, use sigKILL instead of sigTERM |
-> Livelock | Luxi's livelock path |
-> JobId | the job to cancel |
-> IO (ErrorResult (Bool, String)) |
Try to cancel a job that has already been handed over to execution, by terminating the process.
Arguments
:: Livelock | Luxi's livelock path |
-> JobId | the job to inform |
-> Int | the new priority |
-> IO (ErrorResult (Bool, String)) |
Inform a job that it is requested to change its priority. This is done by writing the new priority to a file and sending SIGUSR1.
notifyJob :: ProcessID -> IO (ErrorResult ()) #
Notify a job that something relevant happened, e.g., a lock became available. We do this by sending sigHUP to the process.
Arguments
:: Livelock | LuxiD's own livelock |
-> QueuedJob | the job to wait for |
-> Int | timeout in milliseconds |
-> ResultG (Bool, String) |
Waits for a job's process to exit
queueDirPermissions :: FilePermissions #
Permissions for the archive directories.
Arguments
:: ConfigData | cluster configuration |
-> Int | time the job has to be in the past in order to be archived |
-> Int | timeout |
-> [JobId] | jobs to consider |
-> IO (Int, Int) |
Archive jobs older than the given time, but do not exceed the timeout for carrying out this task.
The ganeti queue timestamp type. It represents the time as the pair of seconds since the epoch and microseconds since the beginning of the second.
data InputOpCode #
An input opcode.
Constructors
ValidOpCode MetaOpCode | OpCode was parsed successfully |
InvalidOpCode JSValue | Invalid opcode |
Instances
Show InputOpCode # | |
Defined in Ganeti.JQueue.Objects Methods showsPrec :: Int -> InputOpCode -> ShowS show :: InputOpCode -> String showList :: [InputOpCode] -> ShowS | |
Eq InputOpCode # | |
Defined in Ganeti.JQueue.Objects | |
Ord InputOpCode # | |
Defined in Ganeti.JQueue.Objects Methods compare :: InputOpCode -> InputOpCode -> Ordering (<) :: InputOpCode -> InputOpCode -> Bool (<=) :: InputOpCode -> InputOpCode -> Bool (>) :: InputOpCode -> InputOpCode -> Bool (>=) :: InputOpCode -> InputOpCode -> Bool max :: InputOpCode -> InputOpCode -> InputOpCode min :: InputOpCode -> InputOpCode -> InputOpCode | |
JSON InputOpCode # | JSON instance for |
Defined in Ganeti.JQueue.Objects Methods readJSON :: JSValue -> Result InputOpCode showJSON :: InputOpCode -> JSValue readJSONs :: JSValue -> Result [InputOpCode] showJSONs :: [InputOpCode] -> JSValue |
data QueuedOpCode #
Constructors
QueuedOpCode | |
Fields
|
Instances
Constructors
QueuedJob | |
Fields
|
Instances
Show QueuedJob # | |
ArrayObject QueuedJob # | |
Defined in Ganeti.JQueue.Objects | |
DictObject QueuedJob # | |
Defined in Ganeti.JQueue.Objects | |
Eq QueuedJob # | |
Ord QueuedJob # | |
Defined in Ganeti.JQueue.Objects | |
JSON QueuedJob # | |