{-| Monads and state-access primitives of the WConfd daemon.

This module defines the 'DaemonHandle' (a shared reference to the daemon
state together with the asynchronous workers that persist it), the
'WConfdMonadInt' and 'WConfdMonad' monads that carry this handle, and the
functions for reading and atomically modifying the three parts of the
daemon state: the configuration state, the lock state and the temporary
reservations.

-}
module Ganeti.WConfd.Monad
( DaemonHandle
, dhConfigPath
, dhLivelock
, mkDaemonHandle
, WConfdMonadInt
, runWConfdMonadInt
, WConfdMonad
, daemonHandle
, modifyConfigState
, forceConfigStateDistribution
, readConfigState
, modifyConfigDataErr_
, modifyLockWaiting
, modifyLockWaiting_
, readLockWaiting
, readLockAllocation
, modifyTempResState
, modifyTempResStateErr
, readTempResState
) where
-- Fallback definition of MIN_VERSION_monad_control(maj,min,rev) for builds
-- that only provide MONAD_CONTROL_MAJOR, MONAD_CONTROL_MINOR and
-- MONAD_CONTROL_REV. It is true iff the installed version is at least
-- maj.min.rev, compared lexicographically:
--
--   * the major version is strictly newer, or
--   * the major version is equal and the minor version is strictly newer, or
--   * major and minor are equal and the revision is at least rev.
--
-- The minor comparison has to be strict: with a non-strict comparison the
-- revision would be ignored whenever major and minor are equal, so that
-- e.g. version 1.0.3 would be reported as satisfying (1,0,5).
--
-- The definition is guarded so that a macro of the same name supplied by
-- the build system is used unchanged and not redefined.
#ifndef MIN_VERSION_monad_control
#define MIN_VERSION_monad_control(maj,min,rev) \
  (((maj)<MONAD_CONTROL_MAJOR)|| \
   (((maj)==MONAD_CONTROL_MAJOR)&&((min)<MONAD_CONTROL_MINOR))|| \
   (((maj)==MONAD_CONTROL_MAJOR)&&((min)==MONAD_CONTROL_MINOR)&& \
    ((rev)<=MONAD_CONTROL_REV)))
#endif
import Control.Applicative
import Control.Arrow ((&&&), second)
import Control.Monad
import Control.Monad.Base
import Control.Monad.Error
import Control.Monad.Reader
import Control.Monad.State
import Control.Monad.Trans.Control
import Data.Functor.Identity
import Data.IORef.Lifted
import Data.Monoid (Any(..))
import qualified Data.Set as S
import Data.Tuple (swap)
import System.Time (getClockTime)
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors
import Ganeti.JQueue (notifyJob)
import Ganeti.Lens
import Ganeti.Locking.Allocation (LockAllocation)
import Ganeti.Locking.Locks
import Ganeti.Locking.Waiting (getAllocation)
import Ganeti.Logging
import Ganeti.Logging.WriterLog
import Ganeti.Objects (ConfigData)
import Ganeti.Utils.AsyncWorker
import Ganeti.Utils.IORef
import Ganeti.Utils.Livelock (Livelock)
import Ganeti.WConfd.ConfigState
import Ganeti.WConfd.TempRes
-- | The complete mutable state of the daemon. All three components live in
-- one value so that they can be read and modified together through a single
-- reference (see 'dhDaemonState').
data DaemonState = DaemonState
  { dsConfigState :: ConfigState
    -- ^ the configuration state
  , dsLockWaiting :: GanetiLockWaiting
    -- ^ the lock state
  , dsTempRes :: TempResState
    -- ^ the temporary reservations
  }

-- Template Haskell splice generating the lenses used below
-- ('dsConfigStateL', 'dsLockWaitingL', 'dsTempResL').
$(makeCustomLenses ''DaemonState)
-- | The handle passed around in the WConfd monads: the shared state
-- reference plus everything needed to persist that state.
data DaemonHandle = DaemonHandle
  { dhDaemonState :: IORef DaemonState
    -- ^ reference to the current daemon state
  , dhConfigPath :: FilePath
    -- ^ the path given to 'mkDaemonHandle'
  , dhSaveConfigWorker :: AsyncWorker Any ()
    -- ^ worker triggered after configuration changes; it is triggered with
    -- @Any True@ when a full synchronous distribution is requested and
    -- with @Any False@ otherwise
  , dhSaveLocksWorker :: AsyncWorker () ()
    -- ^ worker triggered after the lock state has been modified
  , dhSaveTempResWorker :: AsyncWorker () ()
    -- ^ worker triggered after the temporary reservations have been
    -- modified
  , dhLivelock :: Livelock
    -- ^ the livelock given to 'mkDaemonHandle'
  }
-- | Build a 'DaemonHandle' around a freshly allocated state reference.
--
-- The three initial state components are stored together in one 'IORef'.
-- Each worker-creating function is given an 'IO' action that reads its part
-- of the state back from that reference. The workers are created in this
-- order: the one from @distSSConfWorkerFn@, the one from
-- @distMCsWorkerFn@, the configuration-saving worker (which is handed the
-- two previous workers), the lock-saving worker and finally the
-- temporary-reservations worker.
mkDaemonHandle :: FilePath
                  -- ^ stored as 'dhConfigPath'
               -> ConfigState
                  -- ^ initial configuration state
               -> GanetiLockWaiting
                  -- ^ initial lock state
               -> TempResState
                  -- ^ initial temporary reservations
               -> (IO ConfigState -> [AsyncWorker () ()]
                                  -> ResultG (AsyncWorker Any ()))
                  -- ^ creates the worker stored as 'dhSaveConfigWorker';
                  -- it receives the two workers created by the next two
                  -- arguments
               -> (IO ConfigState -> ResultG (AsyncWorker () ()))
                  -- ^ creates the first worker of the list passed to the
                  -- previous argument
               -> (IO ConfigState -> ResultG (AsyncWorker () ()))
                  -- ^ creates the second worker of that list
               -> (IO GanetiLockWaiting -> ResultG (AsyncWorker () ()))
                  -- ^ creates the worker stored as 'dhSaveLocksWorker'
               -> (IO TempResState -> ResultG (AsyncWorker () ()))
                  -- ^ creates the worker stored as 'dhSaveTempResWorker'
               -> Livelock
                  -- ^ stored as 'dhLivelock'
               -> ResultG DaemonHandle
mkDaemonHandle cpath cstat lstat trstat
               saveWorkerFn distMCsWorkerFn distSSConfWorkerFn
               saveLockWorkerFn saveTempResWorkerFn
               livelock = do
  stateRef <- newIORef $ DaemonState { dsConfigState = cstat
                                     , dsLockWaiting = lstat
                                     , dsTempRes = trstat
                                     }
  let -- Read one component of the current daemon state.
      readPart :: (DaemonState -> b) -> IO b
      readPart part = part `liftM` readIORef stateRef
      readConfigIO = readPart dsConfigState

  ssconfWorker <- distSSConfWorkerFn readConfigIO
  distMCsWorker <- distMCsWorkerFn readConfigIO
  saveWorker <- saveWorkerFn readConfigIO [distMCsWorker, ssconfWorker]
  saveLockWorker <- saveLockWorkerFn (readPart dsLockWaiting)
  saveTempResWorker <- saveTempResWorkerFn (readPart dsTempRes)

  return $ DaemonHandle { dhDaemonState = stateRef
                        , dhConfigPath = cpath
                        , dhSaveConfigWorker = saveWorker
                        , dhSaveLocksWorker = saveLockWorker
                        , dhSaveTempResWorker = saveTempResWorker
                        , dhLivelock = livelock
                        }
-- | The monad stack wrapped by 'WConfdMonadInt': a reader of the
-- 'DaemonHandle' on top of 'IO'.
type WConfdMonadIntType = ReaderT DaemonHandle IO

-- | The internal part of the WConfd monad, without error handling.
-- It is a newtype around 'WConfdMonadIntType'; all listed instances are
-- derived from the wrapped stack.
newtype WConfdMonadInt a = WConfdMonadInt
  { getWConfdMonadInt :: WConfdMonadIntType a }
  deriving (Functor, Applicative, Monad, MonadIO, MonadBase IO, MonadLog)
-- | 'MonadBaseControl' is obtained by delegating to the instance of the
-- wrapped 'WConfdMonadIntType' and adding/removing the newtype constructor.
-- The two preprocessor branches differ only in how the associated 'StM'
-- type has to be declared.
instance MonadBaseControl IO WConfdMonadInt where
#if MIN_VERSION_monad_control(1,0,0)
-- In this branch 'StM' is declared with @type@, so the state of the wrapped
-- monad can be reused directly.
  type StM WConfdMonadInt b = StM WConfdMonadIntType b

  liftBaseWith f = WConfdMonadInt . liftBaseWith
                   $ \r -> f (r . getWConfdMonadInt)
  restoreM = WConfdMonadInt . restoreM
#else
-- In this branch 'StM' is declared with @newtype@, so the state of the
-- wrapped monad is wrapped into 'StMWConfdMonadInt' when captured and
-- unwrapped again when restored.
  newtype StM WConfdMonadInt b = StMWConfdMonadInt
    { runStMWConfdMonadInt :: StM WConfdMonadIntType b }

  liftBaseWith f = WConfdMonadInt . liftBaseWith
                   $ \r -> f (liftM StMWConfdMonadInt . r . getWConfdMonadInt)
  restoreM = WConfdMonadInt . restoreM . runStMWConfdMonadInt
#endif
-- | Run a 'WConfdMonadInt' action in 'IO', supplying the 'DaemonHandle'
-- that the action reads.
runWConfdMonadInt :: WConfdMonadInt a -> DaemonHandle -> IO a
runWConfdMonadInt = runReaderT . getWConfdMonadInt
-- | The complete WConfd monad: 'WConfdMonadInt' extended with failures of
-- type 'GanetiException'.
type WConfdMonad = ResultT GanetiException WConfdMonadInt

-- | The monad of the callbacks passed to 'modifyConfigStateErr' and its
-- variants. It does not contain 'IO'; a computation can fail with a
-- 'GanetiException' and runs over 'WriterLog'.
type AtomicModifyMonad a = ResultT GanetiException WriterLog a
-- | Return the 'DaemonHandle' of the running computation, i.e. the
-- environment of the underlying reader monad.
daemonHandle :: WConfdMonad DaemonHandle
daemonHandle = lift (WConfdMonadInt ask)
-- | Read the configuration state currently stored in the daemon state.
readConfigState :: WConfdMonad ConfigState
readConfigState = do
  dh <- daemonHandle
  dsConfigState `liftM` readIORef (dhDaemonState dh)
-- | Atomically modify the configuration state with a function that may
-- fail, and return the result produced by that function.
--
-- The function receives the current temporary reservations and the current
-- configuration state. If the state it returns differs from the old one,
-- @bumpSerial@ is applied (with the current clock time) to the
-- configuration data of the new state, and the configuration-saving worker
-- is triggered and waited for. Whether that trigger requests a full
-- synchronous distribution is decided by @needsFullDist@ on the old and
-- the new state. If the state is unchanged, no worker is triggered.
modifyConfigStateErr
  :: (TempResState -> ConfigState -> AtomicModifyMonad (a, ConfigState))
  -> WConfdMonad a
modifyConfigStateErr f = do
  dh <- daemonHandle
  now <- liftIO getClockTime

  let -- Attach the "modified" and "needs full distribution" flags to the
      -- result; a changed state additionally gets its serial bumped.
      tagResult old (res, new)
        | old /= new = ( (res, True, needsFullDist old new)
                       , over csConfigDataL (bumpSerial now) new )
        | otherwise = ((res, False, False), new)

      -- Run the modification on the configuration part of the state.
      step ds@(DaemonState { dsTempRes = tempRes }) =
        mapMOf2 dsConfigStateL
                (\old -> tagResult old `liftM` f tempRes old)
                ds

      -- Trigger the configuration-saving worker with the given flag and
      -- wait for it, logging the given messages before and after.
      saveConfig :: Bool -> String -> String -> WConfdMonad ()
      saveConfig fullDist startMsg doneMsg = do
        logDebug startMsg
        liftBase . triggerAndWait (Any fullDist) . dhSaveConfigWorker $ dh
        logDebug doneMsg

  (r, modified, distSync) <-
    atomicModifyIORefErrLog (dhDaemonState dh) (liftM swap . step)

  when modified $
    if distSync
      then saveConfig True
             ("Triggering config write" ++
              " together with full synchronous distribution")
             "Config write and distribution finished"
      else saveConfig False
             ("Triggering config write" ++
              " and asynchronous distribution")
             "Config writer finished with local task"
  return r
-- | Variant of 'modifyConfigStateErr' for functions that only produce a new
-- configuration state and no additional result.
modifyConfigStateErr_
  :: (TempResState -> ConfigState -> AtomicModifyMonad ConfigState)
  -> WConfdMonad ()
modifyConfigStateErr_ f =
  modifyConfigStateErr $ \tempRes cs -> liftM ((,) ()) (f tempRes cs)
-- | Atomically modify the configuration state with a pure function that
-- cannot fail and does not look at the temporary reservations. Saving and
-- distribution are handled by 'modifyConfigStateErr'.
modifyConfigState :: (ConfigState -> (a, ConfigState)) -> WConfdMonad a
modifyConfigState f = modifyConfigStateErr $ \_ cs -> return (f cs)
-- | Trigger the configuration-saving worker with @Any True@ and wait for it
-- to finish, regardless of whether the configuration has changed.
forceConfigStateDistribution :: WConfdMonad ()
forceConfigStateDistribution = do
  logDebug "Forcing synchronous config write together with full distribution"
  dh <- daemonHandle
  liftBase $ triggerAndWait (Any True) (dhSaveConfigWorker dh)
  logDebug "Forced config write and distribution finished"
-- | Variant of 'modifyConfigStateErr_' whose function operates on the
-- 'ConfigData' inside the configuration state (accessed through
-- @csConfigDataL@) instead of on the whole 'ConfigState'.
modifyConfigDataErr_
  :: (TempResState -> ConfigData -> AtomicModifyMonad ConfigData)
  -> WConfdMonad ()
modifyConfigDataErr_ f =
  modifyConfigStateErr_ $ \tempRes -> traverseOf csConfigDataL (f tempRes)
-- | Atomically modify the temporary reservations with a stateful
-- computation that may fail, and return its result.
--
-- The computation is given the configuration data of the current
-- configuration state. After the modification the worker stored in
-- 'dhSaveTempResWorker' is triggered and waited for.
modifyTempResStateErr
  :: (ConfigData -> StateT TempResState ErrorResult a) -> WConfdMonad a
modifyTempResStateErr f = do
  dh <- daemonHandle
  r <- toErrorBase $ atomicModifyIORefErr (dhDaemonState dh) update
  logDebug "Triggering temporary reservations write"
  liftBase $ triggerAndWait_ (dhSaveTempResWorker dh)
  logDebug "Temporary reservations write finished"
  return r
  where
    -- The configuration data of a daemon state.
    configOf = csConfigData . dsConfigState
    -- Run the computation on the temporary reservations of a daemon state.
    runOn ds = traverseOf2 dsTempResL (runStateT (f (configOf ds))) ds
    -- The same, with the components of the resulting pair exchanged.
    update ds = swap `liftM` runOn ds
-- | Variant of 'modifyTempResStateErr' for stateful computations that
-- cannot fail.
modifyTempResState :: (ConfigData -> State TempResState a) -> WConfdMonad a
modifyTempResState f =
  modifyTempResStateErr $ \cfg -> mapStateT (return . runIdentity) (f cfg)
-- | Read the current configuration data together with the current
-- temporary reservations. Both are taken from one read of the state
-- reference.
readTempResState :: WConfdMonad (ConfigData, TempResState)
readTempResState = do
  dh <- daemonHandle
  ds <- readIORef (dhDaemonState dh)
  return (csConfigData (dsConfigState ds), dsTempRes ds)
-- | Atomically modify the lock state and return the result produced by the
-- given function.
--
-- Besides the new lock state, the function returns its result and a set of
-- clients. After the modification the new lock state is logged, the worker
-- stored in 'dhSaveLocksWorker' is triggered and waited for, and, if the
-- set of clients is not empty, 'notifyJob' is called on the process id
-- (@ciPid@) of each of them.
modifyLockWaiting :: (GanetiLockWaiting -> ( GanetiLockWaiting
                                           , (a, S.Set ClientId) ))
                     -> WConfdMonad a
modifyLockWaiting f = do
  dh <- daemonHandle
  let -- Hand back the complete output of @f@, and its first component
      -- (the new lock state) once more as the second part of the pair.
      withNewState lw = let out = f lw in (out, fst out)
  (lockAlloc, (r, nfy)) <-
    atomicModifyWithLens (dhDaemonState dh) dsLockWaitingL withNewState
  logDebug ("Current lock status: " ++ J.encode lockAlloc)
  logDebug "Triggering lock state write"
  liftBase $ triggerAndWait_ (dhSaveLocksWorker dh)
  logDebug "Lock write finished"
  let clients = S.toList nfy
  unless (S.null nfy) $ do
    logDebug ("Locks became available for " ++ show clients)
    liftIO $ mapM_ (notifyJob . ciPid) clients
    logDebug "Finished notifying processes"
  return r
-- | Variant of 'modifyLockWaiting' for functions that produce no result
-- besides the new lock state and the set of clients.
modifyLockWaiting_ :: (GanetiLockWaiting -> (GanetiLockWaiting, S.Set ClientId))
                      -> WConfdMonad ()
modifyLockWaiting_ f = modifyLockWaiting $ \lw ->
  let (lw', nfy) = f lw
  in (lw', ((), nfy))
-- | Read the lock state currently stored in the daemon state.
readLockWaiting :: WConfdMonad GanetiLockWaiting
readLockWaiting = do
  dh <- daemonHandle
  dsLockWaiting `liftM` readIORef (dhDaemonState dh)
-- | Read the current lock state and return the result of applying
-- 'getAllocation' to it.
readLockAllocation :: WConfdMonad (LockAllocation GanetiLocks ClientId)
readLockAllocation = do
  waiting <- readLockWaiting
  return (getAllocation waiting)