{-# LANGUAGE TemplateHaskell, RankNTypes #-}
{-| Implementation of a reader for the job queue.

-}

{-

Copyright (C) 2013 Google Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-}

module Ganeti.JQScheduler
  ( JQStatus
  , jqLivelock
  , jqForkLock
  , emptyJQStatus
  , initJQScheduler
  , enqueueNewJobs
  , dequeueJob
  , setJobPriority
  ) where

import Control.Applicative (liftA2)
import Control.Arrow
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Function (on)
import Data.Functor ((<$))
import Data.IORef
import Data.List
import Data.Maybe
import qualified Data.Set as S
import System.INotify

import Ganeti.BasicTypes
import Ganeti.Constants as C
import Ganeti.Errors
import Ganeti.JQueue as JQ
import Ganeti.Lens hiding (chosen)
import Ganeti.Logging
import Ganeti.Objects
import Ganeti.Path
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Livelock
import Ganeti.Utils.MVarLock

data JobWithStat = JobWithStat { jINotify :: Maybe INotify
                               , jStat :: FStat
                               , jJob :: QueuedJob
                               }

$(makeCustomLenses' ''JobWithStat ['jJob])

data Queue = Queue { qEnqueued :: [JobWithStat]
                   , qRunning :: [JobWithStat]
                   , qManipulated :: [JobWithStat] -- ^ running jobs that are
                                                   -- being manipulated by
                                                   -- some thread
                   }

$(makeCustomLenses ''Queue)

{-| Representation of the job queue

We keep two lists of jobs (together with information about the last
fstat result observed): the jobs that are enqueued, but not yet handed
over for execution, and the jobs already handed over for execution. They
are kept together in a single IORef, so that we can atomically update
both, in particular when scheduling jobs to be handed over for execution.

-}

data JQStatus = JQStatus
  { jqJobs :: IORef Queue
  , jqConfig :: IORef (Result ConfigData)
  , jqLivelock :: Livelock
  , jqForkLock :: Lock
  }


emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  jqJ <- newIORef Queue { qEnqueued = [], qRunning = [], qManipulated = [] }
  (_, livelock) <- mkLivelockFile C.luxiLivelockPrefix
  forkLock <- newLock
  return JQStatus { jqJobs = jqJ, jqConfig = config, jqLivelock = livelock
                  , jqForkLock = forkLock }

-- | Apply a function on the running jobs.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs = over qRunningL

-- | Apply a function on the queued jobs.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs = over qEnqueuedL

-- | Obtain a JobWithStat from a QueuedJob.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}

-- | Reload interval for polling the running jobs for updates in microseconds.
watchInterval :: Int
watchInterval = C.luxidJobqueuePollInterval * 1000000 

-- | Read a cluster parameter from the configuration, using a default if the
-- configuration is not available.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue =
  liftM (genericResult (const defaultvalue) (param . configCluster))
  . readIORef . jqConfig

-- | Get the maximual number of jobs to be run simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs = getConfigValue clusterMaxRunningJobs 1

-- | Get the maximual number of jobs to be tracked simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs = getConfigValue clusterMaxTrackedJobs 1

-- | Get the number of jobs currently running.
getRQL :: JQStatus -> IO Int
getRQL = liftM (length . qRunning) . readIORef . jqJobs

-- | Wrapper function to atomically update the jobs in the queue status.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)

-- | Reread a job from disk, if the file has changed.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try $ needsReload fstat fpath
                   :: IO (Either IOError (Maybe FStat))
  let changed = either (const $ Just nullFStat) id changedResult
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      let jids = show $ fromJobId jid
      logDebug $ "Rereading job "  ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                     ++ show (calcJobStatus job')
          return . Just $ jWS {jStat=fstat', jJob=job'}
                          -- jINotify unchanged

-- | Update a job in the job queue, if it is still there. This is the
-- pure function for inserting a previously read change into the queue.
-- as the change contains its time stamp, we don't have to worry about a
-- later read change overwriting a newer read state. If this happens, the
-- fstat value will be outdated, so the next poller run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' =
  let jid = qjId $ jJob job' in
  map (\job -> if qjId (jJob job) == jid then job' else job)

-- | Update a single job by reading it from disk, if necessary.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  jb' <- readJobStatus jb
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
  when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do
    logDebug "Scheduler noticed a job to have finished."
    cleanupFinishedJobs state
    scheduleSomeJobs state

-- | Move a job from one part of the queue to another.
-- Return the job that was moved, or 'Nothing' if it wasn't found in
-- the queue.
moveJob :: Lens' Queue [JobWithStat] -- ^ from queue
        -> Lens' Queue [JobWithStat] -- ^ to queue
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
    -- traverse over the @(,) [JobWithStats]@ functor to extract the job
    case traverseOf fromQ (partition ((== jid) . qjId . jJob)) queue of
      (job : _, queue') -> (over toQ (++ [job]) queue', Just job)
      _                 -> (queue, Nothing)

-- | Atomically move a job from one part of the queue to another.
-- Return the job that was moved, or 'Nothing' if it wasn't found in
-- the queue.
moveJobAtomic :: Lens' Queue [JobWithStat] -- ^ from queue
              -> Lens' Queue [JobWithStat] -- ^ to queue
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat =
  atomicModifyIORef (jqJobs qstat) (moveJob fromQ toQ jid)

-- | Manipulate a running job by atomically moving it from 'qRunning'
-- into 'qManipulated', running a given IO action and then atomically
-- returning it back.
--
-- Returns the result of the IO action, or 'Nothing', if the job wasn't found
-- in the queue.
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  jobOpt <- moveJobAtomic qRunningL qManipulatedL jid qstat
  case jobOpt of
    Nothing -> return Nothing
    Just _  -> (Just `liftM` k)
               `finally` moveJobAtomic qManipulatedL qRunningL jid qstat

-- | Sort out the finished jobs from the monitored part of the queue.
-- This is the pure part, splitting the queue into a remaining queue
-- and the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue =
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
  in (queue {qRunning=run'}, fin)

-- | Actually clean up the finished jobs. This is the IO wrapper around
-- the pure `sortoutFinishedJobs`.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
      jlist = commaJoin $ map showJob finished
  unless (null finished)
    . logInfo $ "Finished jobs: " ++ jlist
  mapM_ (maybe (return ()) killINotify . jINotify) finished

-- | Watcher task for a job, to update it on file changes. It also
-- reinstantiates itself upon receiving an Ignored event.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  let inotify = jINotify jWS
  when (e == Ignored  && isJust inotify) $ do
    qdir <- queueDir
    let fpath = liveJobFile qdir jid
    _ <- addWatch (fromJust inotify) [Modify, Delete] fpath
           (jobWatcher state jWS)
    return ()
  updateJob state jWS

-- | Attach the job watcher to a running job.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
  max_watch <- getMaxTrackedJobs state
  rql <- getRQL state
  if rql < max_watch
   then do
     inotify <- initINotify
     qdir <- queueDir
     let fpath = liveJobFile qdir . qjId $ jJob jWS
         jWS' = jWS { jINotify=Just inotify }
     logDebug $ "Attaching queue watcher for " ++ fpath
     _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
     modifyJobs state . onRunningJobs $ updateJobStatus jWS'
   else logDebug $ "Not attaching watcher for job "
                   ++ (show . fromJobId . qjId $ jJob jWS)
                   ++ ", run queue length is " ++ show rql

-- | For a queued job, determine whether it is eligible to run, i.e.,
-- if no jobs it depends on are either enqueued or running.
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS =
  let jdeps = getJobDependencies $ jJob jWS
      blocks = flip elem jdeps . qjId . jJob
  in not . any blocks . liftA2 (++) qRunning qEnqueued $ queue

-- | Decide on which jobs to schedule next for execution. This is the
-- pure function doing the scheduling.
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
selectJobsToRun count queue =
  let n = count - length (qRunning queue) - length (qManipulated queue)
      chosen = take n . filter (jobEligible queue) $ qEnqueued queue
      remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)

-- | Logs errors of failed jobs and returns the set of job IDs.
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs [] = return S.empty
logFailedJobs jobs = do
  let jids = S.fromList . map (qjId . jJob . fst) $ jobs
      jidsString = commaJoin . map (show . fromJobId) . S.toList $ jids
  logWarning $ "Starting jobs " ++ jidsString ++ " failed: "
               ++ show (map snd jobs)
  return jids

-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
requeueJobs :: JQStatus -> [(JobWithStat, GanetiException)] -> IO ()
requeueJobs qstate jobs = do
  jids <- logFailedJobs jobs
  let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
  logWarning "Rescheduling jobs"
  modifyJobs qstate $ onQueuedJobs (map fst jobs ++)
                      . onRunningJobs rmJobs

-- | Fail jobs that were previously selected for execution
-- but couldn't be started.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  let sjobs = intercalate "." . map (show . fromJobId) $ S.toList jids
  let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
  logWarning $ "Failing jobs " ++ sjobs
  modifyJobs qstate $ onRunningJobs rmJobs
  let trySaveJob :: JobWithStat -> ResultT String IO ()
      trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
      reason jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      failJob err job = failQueuedJob (reason (qjId job) (show err)) now job
      failAndSaveJobWithStat (jws, err) =
        trySaveJob . over jJobL (failJob err) $ jws
  mapM_ (runResultT . failAndSaveJobWithStat) jobs
  logDebug $ "Failed jobs " ++ sjobs

-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  count <- getMaxRunningJobs qstate
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
  let jobs = map jJob chosen
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  mapM_ (attachWatcher qstate) chosen
  cfgR <- readIORef (jqConfig qstate)
  case cfgR of
    Bad err -> do
      let msg = "Configuration unavailable: " ++ err
      logError msg
      requeueJobs qstate . map (\x -> (x, strMsg msg)) $ chosen
    Ok cfg -> do
      result <- JQ.startJobs cfg (jqLivelock qstate) (jqForkLock qstate) jobs
      let badWith (x, Bad y) = Just (x, y)
          badWith _          = Nothing
      let failed = mapMaybe badWith $ zip chosen result
      unless (null failed) $ failJobs cfg qstate failed

-- | Format the job queue status in a compact, human readable way.
showQueue :: Queue -> String
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
  let showids = show . map (fromJobId . qjId . jJob)
  in "Waiting jobs: " ++ showids waiting 
       ++ "; running jobs: " ++ showids running

-- | Check if a job died, and clean up if so.
checkForDeath :: JQStatus -> JobWithStat -> IO ()
checkForDeath state jobWS = do
  let job = jJob jobWS
      jid = qjId job
      sjid = show $ fromJobId jid
      livelock = qjLivelock job
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  died <- maybe (return False) isDead
          . mfilter (/= jqLivelock state)
          $ livelock
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    -- if we manage to remove the job from the queue, we own the job file
    -- and can manipulate it.
    void . manipulateRunningJob state jid . runResultT $ do
      jobWS' <- mkResultT $ readJobFromDisk jid :: ResultG JobWithStat
      unless (jobFinalized . jJob $ jobWS') . void $ do
        -- If the job isn't finalized, but dead, add a corresponding
        -- failed status.
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
            failedJob = failQueuedJob reason now $ jJob jobWS'
        cfg <- mkResultT . readIORef $ jqConfig state
        writeAndReplicateJob cfg qDir failedJob

-- | Time-based watcher for updating the job queue.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  jobs <- readIORef (jqJobs qstate)
  mapM_ (checkForDeath qstate) $ qRunning jobs
  jobs' <- readIORef (jqJobs qstate)
  mapM_ (updateJob qstate) $ qRunning jobs'
  cleanupFinishedJobs qstate
  jobs'' <- readIORef (jqJobs qstate)
  logInfo $ showQueue jobs''
  scheduleSomeJobs qstate
  logDebug "Job queue watcher cycle finished"

-- | Read a single, non-archived, job, specified by its id, from disk.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
  let fstat = either (const nullFStat) id tryFstat
  loadResult <- JQ.loadJobFromDisk qdir False jid
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult

-- | Read all non-finalized jobs from disk.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  let jids = genericResult (const []) JQ.sortJobIDs eitherJids
      jidsstring = commaJoin $ map (show . fromJobId) jids
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
  jobs <- mapM readJobFromDisk jids
  return $ justOk jobs

-- | Set up the job scheduler. This will also start the monitoring
-- of changes to the running jobs.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let jobs = filter (not . jobFinalized . jJob) alljobs
      (running, queued) = partition (jobStarted . jJob) jobs
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
  jqjobs <- readIORef (jqJobs qstate)
  logInfo $ showQueue jqjobs
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  _ <- forkIO $ onTimeWatcher qstate
  return ()

-- | Enqueue new jobs. This will guarantee that the jobs will be executed
-- eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      addJobs oldjobs = foldl (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state

-- | Pure function for removing a queued job from the job queue by
-- atomicModifyIORef. The answer is Just the job if the job could be removed
-- before being handed over to execution, Nothing if it already was started
-- and a Bad result if the job is not found in the queue.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
    ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
    (_:_, _) -> (q, Bad $ "Queue in inconsistent state."
                           ++ sJid ++ " queued multiple times")
    (_, True) -> (q, Ok Nothing)
    _ -> (q, Bad $ sJid ++ " not found in queue")

-- | Try to remove a queued job from the job queue. Return True, if
-- the job could be removed from the queue before being handed over
-- to execution, False if the job already started, and a Bad result
-- if the job is unknown.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  result <- atomicModifyIORef (jqJobs state) $ rmJob jid
  let result' = fmap isJust result
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
              ++ " is " ++ show result'
  return result'

-- | Change the priority of a queued job (once the job is handed over
-- to execution, the job itself needs to be informed). To avoid the
-- job being started unmodified, it is temporarily unqueued during the
-- change. Return the modified job, if the job's priority was sucessfully
-- modified, Nothing, if the job already started, and a Bad value, if the job
-- is unkown.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
  case maybeJob of
    Nothing -> return Nothing
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      mkResultT $ writeJobToDisk qDir job'
      liftIO $ enqueueNewJobs state [job']
      return $ Just job'