{-| Helpers for submitting jobs through a Luxi client and waiting for
their completion.

The exported entry points either submit jobs ('submitJobs'), submit and
wait ('execJobsWait', 'execJobsWaitOk', 'waitForJobs'), or run a list of
job sets one after another while honouring cancel requests delivered via
signals ('execWithCancel').

-}
module Ganeti.Jobs
( submitJobs
, Annotator
, execWithCancel
, execJobsWait
, execJobsWaitOk
, waitForJobs
) where
import Control.Concurrent (threadDelay)
import Control.Exception (bracket)
import Data.List
import Data.Tuple
import Data.IORef
import System.Exit
import System.Posix.Process
import System.Posix.Signals
import Ganeti.BasicTypes
import Ganeti.Errors
import qualified Ganeti.Luxi as L
import Ganeti.OpCodes
import Ganeti.Types
import Ganeti.Utils
-- | A function that turns a plain 'OpCode' into a 'MetaOpCode'. It is
-- applied to every opcode of a job set before the set is submitted.
type Annotator = OpCode -> MetaOpCode
-- | Check the cancel flag before running the next job set.
--
-- Arguments, in order: the opcode annotator and the master name (both
-- only passed through to 'execJobSet'), the cancel flag, the job sets
-- still to be submitted (each paired with a description), and the
-- per-job results of the job sets submitted so far.
--
-- With no job sets left the result is @Ok ()@. If the cancel flag holds
-- a positive value, a report of the submitted and the not-yet-submitted
-- job sets is printed and @Ok ()@ is returned without submitting
-- anything further. Otherwise control is handed to 'execJobSet', which
-- calls back into this function after each successful job set.
execCancelWrapper :: Annotator -> String -> IORef Int
                  -> [([[OpCode]], String)]
                  -> [(String, [(Int, JobStatus)])] -> IO (Result ())
execCancelWrapper _ _ _ [] _ = return $ Ok ()
execCancelWrapper anno master cref jobs submitted = do
  cancel <- readIORef cref
  if cancel > 0
    then do
      putStrLn "Exiting early due to user request, "
      -- The separating space after the count was missing here, which
      -- produced output such as "3jobset(s) submitted: ".
      putStrLn $ show (length submitted) ++ " jobset(s) submitted: "
      print submitted
      putStrLn $ show (length jobs) ++ " jobset(s) not submitted:"
      -- Show the description first, then the opcodes.
      print $ map swap jobs
      return $ Ok ()
    else execJobSet anno master cref jobs submitted
-- | Submit the first job set of the list, wait for its jobs, and
-- continue with the remaining sets if every job succeeded.
--
-- A Luxi client for @master@ is opened for the duration of the job set
-- and closed again afterwards, also when an exception is raised. On
-- success the description of the set together with its job results is
-- appended to @submitted@ and 'execCancelWrapper' takes over for the
-- rest of the list. Any job whose status is not 'JOB_STATUS_SUCCESS'
-- aborts the whole run with a 'Bad' result listing the failures.
execJobSet :: Annotator -> String -> IORef Int
           -> [([[OpCode]], String)]
           -> [(String, [(Int, JobStatus)])] -> IO (Result ())
execJobSet _ _ _ [] _ = return $ Ok ()
execJobSet anno master cref ((opcodes, descr):jobs) submitted = do
  outcome <- bracket (L.getLuxiClient master) L.closeClient runSet
  case outcome of
    Bad msg -> return $ Bad msg
    Ok statuses ->
      case filter ((/= JOB_STATUS_SUCCESS) . snd) statuses of
        [] ->
          execCancelWrapper anno master cref jobs
            (submitted ++ [(descr, summarize statuses)])
        failures ->
          return . Bad $ unlines
            [ "Not all jobs completed successfully: " ++ show failures
            , "Aborting."
            ]
  where
    -- Annotate every opcode of every job, then submit and wait.
    runSet = execJobsWait [map anno job | job <- opcodes] announce
    -- Report the job IDs as soon as the submission went through.
    announce jids =
      putStrLn $ "Got job IDs " ++ commaJoin (map (show . fromJobId) jids)
    -- Replace each job ID by its plain integer value.
    summarize statuses = [(fromJobId jid, st) | (jid, st) <- statuses]
-- | Handler for a cancel request: stores @1@ in the cancel flag and
-- tells the user that the run stops after the current job set.
handleSigInt :: IORef Int -> IO ()
handleSigInt cref = writeIORef cref 1 >> putStrLn notice
  where
    notice = "Cancel request registered, will exit at the end of the " ++
             "current job set..."
-- | Handler for an immediate cancel request: stores @2@ in the cancel
-- flag, prints a notice and terminates the process right away with
-- exit code 2.
handleSigTerm :: IORef Int -> IO ()
handleSigTerm cref =
  sequence_
    [ writeIORef cref 2
    , putStrLn "Double cancel request, exiting now..."
    , exitImmediately (ExitFailure 2)
    ]
-- | Run a list of job sets with support for user-requested
-- cancellation.
--
-- A fresh cancel flag (initially @0@) is created and signal handlers
-- are installed: 'handleSigTerm' for 'softwareTermination' and
-- 'handleSigInt' for 'keyboardSignal'. The job sets are then processed
-- via 'execCancelWrapper', starting with an empty list of submitted
-- results.
execWithCancel :: Annotator -> String -> [([[OpCode]], String)]
               -> IO (Result ())
execWithCancel anno master cmd_jobs = do
  cref <- newIORef 0
  let install sig hnd = installHandler sig (Catch (hnd cref)) Nothing
  -- The previously installed handlers are not needed, so drop them.
  _ <- install softwareTermination handleSigTerm
  _ <- install keyboardSignal handleSigInt
  execCancelWrapper anno master cref cmd_jobs []
-- | Submit a list of jobs through the given Luxi client.
--
-- Returns the IDs of the submitted jobs, or a 'Bad' value whose message
-- is the formatted submission error prefixed with
-- @"Job submission error: "@.
submitJobs :: [[MetaOpCode]] -> L.Client -> IO (Result [L.JobId])
submitJobs opcodes client = translate `fmap` L.submitManyJobs client opcodes
  where
    -- Convert the Luxi error into a plain message, keep successes as is.
    translate (Bad err) = Bad ("Job submission error: " ++ formatError err)
    translate (Ok ids)  = Ok ids
-- | Submit a list of jobs and wait until all of them have finished.
--
-- The callback is invoked with the job IDs once the submission has
-- succeeded and before waiting starts; it is not invoked when the
-- submission fails. The result pairs every job ID with its final
-- status, or carries the error message of the failed step.
execJobsWait :: [[MetaOpCode]]
             -> ([L.JobId] -> IO ())
             -> L.Client
             -> IO (Result [(L.JobId, JobStatus)])
execJobsWait opcodes callback client =
  submitJobs opcodes client >>= afterSubmit
  where
    afterSubmit (Bad err) = return (Bad err)
    afterSubmit (Ok ids)  = callback ids >> waitForJobs ids client
-- | Poll the status of the given jobs until none of them is pending.
--
-- Each round first sleeps, then queries the status of all jobs. While
-- any status compares less than or equal to 'JOB_STATUS_RUNNING', the
-- sleep time is doubled (capped at the maximum) and polling continues.
-- Otherwise the job IDs are returned zipped with their statuses. A
-- failed status query ends the loop with a 'Bad' result.
waitForJobs :: [L.JobId] -> L.Client -> IO (Result [(L.JobId, JobStatus)])
waitForJobs jids client = poll initialDelay
  where
    -- Both delays are handed to 'threadDelay', i.e. in microseconds.
    initialDelay, maxDelay :: Int
    initialDelay = 500000
    maxDelay = 15000000

    poll pause = do
      threadDelay pause
      answer <- L.queryJobsStatus client jids
      case answer of
        Bad err ->
          return . Bad $ "Checking job status: " ++ formatError err
        Ok statuses
          | any (<= JOB_STATUS_RUNNING) statuses ->
              poll (min (pause * 2) maxDelay)
          | otherwise ->
              return . Ok $ zip jids statuses
-- | Submit a list of jobs, wait for them and check that all succeeded.
--
-- No progress callback is used. The result is @Ok ()@ when every job
-- ended with 'JOB_STATUS_SUCCESS'; otherwise it is a 'Bad' value
-- listing each failed job as @id=>status@. Submission or polling
-- errors are passed on unchanged.
execJobsWaitOk :: [[MetaOpCode]] -> L.Client -> IO (Result ())
execJobsWaitOk opcodes client = do
  outcome <- execJobsWait opcodes (const (return ())) client
  return $ case outcome of
    Bad err -> Bad err
    Ok statuses ->
      case [describe job | job <- statuses, hasFailed job] of
        [] -> Ok ()
        failedJobs ->
          Bad ("The following jobs failed: " ++ intercalate ", " failedJobs)
  where
    hasFailed (_, st) = JOB_STATUS_SUCCESS /= st
    describe (jid, st) = show (fromJobId jid) ++ "=>" ++ jobStatusToRaw st