{-| Implementation of the job queue.

-}

module Ganeti.JQueue
( QueuedOpCode(..)
, QueuedJob(..)
, InputOpCode(..)
, queuedOpCodeFromMetaOpCode
, queuedJobFromOpCodes
, changeOpCodePriority
, changeJobPriority
, cancelQueuedJob
, Timestamp
, fromClockTime
, noTimestamp
, currentTimestamp
, advanceTimestamp
, setReceivedTimestamp
, extendJobReasonTrail
, opStatusFinalized
, extractOpSummary
, calcJobStatus
, jobStarted
, jobFinalized
, jobArchivable
, calcJobPriority
, jobFileName
, liveJobFile
, archivedJobFile
, determineJobDirectories
, getJobIDs
, sortJobIDs
, loadJobFromDisk
, noSuchJob
, readSerialFromDisk
, allocateJobIds
, allocateJobId
, writeJobToDisk
, replicateManyJobs
, isQueueOpen
, startJobs
, cancelJob
, queueDirPermissions
, archiveJobs
) where
import Control.Applicative (liftA2, (<|>), (<$>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types
import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult)
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
import Ganeti.VCluster (makeVirtualPath)
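
-- | The Ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.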
type Timestamp = (Int, Int)
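
-- | Missing timestamp type.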
noTimestamp :: Timestamp
noTimestamp = (-1, -1)
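
-- | Obtain a Timestamp from a given clock time.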
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD ctime pico) =
(fromIntegral ctime, fromIntegral $ pico `div` 1000000)
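
-- | Get the current time in the job-queue timestamp format.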
currentTimestamp :: IO Timestamp
currentTimestamp = fromClockTime `liftM` getClockTime
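
-- | From a given timestamp, obtain the timestamp of the time that is the
-- given number of seconds later.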
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp = first . (+)
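
-- | An input opcode: either a successfully parsed opcode, or the original
-- JSON value if parsing failed.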
data InputOpCode = ValidOpCode MetaOpCode
| InvalidOpCode JSValue
deriving (Show, Eq)
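
-- | JSON instance for 'InputOpCode', trying to parse it and, if that fails,
-- keeping the original JSValue.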
instance Text.JSON.JSON InputOpCode where
showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
showJSON (InvalidOpCode inv) = inv
readJSON v = case Text.JSON.readJSON v of
Text.JSON.Error _ -> return $ InvalidOpCode v
Text.JSON.Ok mo -> return $ ValidOpCode mo
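
-- | Invalid opcode summary.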
invalidOp :: String
invalidOp = "INVALID_OP"
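
-- | Tries to extract the summary of an opcode, even if it failed to parse,
-- by looking at its OP_ID field.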
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
Just s -> drop 3 s
Nothing -> invalidOp
extractOpSummary _ = invalidOp
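
-- | A queued opcode: an 'InputOpCode' together with its execution status,
-- result, log, priority and timestamps.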
$(buildObject "QueuedOpCode" "qo"
[ simpleField "input" [t| InputOpCode |]
, simpleField "status" [t| OpStatus |]
, simpleField "result" [t| JSValue |]
, defaultField [| [] |] $
simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |]
, simpleField "priority" [t| Int |]
, optionalNullSerField $
simpleField "start_timestamp" [t| Timestamp |]
, optionalNullSerField $
simpleField "exec_timestamp" [t| Timestamp |]
, optionalNullSerField $
simpleField "end_timestamp" [t| Timestamp |]
])
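
-- | A queued job: the job id, its opcodes and the job-level timestamps.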
$(buildObject "QueuedJob" "qj"
[ simpleField "id" [t| JobId |]
, simpleField "ops" [t| [QueuedOpCode] |]
, optionalNullSerField $
simpleField "received_timestamp" [t| Timestamp |]
, optionalNullSerField $
simpleField "start_timestamp" [t| Timestamp |]
, optionalNullSerField $
simpleField "end_timestamp" [t| Timestamp |]
])
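
-- | Convenience function to obtain a 'QueuedOpCode' from a 'MetaOpCode'.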
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
QueuedOpCode { qoInput = ValidOpCode op
, qoStatus = OP_STATUS_QUEUED
, qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
$ op
, qoLog = []
, qoResult = JSNull
, qoStartTimestamp = Nothing
, qoEndTimestamp = Nothing
, qoExecTimestamp = Nothing
}
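
-- | From a job id and a list of opcodes create a job. This is the pure
-- part of job creation, as allocating a new job id lives in IO.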
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
ops' <- mapM (`resolveDependencies` jobid) ops
return QueuedJob { qjId = jobid
, qjOps = map queuedOpCodeFromMetaOpCode ops'
, qjReceivedTimestamp = Nothing
, qjStartTimestamp = Nothing
, qjEndTimestamp = Nothing
}
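
-- | Attach a received timestamp to a job.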
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
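
-- | Build a timestamp in the format expected by the reason trail
-- (nanoseconds) starting from a job-queue 'Timestamp'.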
reasonTrailTimestamp :: Timestamp -> Integer
reasonTrailTimestamp (sec, micro) =
let sec' = toInteger sec
micro' = toInteger micro
in sec' * 1000000000 + micro' * 1000
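
-- | Append an element to the reason trail of an input opcode.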
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
-> InputOpCode
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) =
let metaP = metaParams vOp
op = metaOpCode vOp
trail = opReason metaP
reasonSrc = opReasonSrcID op
reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
reason = (reasonSrc, reasonText, reasonTrailTimestamp ts)
trail' = trail ++ [reason]
in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } }
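
-- | Append an element to the reason trail of a queued opcode.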
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
-> QueuedOpCode
extendOpCodeReasonTrail jid ts i op =
let inOp = qoInput op
in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp }
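
-- | Append an element to the reason trail of all the opcodes of a
-- queued job.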
extendJobReasonTrail :: QueuedJob -> QueuedJob
extendJobReasonTrail job =
let jobId = qjId job
mTimestamp = qjReceivedTimestamp job
timestamp = fromMaybe (0, 0) mTimestamp
in job
{ qjOps =
zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $
qjOps job
}
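
-- | Change the priority of an opcode, unless it is already finalized.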
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op =
if qoStatus op > OP_STATUS_RUNNING
then op
else op { qoPriority = prio }
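
-- | Set the state of an opcode to canceled.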
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
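
-- | Change the priority of a job, i.e., change the priority of the
-- non-finished opcodes.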
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job =
job { qjOps = map (changeOpCodePriority prio) $ qjOps job }
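
-- | Transform a job into a canceled job by canceling all of its opcodes
-- and recording the end timestamp.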
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
let ops' = map (cancelOpCode now) $ qjOps job
in job { qjOps = ops', qjEndTimestamp = Just now}
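
-- | Job file prefix.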
jobFilePrefix :: String
jobFilePrefix = "job-"
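
-- | Computes the filename for a given job ID.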
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
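
-- | Parses a job ID from a file name.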
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
case stripPrefix jobFilePrefix path of
Nothing -> fail $ "Job file '" ++ path ++
"' doesn't have the correct prefix"
Just suffix -> makeJobIdS suffix
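
-- | Computes the full path to a live job.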
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid
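
-- | Computes the full path to an archived job, using the archive
-- subdirectory derived from its id.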
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
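
-- | Map from opcode status to job status.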
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR = JOB_STATUS_ERROR
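
-- | Compute a queued job's status from the statuses of its opcodes.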
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
where
terminalStatus OP_STATUS_ERROR = True
terminalStatus OP_STATUS_CANCELING = True
terminalStatus OP_STATUS_CANCELED = True
terminalStatus _ = False
softStatus OP_STATUS_SUCCESS = True
softStatus OP_STATUS_QUEUED = True
softStatus _ = False
extractOpSt [] _ True = JOB_STATUS_SUCCESS
extractOpSt [] d False = d
extractOpSt (x:xs) d old_all
| terminalStatus x = opStatusToJob x
| softStatus x = extractOpSt xs d new_all
| otherwise = extractOpSt xs (opStatusToJob x) new_all
where new_all = x == OP_STATUS_SUCCESS && old_all
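
-- | Determine whether a job has started execution.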
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
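
-- | Determine whether a job is finalized.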
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
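
-- | Determine whether a job is finalized and its timestamp is before
-- the given cut-off time.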
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts = liftA2 (&&) jobFinalized
$ maybe False (< ts)
. liftA2 (<|>) qjEndTimestamp qjStartTimestamp
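
-- | Determine whether an opcode status is finalized.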
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)
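
-- | Compute a job's priority, as the minimum of the priorities of its
-- non-finalized opcodes.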
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
where helper [] = C.opPrioDefault
helper ps = minimum ps
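
-- | Log but ignore an 'IOError'.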
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
unless (isDoesNotExistError e && ignore_noent) .
logWarning $ msg ++ ": " ++ show e
return a
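
-- | Compute the list of existing archive directories.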
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
let adir = rootdir </> jobQueueArchiveSubDir
contents <- getDirectoryContents adir `Control.Exception.catch`
ignoreIOError [] False
("Failed to list queue directory " ++ adir)
let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
filterM (\path ->
liftM isDirectory (getFileStatus (adir </> path))
`Control.Exception.catch`
ignoreIOError False True
("Failed to stat archive path " ++ path)) fpaths
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
other <- if archived
then allArchiveDirs rootdir
else return []
return $ rootdir:other
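
-- | Sequence a list of per-directory job id results, stopping at the
-- first error.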
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = reverse <$> foldl seqFolder (Right []) l
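
-- | Folding function for joining multiple [JobIds] into one list.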
seqFolder :: Either IOError [[JobId]]
-> Either IOError [JobId]
-> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l
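
-- | Computes the list of all jobs in the given directories.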
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
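
-- | Sorts a list of job ids.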
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)
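
-- | Computes the list of jobs in the given directory.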
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
either_contents <-
try (getDirectoryContents path) :: IO (Either IOError [FilePath])
case either_contents of
Left e -> do
logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
return $ Left e
Right contents -> do
let jids = foldl (\ids file ->
case parseJobFileId file of
Nothing -> ids
Just new_id -> new_id:ids) [] contents
return . Right $ reverse jids
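
-- | Reads the job data from disk.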
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
let live_path = liveJobFile rootdir jid
archived_path = archivedJobFile rootdir jid
all_paths = if archived
then [(live_path, False), (archived_path, True)]
else [(live_path, False)]
foldM (\state (path, isarchived) ->
liftM (\r -> Just (r, isarchived)) (readFile path)
`Control.Exception.catch`
ignoreIOError state True
("Failed to read job file " ++ path)) Nothing all_paths
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
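
-- | Loads a job from disk, returning the parsed job and whether it was
-- read from the archive.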
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
raw <- readJobDataFromDisk rootdir archived jid
return $! case raw of
Nothing -> noSuchJob
Just (str, arch) ->
liftM (\qj -> (qj, arch)) .
fromJResult "Parsing job file" $ Text.JSON.decode str
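
-- | Writes a job to disk.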
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
let filename = liveJobFile rootdir . qjId $ job
content = Text.JSON.encode . Text.JSON.showJSON $ job
tryAndLogIOError (atomicWriteFile filename content)
("Failed to write " ++ filename) Ok
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
let filename = liveJobFile rootdir . qjId $ job
content = Text.JSON.encode . Text.JSON.showJSON $ job
filename' <- makeVirtualPath filename
callresult <- executeRpcCall mastercandidates
$ RpcCallJobqueueUpdate filename' content
let result = map (second (() <$)) callresult
logRpcErrors result
return result
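
-- | Replicate many jobs to all master candidates.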
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
mapM_ (replicateJob rootdir mastercandidates)
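
-- | Read the job serial number from disk.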
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
filename <- jobQueueSerialFile
tryAndLogIOError (readFile filename) "Failed to read serial file"
(makeJobIdS . rStripSpace)
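
-- | Allocate new job ids. To avoid races while accessing the serial file,
-- callers synchronize over the given lock (an MVar).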
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
if n <= 0
then return . Bad $ "Can only allocate positive number of job ids"
else do
takeMVar lock
rjobid <- readSerialFromDisk
case rjobid of
Bad s -> do
putMVar lock ()
return . Bad $ s
Ok jid -> do
let current = fromJobId jid
serial_content = show (current + n) ++ "\n"
serial <- jobQueueSerialFile
write_result <- try $ atomicWriteFile serial serial_content
:: IO (Either IOError ())
case write_result of
Left e -> do
putMVar lock ()
let msg = "Failed to write serial file: " ++ show e
logError msg
return . Bad $ msg
Right () -> do
serial' <- makeVirtualPath serial
_ <- executeRpcCall mastercandidates
$ RpcCallJobqueueUpdate serial' serial_content
putMVar lock ()
return $ mapM makeJobId [(current+1)..(current+n)]
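
-- | Allocate one new job id.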
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
jids <- allocateJobIds mastercandidates lock 1
return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
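
-- | Decide if the job queue is open.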
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
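
-- | Start enqueued jobs by notifying the master daemon.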
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
socketpath <- defaultMasterSocket
client <- getLuxiClient socketpath
pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
let failures = map show $ justBad pickupResults
unless (null failures)
. logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
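
-- | Try to cancel a job that has already been handed over to execution,
-- by asking the master daemon.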
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid = do
socketpath <- defaultMasterSocket
client <- getLuxiClient socketpath
callMethod (CancelJob jid) client
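
-- | Permissions for the queue directories.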
queueDirPermissions :: FilePermissions
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
, fpGroup = Just C.daemonsGroup
, fpPermissions = 0o0750
}
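
-- | Try, within the given time limit, to archive some of the given jobs,
-- in order. Returns the number of jobs archived and the number of jobs
-- still left to inspect when the time limit was reached.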
archiveSomeJobsUntil :: ([JobId] -> IO ())
-> FilePath
-> ClockTime
-> Timestamp
-> Int
-> [JobId]
-> [JobId]
-> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
unless (null torepl) . (>> return ())
. forkIO $ replicateFn torepl
return (arch, 0)
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
continue = archiveMore arch torepl jids
jidname = show $ fromJobId jid
time <- getClockTime
if time >= endt
then do
_ <- forkIO $ replicateFn torepl
return (arch, length (jid:jids))
else do
logDebug $ "Inspecting job " ++ jidname ++ " for archival"
loadResult <- loadJobFromDisk qDir False jid
case loadResult of
Bad _ -> continue
Ok (job, _) ->
if jobArchivable cutt job
then do
let live = liveJobFile qDir jid
archive = archivedJobFile qDir jid
renameResult <- safeRenameFile queueDirPermissions
live archive
case renameResult of
Bad s -> do
logWarning $ "Renaming " ++ live ++ " to " ++ archive
++ " failed unexpectedly: " ++ s
continue
Ok () -> do
let torepl' = jid:torepl
if length torepl' >= 10
then do
_ <- forkIO $ replicateFn torepl'
archiveMore (arch + 1) [] jids
else archiveMore (arch + 1) torepl' jids
else continue
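
-- | Archive jobs older than the given age (in seconds), stopping after the
-- given timeout. Returns the number of jobs archived and the number of jobs
-- not yet inspected when the timeout was reached.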
archiveJobs :: ConfigData
-> Int
-> Int
-> [JobId]
-> IO (Int, Int)
archiveJobs cfg age timeout jids = do
now <- getClockTime
qDir <- queueDir
let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
cuttime = if age < 0 then noTimestamp
                           else advanceTimestamp (- age) (fromClockTime now)
mcs = Config.getMasterCandidates cfg
replicateFn jobs = do
let olds = map (liveJobFile qDir) jobs
news = map (archivedJobFile qDir) jobs
_ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
return ()
archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids