{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-}

module Ganeti.JQueue
    ( queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , changeOpCodePriority
    , changeJobPriority
    , cancelQueuedJob
    , failQueuedJob
    , fromClockTime
    , noTimestamp
    , currentTimestamp
    , advanceTimestamp
    , reasonTrailTimestamp
    , setReceivedTimestamp
    , extendJobReasonTrail
    , getJobDependencies
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , jobArchivable
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , writeAndReplicateJob
    , isQueueOpen
    , startJobs
    , cancelJob
    , tellJobPriority
    , notifyJob
    , queueDirPermissions
    , archiveJobs
    -- re-export
    , Timestamp
    , InputOpCode(..)
    , QueuedOpCode(..)
    , QueuedJob(..)
    ) where

import Control.Applicative (liftA2, (<|>), (<$>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception
import Control.Lens (over)
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Posix.Signals (sigHUP, sigTERM, sigUSR1, sigKILL, signalProcess)
import System.Posix.Types (ProcessID)
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult, ResultG)
import Ganeti.JQueue.Lens (qoInputL, validOpCodeL)
import Ganeti.JQueue.Objects
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.OpCodes.Lens (metaParamsL, opReasonL)
import Ganeti.Path
import Ganeti.Query.Exec as Exec
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.Runtime (GanetiDaemon(..), GanetiGroup(..), MiscGroup(..))
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Atomic
import Ganeti.Utils.Livelock (Livelock, isDead)
import Ganeti.Utils.MVarLock
import Ganeti.VCluster (makeVirtualPath)

-- * Data types

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Obtain a Timestamp from a given clock time
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD ctime pico) =
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = fromClockTime `liftM` getClockTime

-- | From a given timestamp, obtain the timestamp of the
-- time that is the given number of seconds later.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp = first . (+)


-- | From an InputOpCode obtain the MetaOpCode, if any.
toMetaOpCode :: InputOpCode -> [MetaOpCode]
toMetaOpCode (ValidOpCode mopc) = [mopc]
toMetaOpCode _ = []

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   , qjLivelock = Nothing
                   , qjProcessId = Nothing
                   }

-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }

-- | Build a timestamp in the format expected by the reason trail (nanoseconds)
-- starting from a JQueue Timestamp.
reasonTrailTimestamp :: Timestamp -> Integer
reasonTrailTimestamp (sec, micro) =
  let sec' = toInteger sec
      micro' = toInteger micro
  in sec' * 1000000000 + micro' * 1000

-- | Append an element to the reason trail of an input opcode.
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
                             -> InputOpCode
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) =
  let metaP = metaParams vOp
      op = metaOpCode vOp
      trail = opReason metaP
      reasonSrc = opReasonSrcID op
      reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
      reason = (reasonSrc, reasonText, reasonTrailTimestamp ts)
      trail' = trail ++ [reason]
  in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } }

-- | Append an element to the reason trail of a queued opcode.
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
                        -> QueuedOpCode
extendOpCodeReasonTrail jid ts i op =
  let inOp = qoInput op
  in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp }

-- | Append an element to the reason trail of all the OpCodes of a queued job.
extendJobReasonTrail :: QueuedJob -> QueuedJob
extendJobReasonTrail job =
  let jobId = qjId job
      mTimestamp = qjReceivedTimestamp job
      -- This function is going to be called on QueuedJobs that already contain
      -- a timestamp. But for safety reasons we cannot assume mTimestamp will
      -- be (Just timestamp), so we use the value 0 in the extremely unlikely
      -- case this is not true.
      timestamp = fromMaybe (0, 0) mTimestamp
    in job
        { qjOps =
            zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $
              qjOps job
        }

-- | From a queued job obtain the list of jobs it depends on.
getJobDependencies :: QueuedJob -> [JobId]
getJobDependencies job = do
  op <- qjOps job
  mopc <- toMetaOpCode $ qoInput op
  dep <- fromMaybe [] . opDepends $ metaParams mopc
  getJobIdFromDependency dep

-- | Change the priority of a QueuedOpCode, if it is not already
-- finalized.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op =
  if qoStatus op > OP_STATUS_RUNNING
     then op
     else op { qoPriority = prio }

-- | Set the state of a QueuedOpCode to canceled.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }

-- | Change the priority of a job, i.e., change the priority of the
-- non-finalized opcodes.
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job =
  job { qjOps = map (changeOpCodePriority prio) $ qjOps job }

-- | Transform a QueuedJob that has not been started into its canceled form.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  let ops' = map (cancelOpCode now) $ qjOps job
  in job { qjOps = ops', qjEndTimestamp = Just now }

-- | Set the state of a QueuedOpCode to failed
-- and set the Op result using the given reason message.
failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode
failOpCode reason@(_, msg, _) now op =
  over (qoInputL . validOpCodeL . metaParamsL . opReasonL) (++ [reason])
  op { qoStatus = OP_STATUS_ERROR
     , qoResult = Text.JSON.JSString . Text.JSON.toJSString $ msg
     , qoEndTimestamp = Just now }

-- | Transform a QueuedJob that has not been started into its failed form.
failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
failQueuedJob reason now job =
  let ops' = map (failOpCode reason now) $ qjOps job
  in job { qjOps = ops', qjEndTimestamp = Just now }

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archives job. BROKEN.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all

-- | Determine if a job has started
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalised.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine if a job is finalized and its timestamp is before
-- a given time.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts = liftA2 (&&) jobFinalized
  $ maybe False (< ts)
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs

-- | Sorts the a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
getDirJobIDs path =
  withErrorLogAt WARNING ("Failed to list job directory " ++ path) .
    liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path)

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some stricness below, otherwise the wrapping in a
  -- Result will create too much lazyness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  filename' <- makeVirtualPath filename
  callresult <- executeRpcCall mastercandidates
                  $ RpcCallJobqueueUpdate filename' content
  let result = map (second (() <$)) callresult
  _ <- logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Writes a job to a file and replicates it to master candidates.
writeAndReplicateJob :: (Error e)
                     => ConfigData -> FilePath -> QueuedJob
                     -> ResultT e IO [(Node, ERpcError ())]
writeAndReplicateJob cfg rootdir job = do
  mkResultT $ writeJobToDisk rootdir job
  liftIO $ replicateJob rootdir (Config.getMasterCandidates cfg) job

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by a Lock.
allocateJobIds :: [Node] -> Lock -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then if n == 0
           then return $ Ok []
           else return . Bad
                  $ "Can only allocate non-negative number of job ids"
    else withLock lock $ do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              return $ mapM makeJobId [(current+1)..(current+n)]

-- | Allocate one new job id.
allocateJobId :: [Node] -> Lock -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide if job queue is open
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs by executing the Python code.
startJobs :: Livelock -- ^ Luxi's livelock path
          -> Lock -- ^ lock for forking new processes
          -> [QueuedJob] -- ^ the list of jobs to start
          -> IO [ErrorResult QueuedJob]
startJobs luxiLivelock forkLock jobs = do
  qdir <- queueDir
  let updateJob job llfile =
        void . mkResultT . writeJobToDisk qdir
          $ job { qjLivelock = Just llfile }
  let runJob job = withLock forkLock $ do
        (llfile, _) <- Exec.forkJobProcess job luxiLivelock
                                           (updateJob job)
        return $ job { qjLivelock = Just llfile }
  mapM (runResultT . runJob) jobs

-- | Try to prove that a queued job is dead. This function needs to know
-- the livelock of the caller (i.e., luxid) to avoid considering a job dead
-- that is in the process of forking off.
isQueuedJobDead :: MonadIO m => Livelock -> QueuedJob -> m Bool
isQueuedJobDead ownlivelock =
  maybe (return False) (liftIO . isDead)
  . mfilter (/= ownlivelock)
  . qjLivelock

-- | Waits for a job to finalize its execution.
waitForJob :: JobId -> Int -> ResultG (Bool, String)
waitForJob jid tmout = do
  qDir <- liftIO queueDir
  let jobfile = liveJobFile qDir jid
      load = liftM fst <$> loadJobFromDisk qDir False jid
      finalizedR = genericResult (const False) jobFinalized
  jobR <- liftIO $ watchFileBy jobfile tmout finalizedR load
  case calcJobStatus <$> jobR of
    Ok s | s == JOB_STATUS_CANCELED ->
             return (True, "Job successfully cancelled")
         | finalizedR jobR ->
            return (False, "Job exited before it could have been canceled,\
                           \ status " ++ show s)
         | otherwise ->
             return (False, "Job could not be canceled, status "
                            ++ show s)
    Bad e -> failError $ "Can't read job status: " ++ e

-- | Try to cancel a job that has already been handed over to execution,
-- by terminating the process.
cancelJob :: Bool -- ^ if True, use sigKILL instead of sigTERM
          -> Livelock -- ^ Luxi's livelock path
          -> JobId -- ^ the job to cancel
          -> IO (ErrorResult (Bool, String))
cancelJob kill luxiLivelock jid = runResultT $ do
  -- we can't terminate the job if it's just being started, so
  -- retry several times in such a case
  result <- runMaybeT . msum . flip map [0..5 :: Int] $ \tryNo -> do
    -- if we're retrying, sleep for some time
    when (tryNo > 0) . liftIO . threadDelay $ 100000 * (2 ^ tryNo)

    -- first check if the job is alive so that we don't kill some other
    -- process by accident
    qDir <- liftIO queueDir
    (job, _) <- lift . mkResultT $ loadJobFromDisk qDir True jid
    let jName = ("Job " ++) . show . fromJobId . qjId $ job
    dead <- isQueuedJobDead luxiLivelock job
    case qjProcessId job of
      _ | dead ->
        return (True, jName ++ " has been already dead")
      Just pid -> do
        liftIO $ signalProcess (if kill then sigKILL else sigTERM) pid
        if not kill then
          if calcJobStatus job > JOB_STATUS_WAITING
            then return (False, "Job no longer waiting, can't cancel\
                                \ (informed it anyway)")
            else lift $ waitForJob jid C.luxiCancelJobTimeout
          else return (True, "SIGKILL send to the process")
      _ -> do
        logDebug $ jName ++ " in its startup phase, retrying"
        mzero
  return $ fromMaybe (False, "Timeout: job still in its startup phase") result

-- | Inform a job that it is requested to change its priority. This is done
-- by writing the new priority to a file and sending SIGUSR1.
tellJobPriority :: Livelock -- ^ Luxi's livelock path
                -> JobId -- ^ the job to inform
                -> Int -- ^ the new priority
                -> IO (ErrorResult (Bool, String))
tellJobPriority luxiLivelock jid prio = runResultT $ do
  let  jidS = show $ fromJobId jid
       jName = "Job " ++ jidS
  mDir <- liftIO luxidMessageDir
  let prioFile = mDir </> jidS ++ ".prio"
  liftIO . atomicWriteFile prioFile $ show prio
  qDir <- liftIO queueDir
  (job, _) <- mkResultT $ loadJobFromDisk qDir True jid
  dead <- isQueuedJobDead luxiLivelock job
  case qjProcessId job of
    _ | dead -> do
      liftIO $ removeFile prioFile
      return (False, jName ++ " is dead")
    Just pid -> do
      liftIO $ signalProcess sigUSR1 pid
      return (True, jName ++ " with pid " ++ show pid ++ " signaled")
    _ -> return (False, jName ++ "'s pid unknown")

-- | Notify a job that something relevant happened, e.g., a lock became
-- available. We do this by sending sigHUP to the process.
notifyJob :: ProcessID  -> IO (ErrorResult ())
notifyJob pid = runResultT $ do
  logDebug $ "Signalling process " ++ show pid
  liftIO $ signalProcess sigHUP pid

-- | Permissions for the archive directories.
queueDirPermissions :: FilePermissions
queueDirPermissions = FilePermissions { fpOwner = Just GanetiMasterd
                                      , fpGroup = Just $ ExtraGroup DaemonsGroup
                                      , fpPermissions = 0o0750
                                      }

-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining int he queue, asuming the
-- given numbers about the not considered jobs.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs alread archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . (>> return ())
   . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      _ <- forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue

-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cuttime = if age < 0 then noTimestamp
                           else advanceTimestamp (- age) (fromClockTime now)
      mcs = Config.getMasterCandidates cfg
      replicateFn jobs = do
        let olds = map (liveJobFile qDir) jobs
            news = map (archivedJobFile qDir) jobs
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
        return ()
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids