{-# LANGUAGE FlexibleContexts #-}

{-| Provides a general functionality for workers that run on the background
and perform some task when triggered.

Each task can process multiple triggers, if they're coming faster than the
tasks are being processed.

Properties:

- If a worked is triggered, it will perform its action eventually.
  (i.e. it won't miss a trigger).

- If the worker is busy, the new action will start immediately when it finishes
  the current one.

- If the worker is idle, it'll start the action immediately.

- If the caller uses 'triggerAndWait', the call will return just after the
  earliest action following the trigger is finished.

- If the caller uses 'triggerWithResult', it will recive an 'Async' value that
  can be used to wait for the result (which will be available once the earliest
  action following the trigger finishes).

- If the worker finishes an action and there are no pending triggers since the
  start of the last action, it becomes idle and waits for a new trigger.

-}

{-

Copyright (C) 2014 Google Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-}

module Ganeti.Utils.AsyncWorker
  ( AsyncWorker
  , mkAsyncWorker
  , mkAsyncWorker_
  , trigger
  , trigger_
  , triggerWithResult
  , triggerWithResult_
  , triggerWithResultMany
  , triggerWithResultMany_
  , triggerAndWait
  , triggerAndWait_
  , triggerAndWaitMany
  , triggerAndWaitMany_
  , Async
  , wait
  , waitMany
  ) where

import Control.Monad
import Control.Monad.Base
import Control.Monad.Trans.Control
import Control.Concurrent (ThreadId)
import Control.Concurrent.Lifted (fork, yield)
import Control.Concurrent.MVar.Lifted
import Data.Monoid
import qualified Data.Traversable as T
import Data.IORef.Lifted

-- * The definition and construction of asynchronous workers

-- Represents the state of the requests to the worker. The worker is either
-- 'Idle', or has 'Pending' triggers to process. After the corresponding
-- action is run, all the 'MVar's in the list are notified with the result.
-- Note that the action needs to be run even if the list is empty, as it
-- means that there are pending requests, only nobody needs to be notified of
-- their results.
data TriggerState i a
  = Idle
  | Pending i [MVar a]


-- | Adds a new trigger to the current state (therefore the result is always
-- 'Pending'), optionally adding a 'MVar' that will receive the output.
addTrigger :: (Monoid i)
           => i -> Maybe (MVar a) -> TriggerState i a -> TriggerState i a
addTrigger i mmvar state = let rs = recipients state
                           in Pending (input state <> i)
                                      (maybe rs (: rs) mmvar)
  where
    recipients Idle           = []
    recipients (Pending _ rs) = rs
    input Idle          = mempty
    input (Pending j _) = j

-- | Represent an asynchronous worker whose single action execution returns a
-- value of type @a@.
data AsyncWorker i a
    = AsyncWorker ThreadId (IORef (TriggerState i a)) (MVar ())

-- | Given an action, construct an 'AsyncWorker'.
mkAsyncWorker :: (Monoid i, MonadBaseControl IO m)
              => (i -> m a) -> m (AsyncWorker i a)
mkAsyncWorker act = do
    trig <- newMVar ()
    ref <- newIORef Idle
    thId <- fork . forever $ do
        takeMVar trig           -- wait for a trigger
        state <- swap ref Idle  -- check the state of pending requests
        -- if there are pending requests, run the action and send them results
        case state of
          Idle          -> return () -- all trigers have been processed, we've
                                     -- been woken up by a trigger that has been
                                     -- already included in the last run
          Pending i rs  -> act i >>= forM_ rs . flip tryPutMVar
        -- Give other threads a chance to do work while we're waiting for
        -- something to happen.
        yield
    return $ AsyncWorker thId ref trig
  where
    swap :: (MonadBase IO m) => IORef a -> a -> m a
    swap ref x = atomicModifyIORef ref ((,) x)

-- | Given an action, construct an 'AsyncWorker' with no input.
mkAsyncWorker_ :: (MonadBaseControl IO m)
               => m a -> m (AsyncWorker () a)
mkAsyncWorker_ = mkAsyncWorker . const

-- * Triggering workers and obtaining their results

-- | An asynchronous result that will eventually yield a value.
newtype Async a = Async { asyncResult :: MVar a }

-- | Waits for an asynchronous result to finish and yield a value.
wait :: (MonadBase IO m) => Async a -> m a
wait = readMVar . asyncResult

-- | Waits for all asynchronous results in a collection to finish and yield a
-- value.
waitMany :: (MonadBase IO m, T.Traversable t) => t (Async a) -> m (t a)
waitMany = T.mapM wait

-- An internal function for triggering a worker, optionally registering
-- a callback 'MVar'
triggerInternal :: (MonadBase IO m, Monoid i)
                => i -> Maybe (MVar a) -> AsyncWorker i a -> m ()
triggerInternal i mmvar (AsyncWorker _ ref trig) = do
    atomicModifyIORef ref (\ts -> (addTrigger i mmvar ts, ()))
    _ <- tryPutMVar trig ()
    return ()

-- | Trigger a worker, letting it run its action asynchronously, but do not
-- wait for the result.
trigger :: (MonadBase IO m, Monoid i) => i -> AsyncWorker i a -> m ()
trigger = flip triggerInternal Nothing

-- | Trigger a worker with no input, letting it run its action asynchronously,
-- but do not wait for the result.
trigger_ :: (MonadBase IO m) => AsyncWorker () a -> m ()
trigger_ = trigger ()

-- | Trigger a worker and wait until the action following this trigger
-- finishes. The returned `Async` value can be used to wait for the result of
-- the action.
triggerWithResult :: (MonadBase IO m, Monoid i)
                  => i -> AsyncWorker i a -> m (Async a)
triggerWithResult i worker = do
    result <- newEmptyMVar
    triggerInternal i (Just result) worker
    return $ Async result

-- | Trigger a worker and wait until the action following this trigger
-- finishes.
--
-- See 'triggerWithResult'.
triggerWithResult_ :: (MonadBase IO m) => AsyncWorker () a -> m (Async a)
triggerWithResult_ = triggerWithResult ()

-- | Trigger a list of workers and wait until all the actions following these
-- triggers finish. The returned collection of `Async` values can be used to
-- wait for the results of the actions.
triggerWithResultMany :: (T.Traversable t, MonadBase IO m, Monoid i)
                      => i -> t (AsyncWorker i a) -> m (t (Async a))
triggerWithResultMany i = T.mapM (triggerWithResult i)
--
-- | Trigger a list of workers with no inputs and wait until all the actions
-- following these triggers finish.
--
-- See 'triggerWithResultMany'.
triggerWithResultMany_ :: (T.Traversable t, MonadBase IO m)
                       => t (AsyncWorker () a) -> m (t (Async a))
triggerWithResultMany_ = triggerWithResultMany ()

-- * Helper functions for waiting for results just after triggering workers

-- | Trigger a list of workers and wait until all the actions following these
-- triggers finish. Returns the results of the actions.
--
-- Note that there is a significant difference between 'triggerAndWaitMany'
-- and @mapM triggerAndWait@. The latter runs all the actions of the workers
-- sequentially, while the former runs them in parallel.
triggerAndWaitMany :: (T.Traversable t, MonadBase IO m, Monoid i)
                   => i -> t (AsyncWorker i a) -> m (t a)
triggerAndWaitMany i = waitMany <=< triggerWithResultMany i

-- See 'triggetAndWaitMany'.
triggerAndWaitMany_ :: (T.Traversable t, MonadBase IO m)
                    => t (AsyncWorker () a) -> m (t a)
triggerAndWaitMany_ = triggerAndWaitMany ()

-- | Trigger a worker and wait until the action following this trigger
-- finishes. Return the result of the action.
triggerAndWait :: (MonadBase IO m, Monoid i) => i -> AsyncWorker i a -> m a
triggerAndWait i = wait <=< triggerWithResult i

-- | Trigger a worker with no input and wait until the action following this
-- trigger finishes. Return the result of the action.
triggerAndWait_ :: (MonadBase IO m) => AsyncWorker () a -> m a
triggerAndWait_ = triggerAndWait ()