module Ganeti.JQueue
( QueuedOpCode(..)
, QueuedJob(..)
, InputOpCode(..)
, Timestamp
, noTimestamp
, opStatusFinalized
, extractOpSummary
, calcJobStatus
, calcJobPriority
, jobFileName
, liveJobFile
, archivedJobFile
, determineJobDirectories
, getJobIDs
, sortJobIDs
, loadJobFromDisk
, noSuchJob
) where
import Control.Exception
import Control.Monad
import Data.List
import Data.Ord (comparing)
import Prelude hiding (log, id)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import qualified Text.JSON
import Text.JSON.Types
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.THH
import Ganeti.Types
-- | Timestamp represented as a pair of integers.
-- NOTE(review): presumably (seconds, microseconds) -- confirm against the
-- serialised job format.
type Timestamp = (Int, Int)

-- | Sentinel standing for a missing timestamp. Both components are negative
-- so the value cannot be confused with a real (non-negative) point in time.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)
-- | An opcode as found in a serialised job: either one that could be parsed
-- into a 'MetaOpCode', or the raw JSON value that failed to parse.
data InputOpCode = ValidOpCode MetaOpCode -- ^ Successfully parsed opcode
                 | InvalidOpCode JSValue  -- ^ Unparseable opcode, kept as-is
                   deriving (Show, Eq)
-- | JSON (de)serialisation of 'InputOpCode'. Reading never fails: a value
-- that cannot be parsed as an opcode is preserved verbatim as
-- 'InvalidOpCode', and written back unchanged on serialisation.
instance Text.JSON.JSON InputOpCode where
  showJSON input = case input of
    ValidOpCode mo    -> Text.JSON.showJSON mo
    InvalidOpCode raw -> raw
  readJSON v = case Text.JSON.readJSON v of
    Text.JSON.Ok mo   -> Text.JSON.Ok (ValidOpCode mo)
    Text.JSON.Error _ -> Text.JSON.Ok (InvalidOpCode v)
-- | Summary string used for opcodes whose identifier cannot be determined.
invalidOp :: String
invalidOp = "INVALID_OP"
-- | Computes the summary string of an input opcode.
--
-- Valid opcodes delegate to 'opSummary'. For an invalid opcode that is a
-- JSON object, the @OP_ID@ field is looked up (defaulting to
-- @"OP_" ++ invalidOp@) and its first three characters are dropped; any
-- other case yields 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary input = case input of
  ValidOpCode metaop -> opSummary (metaOpCode metaop)
  InvalidOpCode (JSObject o) ->
    -- 'drop 3' strips the leading "OP_" of the identifier
    maybe invalidOp (drop 3) $
      fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp)
  _ -> invalidOp
-- Declaration of the QueuedOpCode object through the 'buildObject' Template
-- Haskell helper. The "qo" prefix is the one seen on the accessors used in
-- this module (qoStatus, qoPriority).
-- NOTE(review): the exact semantics of defaultField / optionalNullSerField
-- live in Ganeti.THH; the inline notes below are assumptions -- confirm.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus |]
  , simpleField "result"          [t| JSValue |]
    -- presumably falls back to an empty log when the field is absent
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int |]
    -- the three timestamps below are optional fields
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp |]
  ])
-- Declaration of the QueuedJob object through the 'buildObject' Template
-- Haskell helper, with accessor prefix "qj" (see qjOps as used in this
-- module). A job carries its id, its list of queued opcodes and three
-- optional timestamps.
-- NOTE(review): serialisation details of optionalNullSerField are defined
-- in Ganeti.THH -- confirm there.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp |]
  ])
-- | Prefix shared by all job file names; prepended when building a file
-- name and stripped again when parsing one.
jobFilePrefix :: String
jobFilePrefix = "job-"
-- | Computes the file name (without directory) for a given job ID: the job
-- file prefix followed by the shown numeric ID.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
-- | Parses a job ID out of a job file name. Fails in the calling monad when
-- the name does not start with the job file prefix; otherwise the remainder
-- is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe (fail badPrefix) makeJobIdS $ stripPrefix jobFilePrefix path
  where
    badPrefix = "Job file '" ++ path ++ "' doesn't have the correct prefix"
-- | Computes the full path of a live (non-archived) job file: the job's
-- file name directly under the given root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
-- | Computes the full path of an archived job file. Archived jobs are
-- bucketed into numbered subdirectories of the archive directory, the
-- bucket being the job ID divided by 'C.jstoreJobsPerArchiveDirectory'.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where
    bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
-- | Maps an opcode status to the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st = case st of
  OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
  OP_STATUS_WAITING   -> JOB_STATUS_WAITING
  OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
  OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
  OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
  OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
  OP_STATUS_ERROR     -> JOB_STATUS_ERROR
-- | Computes a job's status from the statuses of its opcodes.
--
-- The opcode statuses are scanned in order:
--
-- * the first error\/canceling\/canceled opcode decides the job status
--   immediately;
--
-- * success and queued opcodes leave the current candidate status alone;
--
-- * any other status (waiting, running) becomes the new candidate.
--
-- If the scan reaches the end and every opcode was successful (including
-- the case of no opcodes at all) the job is successful; otherwise the last
-- candidate is returned, which starts out as queued.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  scan JOB_STATUS_QUEUED True (map qoStatus ops)
  where
    isTerminal st =
      st `elem` [OP_STATUS_ERROR, OP_STATUS_CANCELING, OP_STATUS_CANCELED]
    isSoft st = st `elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED]

    -- scan <candidate status> <all opcodes so far succeeded> <remaining>
    scan candidate allOk [] =
      if allOk then JOB_STATUS_SUCCESS else candidate
    scan candidate allOk (st:rest)
      | isTerminal st = opStatusToJob st                 -- stop scanning
      | isSoft st     = scan candidate stillOk rest      -- keep candidate
      | otherwise     = scan (opStatusToJob st) stillOk rest
      where stillOk = allOk && st == OP_STATUS_SUCCESS
-- | Determines whether an opcode status is final, i.e. compares strictly
-- greater than 'OP_STATUS_RUNNING' in the ordering of 'OpStatus'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
-- | Computes a job's priority: the minimum priority value among its
-- opcodes that are not yet finalized, or 'C.opPrioDefault' when there are
-- no such opcodes.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  case [qoPriority op | op <- ops, not (opStatusFinalized (qoStatus op))] of
    []      -> C.opPrioDefault
    pending -> minimum pending  -- safe: the list is non-empty here
-- | Exception handler that swallows an 'IOError' and yields a fallback
-- value instead. The error is logged as a warning, prefixed with the given
-- message, unless it is a does-not-exist error and @ignore_noent@ is set.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = ignore_noent && isDoesNotExistError e
  unless silenced $ logWarning (msg ++ ": " ++ show e)
  return a
-- | Lists the archive subdirectories below the given queue root.
--
-- Entries of the archive directory whose names start with a dot are
-- skipped; of the rest, only those that are directories are returned, as
-- paths including the archive directory. A failure to list the archive
-- directory is logged and results in an empty list; a failure to stat an
-- entry drops that entry (logged unless the entry simply does not exist).
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
                ignoreIOError [] False
                  ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM isArchiveDir fpaths
  where
    -- The candidates already include the archive directory, so they are
    -- stat-ed as they are. Joining the archive directory on again only
    -- works for absolute roots and yields a doubled path for relative ones.
    isArchiveDir path =
      liftM isDirectory (getFileStatus path)
        `Control.Exception.catch`
        ignoreIOError False True ("Failed to stat archive path " ++ path)
-- | Builds the list of directories to scan for job files: always the queue
-- root itself, followed by all archive directories when @archived@ is set.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived =
  liftM (rootdir :) archiveDirs
  where
    archiveDirs
      | archived  = allArchiveDirs rootdir
      | otherwise = return []
-- | Collapses a list of per-directory results into a single result: the
-- first error encountered wins, otherwise all ID lists are returned in
-- their original order.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer =
  -- 'seqFolder' accumulates in reverse, hence the final 'reverse'
  either Left (Right . reverse) . foldl' seqFolder (Right [])
-- | Folding step for 'sequencer'. Once the accumulator holds an error it
-- stays that error; otherwise an erroneous element becomes the result, and
-- a successful element is prepended to the accumulated list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc item = case acc of
  Left err   -> Left err
  Right seen -> fmap (: seen) item
-- | Collects the job IDs found in all the given directories, in directory
-- order. If any directory fails to list, the first such error is returned
-- instead.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  per_dir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer per_dir
-- | Sorts job IDs in ascending order of their numeric value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy byNumber
  where byNumber a b = compare (fromJobId a) (fromJobId b)
-- | Lists the job IDs present in a single directory. Directory entries
-- that do not parse as job file names are silently skipped; the IDs are
-- returned in directory-listing order. A failure to list the directory is
-- logged as a warning and returned as 'Left'.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  listing <- try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case listing of
    Right contents ->
      -- the 'Just' pattern filters out names that fail to parse
      return $ Right [jid | Just jid <- map parseJobFileId contents]
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
-- | Reads the raw contents of a job file.
--
-- The live job file is tried first and, when @archived@ is set, the
-- archived one after it. Every candidate is attempted in turn; a
-- successful read replaces the result so far, a failed one keeps it
-- (logging a warning unless the file merely does not exist). The 'Bool' in
-- the result tells whether the data came from the archive.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = foldM attempt Nothing candidates
  where
    live = (liveJobFile rootdir jid, False)
    arch = (archivedJobFile rootdir jid, True)
    candidates = if archived then [live, arch] else [live]

    attempt previous (path, isarchived) =
      liftM (\contents -> Just (contents, isarchived)) (readFile path)
        `Control.Exception.catch`
        ignoreIOError previous True ("Failed to read job file " ++ path)
-- | Failure value returned by 'loadJobFromDisk' when no job file could be
-- read.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
-- | Loads and parses a job from disk.
--
-- Returns 'noSuchJob' when no job file could be read; otherwise the file
-- contents are decoded as JSON, and a successfully parsed job is paired
-- with a flag telling whether it came from the archive.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  return $! case raw of
    Just (contents, from_archive) -> do
      job <- fromJResult "Parsing job file" (Text.JSON.decode contents)
      return (job, from_archive)
    Nothing -> noSuchJob