module Ganeti.JQueue
( queuedOpCodeFromMetaOpCode
, queuedJobFromOpCodes
, changeOpCodePriority
, changeJobPriority
, cancelQueuedJob
, failQueuedJob
, fromClockTime
, noTimestamp
, currentTimestamp
, advanceTimestamp
, reasonTrailTimestamp
, setReceivedTimestamp
, extendJobReasonTrail
, getJobDependencies
, opStatusFinalized
, extractOpSummary
, calcJobStatus
, jobStarted
, jobFinalized
, jobArchivable
, calcJobPriority
, jobFileName
, liveJobFile
, archivedJobFile
, determineJobDirectories
, getJobIDs
, sortJobIDs
, loadJobFromDisk
, noSuchJob
, readSerialFromDisk
, allocateJobIds
, allocateJobId
, writeJobToDisk
, replicateManyJobs
, writeAndReplicateJob
, isQueueOpen
, startJobs
, cancelJob
, tellJobPriority
, notifyJob
, queueDirPermissions
, archiveJobs
, Timestamp
, InputOpCode(..)
, QueuedOpCode(..)
, QueuedJob(..)
) where
import Control.Applicative (liftA2, (<|>), (<$>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception
import Control.Lens (over)
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Posix.Signals (sigHUP, sigTERM, sigUSR1, sigKILL, signalProcess)
import System.Posix.Types (ProcessID)
import System.Time
import qualified Text.JSON
import Text.JSON.Types
import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult, ResultG)
import Ganeti.JQueue.Lens (qoInputL, validOpCodeL)
import Ganeti.JQueue.Objects
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.OpCodes.Lens (metaParamsL, opReasonL)
import Ganeti.Path
import Ganeti.Query.Exec as Exec
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.Runtime (GanetiDaemon(..), GanetiGroup(..), MiscGroup(..))
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Atomic
import Ganeti.Utils.Livelock (Livelock, isDead)
import Ganeti.Utils.MVarLock
import Ganeti.VCluster (makeVirtualPath)
noTimestamp :: Timestamp
noTimestamp = (1, 1)
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD ctime pico) =
(fromIntegral ctime, fromIntegral $ pico `div` 1000000)
currentTimestamp :: IO Timestamp
currentTimestamp = fromClockTime `liftM` getClockTime
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp = first . (+)
toMetaOpCode :: InputOpCode -> [MetaOpCode]
toMetaOpCode (ValidOpCode mopc) = [mopc]
toMetaOpCode _ = []
invalidOp :: String
invalidOp = "INVALID_OP"
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
Just s -> drop 3 s
Nothing -> invalidOp
extractOpSummary _ = invalidOp
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
QueuedOpCode { qoInput = ValidOpCode op
, qoStatus = OP_STATUS_QUEUED
, qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
$ op
, qoLog = []
, qoResult = JSNull
, qoStartTimestamp = Nothing
, qoEndTimestamp = Nothing
, qoExecTimestamp = Nothing
}
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
ops' <- mapM (`resolveDependencies` jobid) ops
return QueuedJob { qjId = jobid
, qjOps = map queuedOpCodeFromMetaOpCode ops'
, qjReceivedTimestamp = Nothing
, qjStartTimestamp = Nothing
, qjEndTimestamp = Nothing
, qjLivelock = Nothing
, qjProcessId = Nothing
}
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
reasonTrailTimestamp :: Timestamp -> Integer
reasonTrailTimestamp (sec, micro) =
let sec' = toInteger sec
micro' = toInteger micro
in sec' * 1000000000 + micro' * 1000
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
-> InputOpCode
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) =
let metaP = metaParams vOp
op = metaOpCode vOp
trail = opReason metaP
reasonSrc = opReasonSrcID op
reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
reason = (reasonSrc, reasonText, reasonTrailTimestamp ts)
trail' = trail ++ [reason]
in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } }
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
-> QueuedOpCode
extendOpCodeReasonTrail jid ts i op =
let inOp = qoInput op
in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp }
extendJobReasonTrail :: QueuedJob -> QueuedJob
extendJobReasonTrail job =
let jobId = qjId job
mTimestamp = qjReceivedTimestamp job
timestamp = fromMaybe (0, 0) mTimestamp
in job
{ qjOps =
zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $
qjOps job
}
getJobDependencies :: QueuedJob -> [JobId]
getJobDependencies job = do
op <- qjOps job
mopc <- toMetaOpCode $ qoInput op
dep <- fromMaybe [] . opDepends $ metaParams mopc
getJobIdFromDependency dep
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op =
if qoStatus op > OP_STATUS_RUNNING
then op
else op { qoPriority = prio }
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job =
job { qjOps = map (changeOpCodePriority prio) $ qjOps job }
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
let ops' = map (cancelOpCode now) $ qjOps job
in job { qjOps = ops', qjEndTimestamp = Just now }
failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode
failOpCode reason@(_, msg, _) now op =
over (qoInputL . validOpCodeL . metaParamsL . opReasonL) (++ [reason])
op { qoStatus = OP_STATUS_ERROR
, qoResult = Text.JSON.JSString . Text.JSON.toJSString $ msg
, qoEndTimestamp = Just now }
failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
failQueuedJob reason now job =
let ops' = map (failOpCode reason now) $ qjOps job
in job { qjOps = ops', qjEndTimestamp = Just now }
jobFilePrefix :: String
jobFilePrefix = "job-"
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
case stripPrefix jobFilePrefix path of
Nothing -> fail $ "Job file '" ++ path ++
"' doesn't have the correct prefix"
Just suffix -> makeJobIdS suffix
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR = JOB_STATUS_ERROR
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
where
terminalStatus OP_STATUS_ERROR = True
terminalStatus OP_STATUS_CANCELING = True
terminalStatus OP_STATUS_CANCELED = True
terminalStatus _ = False
softStatus OP_STATUS_SUCCESS = True
softStatus OP_STATUS_QUEUED = True
softStatus _ = False
extractOpSt [] _ True = JOB_STATUS_SUCCESS
extractOpSt [] d False = d
extractOpSt (x:xs) d old_all
| terminalStatus x = opStatusToJob x
| softStatus x = extractOpSt xs d new_all
| otherwise = extractOpSt xs (opStatusToJob x) new_all
where new_all = x == OP_STATUS_SUCCESS && old_all
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts = liftA2 (&&) jobFinalized
$ maybe False (< ts)
. liftA2 (<|>) qjEndTimestamp qjStartTimestamp
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
where helper [] = C.opPrioDefault
helper ps = minimum ps
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
unless (isDoesNotExistError e && ignore_noent) .
logWarning $ msg ++ ": " ++ show e
return a
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
let adir = rootdir </> jobQueueArchiveSubDir
contents <- getDirectoryContents adir `Control.Exception.catch`
ignoreIOError [] False
("Failed to list queue directory " ++ adir)
let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
filterM (\path ->
liftM isDirectory (getFileStatus (adir </> path))
`Control.Exception.catch`
ignoreIOError False True
("Failed to stat archive path " ++ path)) fpaths
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
other <- if archived
then allArchiveDirs rootdir
else return []
return $ rootdir:other
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
getDirJobIDs path =
withErrorLogAt WARNING ("Failed to list job directory " ++ path) .
liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path)
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
let live_path = liveJobFile rootdir jid
archived_path = archivedJobFile rootdir jid
all_paths = if archived
then [(live_path, False), (archived_path, True)]
else [(live_path, False)]
foldM (\state (path, isarchived) ->
liftM (\r -> Just (r, isarchived)) (readFile path)
`Control.Exception.catch`
ignoreIOError state True
("Failed to read job file " ++ path)) Nothing all_paths
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
raw <- readJobDataFromDisk rootdir archived jid
return $! case raw of
Nothing -> noSuchJob
Just (str, arch) ->
liftM (\qj -> (qj, arch)) .
fromJResult "Parsing job file" $ Text.JSON.decode str
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
let filename = liveJobFile rootdir . qjId $ job
content = Text.JSON.encode . Text.JSON.showJSON $ job
tryAndLogIOError (atomicWriteFile filename content)
("Failed to write " ++ filename) Ok
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
let filename = liveJobFile rootdir . qjId $ job
content = Text.JSON.encode . Text.JSON.showJSON $ job
filename' <- makeVirtualPath filename
callresult <- executeRpcCall mastercandidates
$ RpcCallJobqueueUpdate filename' content
let result = map (second (() <$)) callresult
_ <- logRpcErrors result
return result
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
mapM_ (replicateJob rootdir mastercandidates)
writeAndReplicateJob :: (Error e)
=> ConfigData -> FilePath -> QueuedJob
-> ResultT e IO [(Node, ERpcError ())]
writeAndReplicateJob cfg rootdir job = do
mkResultT $ writeJobToDisk rootdir job
liftIO $ replicateJob rootdir (Config.getMasterCandidates cfg) job
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
filename <- jobQueueSerialFile
tryAndLogIOError (readFile filename) "Failed to read serial file"
(makeJobIdS . rStripSpace)
allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
if n <= 0
then if n == 0
then return $ Ok []
else return . Bad
$ "Can only allocate non-negative number of job ids"
else withLock lock $ do
rjobid <- readSerialFromDisk
case rjobid of
Bad s -> return . Bad $ s
Ok jid -> do
let current = fromJobId jid
serial_content = show (current + n) ++ "\n"
serial <- jobQueueSerialFile
write_result <- try $ atomicWriteFile serial serial_content
:: IO (Either IOError ())
case write_result of
Left e -> do
let msg = "Failed to write serial file: " ++ show e
logError msg
return . Bad $ msg
Right () -> do
serial' <- makeVirtualPath serial
_ <- executeRpcCall mastercandidates
$ RpcCallJobqueueUpdate serial' serial_content
return $ mapM makeJobId [(current+1)..(current+n)]
allocateJobId :: [Node] -> Lock -> IO (Result JobId)
allocateJobId mastercandidates lock = do
jids <- allocateJobIds mastercandidates lock 1
return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
startJobs :: Livelock
-> Lock
-> [QueuedJob]
-> IO [ErrorResult QueuedJob]
startJobs luxiLivelock forkLock jobs = do
qdir <- queueDir
let updateJob job llfile =
void . mkResultT . writeJobToDisk qdir
$ job { qjLivelock = Just llfile }
let runJob job = withLock forkLock $ do
(llfile, _) <- Exec.forkJobProcess job luxiLivelock
(updateJob job)
return $ job { qjLivelock = Just llfile }
mapM (runResultT . runJob) jobs
isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool
isQueuedJobDead ownlivelock =
maybe (return False) (liftIO . isDead)
. mfilter (/= ownlivelock)
. qjLivelock
waitForJob :: JobId -> Int -> ResultG (Bool, String)
waitForJob jid tmout = do
qDir <- liftIO queueDir
let jobfile = liveJobFile qDir jid
load = liftM fst <$> loadJobFromDisk qDir False jid
finalizedR = genericResult (const False) jobFinalized
jobR <- liftIO $ watchFileBy jobfile tmout finalizedR load
case calcJobStatus <$> jobR of
Ok s | s == JOB_STATUS_CANCELED ->
return (True, "Job successfully cancelled")
| finalizedR jobR ->
return (False, "Job exited before it could have been canceled,\
\ status " ++ show s)
| otherwise ->
return (False, "Job could not be canceled, status "
++ show s)
Bad e -> failError $ "Can't read job status: " ++ e
cancelJob :: Bool
-> Livelock
-> JobId
-> IO (ErrorResult (Bool, String))
cancelJob kill luxiLivelock jid = runResultT $ do
result <- runMaybeT . msum . flip map [0..5 :: Int] $ \tryNo -> do
when (tryNo > 0) . liftIO . threadDelay $ 100000 * (2 ^ tryNo)
qDir <- liftIO queueDir
(job, _) <- lift . mkResultT $ loadJobFromDisk qDir True jid
let jName = ("Job " ++) . show . fromJobId . qjId $ job
dead <- isQueuedJobDead luxiLivelock job
case qjProcessId job of
_ | dead ->
return (True, jName ++ " has been already dead")
Just pid -> do
liftIO $ signalProcess (if kill then sigKILL else sigTERM) pid
if not kill then
if calcJobStatus job > JOB_STATUS_WAITING
then return (False, "Job no longer waiting, can't cancel\
\ (informed it anyway)")
else lift $ waitForJob jid C.luxiCancelJobTimeout
else return (True, "SIGKILL send to the process")
_ -> do
logDebug $ jName ++ " in its startup phase, retrying"
mzero
return $ fromMaybe (False, "Timeout: job still in its startup phase") result
tellJobPriority :: Livelock
-> JobId
-> Int
-> IO (ErrorResult (Bool, String))
tellJobPriority luxiLivelock jid prio = runResultT $ do
let jidS = show $ fromJobId jid
jName = "Job " ++ jidS
mDir <- liftIO luxidMessageDir
let prioFile = mDir </> jidS ++ ".prio"
liftIO . atomicWriteFile prioFile $ show prio
qDir <- liftIO queueDir
(job, _) <- mkResultT $ loadJobFromDisk qDir True jid
dead <- isQueuedJobDead luxiLivelock job
case qjProcessId job of
_ | dead -> do
liftIO $ removeFile prioFile
return (False, jName ++ " is dead")
Just pid -> do
liftIO $ signalProcess sigUSR1 pid
return (True, jName ++ " with pid " ++ show pid ++ " signaled")
_ -> return (False, jName ++ "'s pid unknown")
notifyJob :: ProcessID -> IO (ErrorResult ())
notifyJob pid = runResultT $ do
logDebug $ "Signalling process " ++ show pid
liftIO $ signalProcess sigHUP pid
queueDirPermissions :: FilePermissions
queueDirPermissions = FilePermissions { fpOwner = Just GanetiMasterd
, fpGroup = Just $ ExtraGroup DaemonsGroup
, fpPermissions = 0o0750
}
archiveSomeJobsUntil :: ([JobId] -> IO ())
-> FilePath
-> ClockTime
-> Timestamp
-> Int
-> [JobId]
-> [JobId]
-> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
unless (null torepl) . (>> return ())
. forkIO $ replicateFn torepl
return (arch, 0)
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
continue = archiveMore arch torepl jids
jidname = show $ fromJobId jid
time <- getClockTime
if time >= endt
then do
_ <- forkIO $ replicateFn torepl
return (arch, length (jid:jids))
else do
logDebug $ "Inspecting job " ++ jidname ++ " for archival"
loadResult <- loadJobFromDisk qDir False jid
case loadResult of
Bad _ -> continue
Ok (job, _) ->
if jobArchivable cutt job
then do
let live = liveJobFile qDir jid
archive = archivedJobFile qDir jid
renameResult <- safeRenameFile queueDirPermissions
live archive
case renameResult of
Bad s -> do
logWarning $ "Renaming " ++ live ++ " to " ++ archive
++ " failed unexpectedly: " ++ s
continue
Ok () -> do
let torepl' = jid:torepl
if length torepl' >= 10
then do
_ <- forkIO $ replicateFn torepl'
archiveMore (arch + 1) [] jids
else archiveMore (arch + 1) torepl' jids
else continue
archiveJobs :: ConfigData
-> Int
-> Int
-> [JobId]
-> IO (Int, Int)
archiveJobs cfg age timeout jids = do
now <- getClockTime
qDir <- queueDir
let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
cuttime = if age < 0 then noTimestamp
else advanceTimestamp ( age) (fromClockTime now)
mcs = Config.getMasterCandidates cfg
replicateFn jobs = do
let olds = map (liveJobFile qDir) jobs
news = map (archivedJobFile qDir) jobs
_ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
return ()
archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids