module Ganeti.WConfd.Monad
( DaemonHandle
, dhConfigPath
, dhLivelock
, mkDaemonHandle
, WConfdMonadInt
, runWConfdMonadInt
, WConfdMonad
, daemonHandle
, modifyConfigState
, modifyConfigStateWithImmediate
, forceConfigStateDistribution
, readConfigState
, modifyConfigDataErr_
, modifyConfigAndReturnWithLock
, modifyConfigWithLock
, modifyLockWaiting
, modifyLockWaiting_
, readLockWaiting
, readLockAllocation
, modifyTempResState
, modifyTempResStateErr
, readTempResState
, DistributionTarget(..)
) where
import Control.Applicative
import Control.Arrow ((&&&), second)
import Control.Concurrent (forkIO, myThreadId)
import Control.Exception.Lifted (bracket)
import Control.Monad
import Control.Monad.Base
import Control.Monad.Error
import Control.Monad.Reader
import Control.Monad.State
import Control.Monad.Trans.Control
import Data.Functor.Identity
import Data.IORef.Lifted
import Data.Monoid (Any(..), Monoid(..))
import qualified Data.Set as S
import Data.Tuple (swap)
import System.Posix.Process (getProcessID)
import System.Time (getClockTime, ClockTime)
import qualified Text.JSON as J
import Ganeti.BasicTypes
import Ganeti.Errors
import Ganeti.JQueue (notifyJob)
import Ganeti.Lens
import qualified Ganeti.Locking.Allocation as LA
import Ganeti.Locking.Locks
import qualified Ganeti.Locking.Waiting as LW
import Ganeti.Logging
import Ganeti.Logging.WriterLog
import Ganeti.Objects (ConfigData)
import Ganeti.Utils.AsyncWorker
import Ganeti.Utils.IORef
import Ganeti.Utils.Livelock (Livelock)
import Ganeti.WConfd.ConfigState
import Ganeti.WConfd.TempRes
-- | Where a configuration has to be distributed to: either 'Everywhere',
-- or only to the targets named in the given set (presumably node groups,
-- going by the constructor name -- confirm against the distribution worker).
data DistributionTarget = Everywhere | ToGroups (S.Set String) deriving Show
-- | Distribution targets combine by taking the wider target: two group
-- sets are merged by union, and as soon as either side is 'Everywhere'
-- the combination is 'Everywhere'. The neutral element is the empty
-- group set.
instance Monoid DistributionTarget where
  mempty = ToGroups S.empty
  -- Only two explicit group sets stay a group set; the first argument is
  -- inspected first, so an 'Everywhere' on the left never forces the right.
  mappend (ToGroups xs) (ToGroups ys) = ToGroups (S.union xs ys)
  mappend _ _ = Everywhere
-- | The mutable state of the daemon. All three parts live together in one
-- value, which is stored in the 'IORef' held by 'dhDaemonState'.
data DaemonState = DaemonState
  { dsConfigState :: ConfigState
    -- ^ the current configuration state
  , dsLockWaiting :: GanetiLockWaiting
    -- ^ the current lock state
  , dsTempRes :: TempResState
    -- ^ the current temporary-reservation state
  }

-- Template Haskell splice generating lenses for the fields above; the rest
-- of this module refers to them as dsConfigStateL, dsLockWaitingL and
-- dsTempResL.
$(makeCustomLenses ''DaemonState)
-- | The read-only handle shared by all computations in the WConfd monad.
-- It bundles the reference to the mutable 'DaemonState' with the
-- asynchronous workers that persist the individual parts of that state.
data DaemonHandle = DaemonHandle
  { dhDaemonState :: IORef DaemonState
    -- ^ the reference holding the whole mutable daemon state
  , dhConfigPath :: FilePath
    -- ^ the configuration path this handle was created with
  , dhSaveConfigWorker :: AsyncWorker (Any, DistributionTarget) ()
    -- ^ worker triggered after configuration changes; its input is a
    -- flag (callers in this module pass @Any True@ when they log a
    -- synchronous distribution) and the distribution target
  , dhSaveLocksWorker :: AsyncWorker () ()
    -- ^ worker triggered after the lock state was modified
  , dhSaveTempResWorker :: AsyncWorker () ()
    -- ^ worker triggered after the temporary reservations were modified
  , dhLivelock :: Livelock
    -- ^ the livelock of this handle; used as the lock file of the client
    -- identity under which the daemon itself requests locks
  }
-- | Builds a 'DaemonHandle' from the initial state and a set of worker
-- factories.
--
-- The three initial state values are stored in a fresh 'IORef'. Every
-- worker factory receives an 'IO' action that reads the corresponding
-- part of that reference. The workers are created in this order: the
-- @distSSConfWorkerFn@ worker, the @distMCsWorkerFn@ worker, the config
-- save worker (which is handed the two distribution workers, MCs first),
-- the lock save worker and finally the temporary-reservation save worker.
mkDaemonHandle :: FilePath
               -> ConfigState
               -> GanetiLockWaiting
               -> TempResState
               -> (IO ConfigState
                   -> [AsyncWorker DistributionTarget ()]
                   -> ResultG (AsyncWorker (Any, DistributionTarget) ()))
               -> (IO ConfigState
                   -> ResultG (AsyncWorker DistributionTarget ()))
               -> (IO ConfigState
                   -> ResultG (AsyncWorker DistributionTarget ()))
               -> (IO GanetiLockWaiting -> ResultG (AsyncWorker () ()))
               -> (IO TempResState -> ResultG (AsyncWorker () ()))
               -> Livelock
               -> ResultG DaemonHandle
mkDaemonHandle cpath cstat lstat trstat
               saveWorkerFn distMCsWorkerFn distSSConfWorkerFn
               saveLockWorkerFn saveTempResWorkerFn livelock = do
  stateRef <- newIORef DaemonState { dsConfigState = cstat
                                   , dsLockWaiting = lstat
                                   , dsTempRes = trstat
                                   }
  let -- read one component of the current daemon state
      readPart :: (DaemonState -> a) -> IO a
      readPart part = part `liftM` readIORef stateRef
      readConfigIO = readPart dsConfigState
  ssconfWorker <- distSSConfWorkerFn readConfigIO
  distMCsWorker <- distMCsWorkerFn readConfigIO
  saveWorker <- saveWorkerFn readConfigIO [distMCsWorker, ssconfWorker]
  saveLockWorker <- saveLockWorkerFn (readPart dsLockWaiting)
  saveTempResWorker <- saveTempResWorkerFn (readPart dsTempRes)
  return DaemonHandle { dhDaemonState = stateRef
                      , dhConfigPath = cpath
                      , dhSaveConfigWorker = saveWorker
                      , dhSaveLocksWorker = saveLockWorker
                      , dhSaveTempResWorker = saveTempResWorker
                      , dhLivelock = livelock
                      }
-- | The monad stack underlying 'WConfdMonadInt': a reader providing the
-- 'DaemonHandle' on top of 'IO'.
type WConfdMonadIntType = ReaderT DaemonHandle IO

-- | The internal WConfd monad: a newtype around 'WConfdMonadIntType'.
-- The listed instances are derived from the wrapped stack;
-- 'MonadBaseControl' is given by a separate hand-written instance.
newtype WConfdMonadInt a = WConfdMonadInt
  { getWConfdMonadInt :: WConfdMonadIntType a }
  deriving (Functor, Applicative, Monad, MonadIO, MonadBase IO, MonadLog)
-- | 'MonadBaseControl' for the internal monad, obtained by wrapping and
-- unwrapping the newtype around the instance of the underlying stack.
-- The two CPP branches only differ in how the monadic state type is
-- declared for the installed version of monad-control.
instance MonadBaseControl IO WConfdMonadInt where
#if MIN_VERSION_monad_control(1,0,0)
  -- monad-control >= 1.0: the state is an associated type synonym, so the
  -- state of the underlying stack can be used as it is.
  type StM WConfdMonadInt b = StM WConfdMonadIntType b
  liftBaseWith f =
    WConfdMonadInt
      (liftBaseWith (\runInBase -> f (runInBase . getWConfdMonadInt)))
  restoreM st = WConfdMonadInt (restoreM st)
#else
  -- older monad-control: the state is an associated newtype, so the state
  -- of the underlying stack has to be wrapped and unwrapped explicitly.
  newtype StM WConfdMonadInt b = StMWConfdMonadInt
    { runStMWConfdMonadInt :: StM WConfdMonadIntType b }
  liftBaseWith f =
    WConfdMonadInt
      (liftBaseWith (\runInBase ->
         f (liftM StMWConfdMonadInt . runInBase . getWConfdMonadInt)))
  restoreM st = WConfdMonadInt (restoreM (runStMWConfdMonadInt st))
#endif
-- | Runs an internal WConfd computation in 'IO', supplying the given
-- 'DaemonHandle' as the reader environment.
runWConfdMonadInt :: WConfdMonadInt a -> DaemonHandle -> IO a
runWConfdMonadInt = runReaderT . getWConfdMonadInt
-- | The complete WConfd monad: the internal monad 'WConfdMonadInt'
-- extended by failure with a 'GanetiException'.
type WConfdMonad = ResultT GanetiException WConfdMonadInt

-- | The monad in which the functions handed to the atomic state
-- modifications run: they may fail with a 'GanetiException', and the
-- base monad is 'WriterLog' instead of 'IO'.
type AtomicModifyMonad a = ResultT GanetiException WriterLog a
-- | Returns the 'DaemonHandle' the current computation is running with,
-- i.e. the reader environment of the internal monad.
daemonHandle :: WConfdMonad DaemonHandle
daemonHandle = lift (WConfdMonadInt ask)
-- | Returns the configuration state currently stored in the daemon state.
readConfigState :: WConfdMonad ConfigState
readConfigState = do
  dh <- daemonHandle
  ds <- readIORef (dhDaemonState dh)
  return (dsConfigState ds)
-- | Post-processes the outcome of a configuration modification.
--
-- Takes the current time, the configuration state before the
-- modification, and the pair of result and new state. The result is
-- extended by two flags: whether the state differs from the old one, and
-- (only computed in that case) what 'needsFullDist' reports for the old
-- and new state. If the state differs, 'bumpSerial' is applied with the
-- given time to its configuration data; otherwise the state is returned
-- untouched and both flags are 'False'.
unpackConfigResult :: ClockTime -> ConfigState
                   -> (a, ConfigState) -> ((a, Bool, Bool), ConfigState)
unpackConfigResult now oldState (result, newState) =
  if oldState /= newState
    then ((result, True, fullDist), stamped)
    else ((result, False, False), newState)
  where
    fullDist = needsFullDist oldState newState
    stamped = over csConfigDataL (bumpSerial now) newState
-- | Atomically modifies the configuration state and runs a follow-up
-- action.
--
-- The modification function receives the current temporary reservations
-- and the current configuration state and may fail. Its outcome is
-- post-processed by 'unpackConfigResult' with the time taken at the
-- start of this function.
--
-- If the state was modified, the config save worker is triggered, then
-- the follow-up action is run, and only afterwards the result of the
-- worker is waited for. The flag handed to the worker is the
-- distribution flag computed by 'unpackConfigResult'; the target is
-- always 'Everywhere'. If the state was not modified, only the follow-up
-- action is run.
modifyConfigStateErrWithImmediate
  :: (TempResState -> ConfigState -> AtomicModifyMonad (a, ConfigState))
  -> WConfdMonad ()
  -> WConfdMonad a
modifyConfigStateErrWithImmediate f immediateFollowup = do
  dh <- daemonHandle
  now <- liftIO getClockTime
  let -- apply f to the configuration part of the whole daemon state
      updateState ds =
        mapMOf2 dsConfigStateL
                (\cs -> liftM (unpackConfigResult now cs)
                              (f (dsTempRes ds) cs))
                ds
      -- trigger the save worker, run the follow-up while the worker is
      -- busy, and only then wait for the worker's result
      saveAndFollowUp syncDist startMsg doneMsg = do
        logDebug startMsg
        res <- liftBase . triggerWithResult (Any syncDist, Everywhere)
                 $ dhSaveConfigWorker dh
        immediateFollowup
        wait res
        logDebug doneMsg
  (r, modified, distSync) <- atomicModifyIORefErrLog (dhDaemonState dh)
                                                     (liftM swap . updateState)
  case (modified, distSync) of
    (False, _) ->
      immediateFollowup
    (True, True) ->
      saveAndFollowUp True
        ("Triggering config write" ++
         " together with full synchronous distribution")
        "Config write and distribution finished"
    (True, False) ->
      saveAndFollowUp False
        ("Triggering config write" ++
         " and asynchronous distribution")
        "Config writer finished with local task"
  return r
-- | Like 'modifyConfigStateErrWithImmediate', but without any follow-up
-- action.
modifyConfigStateErr
  :: (TempResState -> ConfigState -> AtomicModifyMonad (a, ConfigState))
  -> WConfdMonad a
modifyConfigStateErr f = modifyConfigStateErrWithImmediate f (return ())
-- | Variant of 'modifyConfigStateErr' for modification functions that
-- only produce a new configuration state and no additional result.
modifyConfigStateErr_
  :: (TempResState -> ConfigState -> AtomicModifyMonad ConfigState)
  -> WConfdMonad ()
modifyConfigStateErr_ f =
  modifyConfigStateErr $ \tr cs -> liftM (\cs' -> ((), cs')) (f tr cs)
-- | Atomically modifies the configuration state with a pure function
-- that cannot fail and does not look at the temporary reservations.
modifyConfigState :: (ConfigState -> (a, ConfigState)) -> WConfdMonad a
modifyConfigState f = modifyConfigStateErr (\_ cs -> return (f cs))
-- | Pure-function variant of 'modifyConfigStateErrWithImmediate': the
-- modification cannot fail and ignores the temporary reservations; the
-- second argument is the follow-up action.
modifyConfigStateWithImmediate :: (ConfigState -> (a, ConfigState))
                               -> WConfdMonad ()
                               -> WConfdMonad a
modifyConfigStateWithImmediate f =
  modifyConfigStateErrWithImmediate (\_ cs -> return (f cs))
-- | Triggers the config save worker with the flag set to @Any True@ and
-- the given distribution target, regardless of whether the configuration
-- was modified, using 'triggerAndWait'.
forceConfigStateDistribution :: DistributionTarget -> WConfdMonad ()
forceConfigStateDistribution target = do
  logDebug "Forcing synchronous config write together with full distribution"
  saveWorker <- liftM dhSaveConfigWorker daemonHandle
  liftBase $ triggerAndWait (Any True, target) saveWorker
  logDebug "Forced config write and distribution finished"
-- | Variant of 'modifyConfigStateErr_' whose modification function works
-- on the configuration data inside the configuration state (accessed
-- through the lens csConfigDataL) instead of on the whole state.
modifyConfigDataErr_
  :: (TempResState -> ConfigData -> AtomicModifyMonad ConfigData)
  -> WConfdMonad ()
modifyConfigDataErr_ f =
  modifyConfigStateErr_ $ \tr -> traverseOf csConfigDataL (f tr)
-- | Atomically modifies the temporary-reservation state.
--
-- The given function receives the current configuration data and yields
-- a stateful computation over the reservations that runs in
-- 'ErrorResult'. The outcome of the atomic update is passed through
-- 'toErrorBase'; afterwards the temporary-reservation save worker is
-- triggered with 'triggerAndWait_' and the result of the computation is
-- returned.
modifyTempResStateErr
  :: (ConfigData -> StateT TempResState ErrorResult a) -> WConfdMonad a
modifyTempResStateErr f = do
  dh <- daemonHandle
  r <- toErrorBase $ atomicModifyIORefErr (dhDaemonState dh)
                                          (liftM swap . step)
  logDebug "Triggering temporary reservations write"
  liftBase . triggerAndWait_ . dhSaveTempResWorker $ dh
  logDebug "Temporary reservations write finished"
  return r
  where
    -- run the computation on the reservation part of the daemon state,
    -- giving it the configuration data found in that same state
    step ds =
      traverseOf2 dsTempResL
                  (runStateT . f . csConfigData $ dsConfigState ds)
                  ds
-- | Variant of 'modifyTempResStateErr' for stateful computations that
-- cannot fail: the pure 'State' computation is embedded into the failing
-- one by wrapping each of its results with 'return'.
modifyTempResState :: (ConfigData -> State TempResState a) -> WConfdMonad a
modifyTempResState f =
  modifyTempResStateErr $ \cfg -> mapStateT (return . runIdentity) (f cfg)
-- | Returns the current configuration data together with the current
-- temporary reservations; both are taken from a single read of the
-- daemon state.
readTempResState :: WConfdMonad (ConfigData, TempResState)
readTempResState = do
  dh <- daemonHandle
  ds <- readIORef (dhDaemonState dh)
  return (csConfigData (dsConfigState ds), dsTempRes ds)
-- | Atomically modifies the lock state.
--
-- The given function maps the current lock state to the new one, paired
-- with a result and a set of clients. After the atomic update the new
-- lock state is logged, the lock save worker is triggered with
-- 'triggerAndWait_', and, if the set of clients is not empty, 'notifyJob'
-- is called with the process id of each of them. The result of the
-- function is returned.
modifyLockWaiting :: (GanetiLockWaiting -> ( GanetiLockWaiting
                                           , (a, S.Set ClientId) ))
                     -> WConfdMonad a
modifyLockWaiting f = do
  dh <- lift (WConfdMonadInt ask)
  let -- hand back the full outcome of f and install its new lock state
      step lw = let outcome = f lw in (outcome, fst outcome)
  (lockAlloc, (r, nfy)) <- atomicModifyWithLens (dhDaemonState dh)
                                                dsLockWaitingL step
  logDebug $ "Current lock status: " ++ J.encode lockAlloc
  logDebug "Triggering lock state write"
  liftBase . triggerAndWait_ $ dhSaveLocksWorker dh
  logDebug "Lock write finished"
  unless (S.null nfy) $ do
    let toNotify = S.toList nfy
    logDebug $ "Locks became available for " ++ show toNotify
    liftIO $ mapM_ (notifyJob . ciPid) toNotify
    logDebug "Finished notifying processes"
  return r
-- | Variant of 'modifyLockWaiting' for functions that yield only the new
-- lock state and the set of clients, without an additional result.
modifyLockWaiting_ :: (GanetiLockWaiting -> (GanetiLockWaiting, S.Set ClientId))
                      -> WConfdMonad ()
modifyLockWaiting_ f = modifyLockWaiting $ \lw -> second ((,) ()) (f lw)
-- | Returns the lock state currently stored in the daemon state.
readLockWaiting :: WConfdMonad GanetiLockWaiting
readLockWaiting = do
  dh <- daemonHandle
  ds <- readIORef (dhDaemonState dh)
  return (dsLockWaiting ds)
-- | Returns the lock allocation obtained by applying 'LW.getAllocation'
-- to the current lock state.
readLockAllocation :: WConfdMonad (LA.LockAllocation GanetiLocks ClientId)
readLockAllocation = do
  lockWaiting <- readLockWaiting
  return (LW.getAllocation lockWaiting)
-- | Modifies the configuration while holding the exclusive 'ConfigLock'.
--
-- The daemon requests the lock under a client identity built from its
-- own process id, the current thread id and its livelock. The request
-- counts as granted only if the lock update succeeds with an empty set;
-- otherwise nothing is modified and 'Nothing' is returned.
--
-- With the lock granted, the configuration state is modified atomically
-- by the given function (post-processed by 'unpackConfigResult'), and
-- then the given state computation is applied to the temporary
-- reservations. Both steps run inside a 'bracket', whose final action
-- releases the lock again if it had been granted and notifies, in a
-- forked thread, the clients reported by the release.
--
-- Afterwards the config save worker is triggered if the configuration
-- was modified, the temporary-reservation save worker is triggered in
-- any case, and the result of the modification is returned in 'Just'.
modifyConfigAndReturnWithLock
  :: (TempResState -> ConfigState -> AtomicModifyMonad (a, ConfigState))
  -> State TempResState ()
  -> WConfdMonad (Maybe a)
modifyConfigAndReturnWithLock f tempres = do
  now <- liftIO getClockTime
  dh <- lift (WConfdMonadInt ask)
  pid <- liftIO getProcessID
  tid <- liftIO myThreadId
  let stateRef = dhDaemonState dh
      -- the identity under which the daemon itself takes the lock
      cid = ClientId { ciIdentifier = ClientOther $ "wconfd-" ++ show tid
                     , ciLockFile = dhLivelock dh
                     , ciPid = pid
                     }
      -- apply a single request concerning the ConfigLock to the lock state
      changeConfigLock request =
        atomicModifyWithLens stateRef dsLockWaitingL
          $ swap . LW.updateLocks cid [request ConfigLock]
      -- the lock is held only if the update succeeded with an empty set
      lockGranted (Ok blockers, _) = S.null blockers
      lockGranted _ = False
      -- apply f to the configuration part of the whole daemon state
      updateConfig ds =
        mapMOf2 dsConfigStateL
                (\cs -> liftM (unpackConfigResult now cs)
                              (f (dsTempRes ds) cs))
                ds
  maybeDist <- bracket
    (changeConfigLock LA.requestExclusive)
    (\acquired -> when (lockGranted acquired) $ do
        (_, nfy) <- changeConfigLock LA.requestRelease
        -- notify in a separate thread so that the release does not block
        unless (S.null nfy) . liftIO . void . forkIO $ do
          logDebug . (++) "Locks became available for " . show $ S.toList nfy
          mapM_ (notifyJob . ciPid) $ S.toList nfy
          logDebug "Finished notifying processes")
    (\acquired ->
        if lockGranted acquired
          then do
            ret <- atomicModifyIORefErrLog stateRef
                                           (liftM swap . updateConfig)
            atomicModifyWithLens stateRef dsTempResL $ runState tempres
            return $ Just ret
          else return Nothing)
  case maybeDist of
    Nothing -> return Nothing
    Just (val, modified, dist) -> do
      when modified $ do
        logDebug $ "Triggering config write; distribution "
                   ++ (if dist then "synchronously" else "asynchronously")
        liftBase . triggerAndWait (Any dist, Everywhere)
          $ dhSaveConfigWorker dh
        logDebug "Config write finished"
      logDebug "Triggering temporary reservations write"
      liftBase . triggerAndWait_ $ dhSaveTempResWorker dh
      logDebug "Temporary reservations write finished"
      return $ Just val
-- | Variant of 'modifyConfigAndReturnWithLock' for modification functions
-- that only produce a new configuration state. The result is @Just ()@
-- or 'Nothing', exactly as the underlying function reports it.
modifyConfigWithLock
  :: (TempResState -> ConfigState -> AtomicModifyMonad ConfigState)
  -> State TempResState ()
  -> WConfdMonad (Maybe ())
modifyConfigWithLock f =
  modifyConfigAndReturnWithLock $ \tr cs -> fmap (\cs' -> ((), cs')) (f tr cs)