module Ganeti.Query.Exec
( isForkSupported
, forkJobProcess
, forkPostHooksProcess
) where
import Prelude ()
import Ganeti.Prelude
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Lifted (threadDelay)
import Control.Exception (finally)
import Control.Monad
import Control.Monad.Error.Class (MonadError(..))
import qualified Data.Map as M
import Data.Maybe (listToMaybe, mapMaybe)
import System.Directory (getDirectoryContents)
import System.Environment
import System.IO.Error (tryIOError, annotateIOError, modifyIOError)
import System.Posix.Process
import System.Posix.IO
import System.Posix.Signals (sigABRT, sigKILL, sigTERM, signalProcess)
import System.Posix.Types (Fd, ProcessID)
import System.Time
import Text.JSON
import Text.Printf
import qualified AutoConf as AC
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JQueue.Objects
import Ganeti.JSON (MaybeForJSON(..))
import Ganeti.Logging
import Ganeti.Logging.WriterLog
import Ganeti.OpCodes
import qualified Ganeti.Path as P
import Ganeti.Types
import Ganeti.UDSServer
import Ganeti.Utils
import Ganeti.Utils.Monad
import Ganeti.Utils.Random (delayRandom)
-- | Reports whether forking new processes is supported by this build.
--
-- The answer is simply the negation of 'rtsSupportsBoundThreads': forking is
-- reported as supported only when the runtime does not support bound threads.
isForkSupported :: IO Bool
isForkSupported = return forkable
  where
    forkable = not rtsSupportsBoundThreads
-- | Connection settings used for the pipe between the parent and the forked
-- process: both the send and the receive timeout are set to 30
-- (NOTE(review): presumably seconds; confirm against 'ConnectConfig').
connectConfig :: ConnectConfig
connectConfig =
  ConnectConfig
    { sendTmo = 30
    , recvTmo = 30
    }
-- | Lists the file descriptors currently open in this process.
--
-- The directory @/proc/self/fd@ is tried first, then @/dev/fd@. If neither
-- can be listed, an informational message is logged and an empty list is
-- returned. Directory entries that do not parse as a descriptor (such as
-- @.@ and @..@) are silently dropped.
listOpenFds :: (FromString e) => ResultT e IO [Fd]
listOpenFds =
  parseFdNames <$>
    (liftIO (getDirectoryContents "/proc/self/fd") `orElse`
     liftIO (getDirectoryContents "/dev/fd") `orElse`
     ([] <$ logInfo "Listing open file descriptors isn't\
                    \ supported by the system,\
                    \ not cleaning them up!"))
  where
    -- Keep only the names that start with a readable value.
    parseFdNames :: (Read a) => [String] -> [a]
    parseFdNames names = mapMaybe leadingValue names

    -- The first successful parse of the name, if there is any.
    leadingValue name = fst <$> listToMaybe (reads name)
-- | Runs an 'IO' action and, if it raises an 'IOError', re-raises that error
-- with the given description attached as its location. No handle or file
-- path is added. Results of successful actions are passed through untouched.
rethrowAnnotateIOError :: String -> IO a -> IO a
rethrowAnnotateIOError desc = modifyIOError attachDescription
  where
    attachDescription err = annotateIOError err desc Nothing Nothing
-- | Code run in the forked child process: performs the pre-execution
-- handshake with the parent, rewires the communication channel onto
-- stdin/stdout, closes every other file descriptor and finally replaces
-- the process image with the Python executor.
--
-- Arguments, in order: the job id (also passed to the executor on its
-- command line), the client connected to the parent, an action yielding the
-- path of the Python script to run, and the pre-execution function that
-- talks to the parent and returns the one file descriptor which must be
-- kept open across the exec.
--
-- Any error is logged at CRITICAL level, tagged with the shown job id.
runProcess :: JobId
-> Client
-> IO FilePath
-> ((String -> IO ()) -> JobId -> Client -> IO Fd)
-> IO ()
runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
do
-- Standard error is closed before anything else happens in the child.
closeFd stdError
-- Log messages are discarded: 'logLater' ignores its argument. The calls
-- below therefore only document what each step does.
let logLater _ = return ()
logLater $ "Forking a new process for job " ++ show (fromJobId jid)
-- Handshake with the parent; the returned descriptor is excluded from the
-- clean-up further down.
preserve_fd <- commFn logLater jid s
logLater "Closing the client"
-- Turn the client into its read and write file descriptors ...
(clFdR, clFdW) <- clientToFd s
logLater "Reconnecting the file descriptors to stdin/out"
-- ... and duplicate them onto standard input and standard output.
_ <- dupTo clFdR stdInput
_ <- dupTo clFdW stdOutput
logLater "Closing the old file descriptors"
closeFd clFdR
closeFd clFdW
-- Everything above fd 2 except the preserved descriptor gets closed;
-- failures of individual closes are ignored ('tryIOError').
fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase listOpenFds
logLater $ "Closing every superfluous file descriptor: " ++ show fds
mapM_ (tryIOError . closeFd) fds
-- The environment is the current one with PYTHONPATH and GNT_DEBUG
-- overridden; GNT_DEBUG is "1" in debug mode and "0" otherwise.
use_debug <- isDebugMode
env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
. M.insert "PYTHONPATH" AC.versionedsharedir
. M.fromList)
`liftM` getEnvironment
execPy <- pyExecIO
logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
++ " with PYTHONPATH=" ++ AC.versionedsharedir
-- Run the Python interpreter on the script, passing the numeric job id as
-- the only script argument.
-- NOTE(review): the 'True' flag presumably enables a PATH search; confirm
-- against the 'executeFile' documentation.
() <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
(Just $ M.toList env)
-- Only reached if 'executeFile' returned instead of replacing the process.
failError $ "Failed to execute " ++ AC.pythonPath ++ " " ++ execPy
-- | Extracts the secret OS parameters from a list of queued opcodes.
--
-- Opcodes whose input is not a valid opcode are skipped. Every remaining
-- opcode contributes exactly one entry: its secret parameters converted with
-- 'revealValInJSObject', or an empty 'MaybeForJSON' if the opcode carries no
-- secret parameters.
filterSecretParameters :: [QueuedOpCode]
                       -> [MaybeForJSON (JSObject (Private JSValue))]
filterSecretParameters queued =
  [ MaybeForJSON (revealValInJSObject <$> secretParamsOf opcode)
  | Just opcode <- map (validOpCode . qoInput) queued
  ]
  where
    -- Only successfully parsed opcodes are of interest.
    validOpCode :: InputOpCode -> Maybe OpCode
    validOpCode (ValidOpCode moc) = Just (metaOpCode moc)
    validOpCode _ = Nothing

    -- Secret parameters exist only on these three opcode kinds.
    secretParamsOf :: OpCode -> Maybe (JSObject (Secret JSValue))
    secretParamsOf (OpInstanceCreate {opOsparamsSecret = x}) = x
    secretParamsOf (OpInstanceReinstall {opOsparamsSecret = x}) = x
    secretParamsOf (OpTestOsParams {opOsparamsSecret = x}) = x
    secretParamsOf _ = Nothing
-- | Forks a child process connected to the caller through a pair of clients.
--
-- The child closes the master end and runs the given action on its own end.
-- The caller's copy of the child end is closed whether or not the fork
-- succeeded. The result is the child's process id together with the master
-- end of the pair.
forkWithPipe :: ConnectConfig -> (Client -> IO ()) -> IO (ProcessID, Client)
forkWithPipe conf childAction = do
  (parentEnd, childEnd) <- pipeClient conf
  let inChild = closeClient parentEnd >> childAction childEnd
  childPid <- forkProcess inChild `finally` closeClient childEnd
  return (childPid, parentEnd)
-- | Cleans up after a failed start of a child process.
--
-- The communication channel to the child is closed first; a failure to do so
-- is logged at WARNING level and otherwise ignored. Afterwards the child is
-- sent SIGTERM, SIGABRT and SIGKILL in turn, stopping as soon as its status
-- shows it is no longer running (or cannot be queried at all).
killProcessOnError :: (FromString e, Show e)
                   => ProcessID -- ^ pid of the child process
                   -> Client -- ^ the parent's end of the communication channel
                   -> (String -> ResultT e (WriterLogT IO) ()) -- ^ log function
                   -> ResultT e (WriterLogT IO) ()
killProcessOnError pid master logFn = do
  logFn "Closing the pipe to the client"
  withErrorLogAt WARNING "Closing the communication pipe failed"
                 (liftIO (closeClient master)) `orElse` return ()
  escalate [sigTERM, sigABRT, sigKILL]
  where
    -- Walk through the signals, re-checking the child before each one.
    escalate [] = return ()
    escalate (sig : remaining) = do
      logFn "Getting the status of the process"
      probe <- tryError (liftIO (getProcessStatus False True pid))
      case probe of
        Right Nothing -> do
          logFn ("Child process running, killing by " ++ show sig)
          liftIO (signalProcess sig pid)
          -- Pause for 100000 microseconds before trying the next signal;
          -- after the last signal there is nothing left to wait for.
          unless (null remaining) $
            threadDelay 100000 >> escalate remaining
        Right (Just st) -> logFn ("Child process status: " ++ show st)
        Left err -> logFn ("Job process already gone: " ++ show err)
-- | Result of successfully forking one of the helper processes.
data ForkProcessRet
  = ForkJob (FilePath, ProcessID)
    -- ^ a job process: the lock file it reported and its pid
  | ForkPostHooks ProcessID
    -- ^ a post hooks process: only its pid
-- | Forks a child process and communicates with it, retrying on failure.
--
-- The whole attempt (fork plus communication) is wrapped in 'retryErrorN'
-- with 'C.luxidRetryForkCount' as its limit. From the second attempt on, a
-- random delay is inserted before forking; its upper bound doubles with
-- every attempt: @2^(tryNo - 1) * C.luxidRetryForkStepUS@.
-- (NOTE(review): this assumes 'retryErrorN' numbers attempts from 1, so the
-- exponent is never negative; confirm against its definition.)
--
-- If the communication function fails, the child is cleaned up with
-- 'killProcessOnError' and the original error is re-thrown.
forkProcessCatchErrors :: (Show e, FromString e)
                       => (Client -> IO ())
                          -- ^ action run inside the forked child
                       -> (ProcessID -> String -> ResultT e (WriterLogT IO) ())
                          -- ^ log function, given the child's pid
                       -> (ProcessID -> Client
                           -> ResultT e (WriterLogT IO) ForkProcessRet)
                          -- ^ parent-side communication with the child
                       -> ResultT e IO ForkProcessRet
forkProcessCatchErrors runFn logFn commFn = do
  -- Run the collected log of the inner computation inside 'ResultT'.
  let execWriterLogInside = ResultT . execWriterLogT . runResultT
  retryErrorN C.luxidRetryForkCount
               $ \tryNo -> execWriterLogInside $ do
    -- Exponential back-off bound. The subtraction is essential: without the
    -- operator the expression would apply 'tryNo' to 1 as if it were a
    -- function.
    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
    (pid, master) <- liftIO $ forkWithPipe connectConfig runFn
    logFn pid "Forked a new process"
    -- On any communication error get rid of the child, then propagate.
    flip catchError (\e -> killProcessOnError pid master (logFn pid)
                           >> throwError e) $ commFn pid master
-- | Forks the process that executes a job and hands it everything it needs.
--
-- Before forking, the given lock file is recorded for the job through the
-- update callback. The child then creates and locks its own lock file and
-- reports its name; the parent records that name through the same callback,
-- confirms, and answers the executor's requests for the job id, the lock
-- file name and the secret parameters, in that order.
--
-- Returns the child's lock file together with its process id.
forkJobProcess :: (FromString e, Show e)
               => QueuedJob -- ^ the job to start
               -> FilePath -- ^ lock file recorded for the job before forking
               -> (FilePath -> ResultT e IO ())
                  -- ^ callback recording the lock file used for the job
               -> ResultT e IO (FilePath, ProcessID)
forkJobProcess job luxiLivelock update = do
  logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
             ++ " for job " ++ jobIdText
  update luxiLivelock
  ForkJob outcome <-
    forkProcessCatchErrors (runChild (qjId job)) debugFor converse
  return outcome
  where
    jobIdText = show (fromJobId (qjId job))

    -- Secret parameters of all opcodes of the job, already serialized.
    secretsText = encodeStrict (filterSecretParameters (qjOps job))

    prefixFor pid = "[start:job-" ++ jobIdText ++ ",pid=" ++ show pid ++ "] "

    debugFor pid text = logDebug (prefixFor pid ++ text)

    -- Parent side of the conversation with the child process.
    converse pid master = do
      -- Every step is logged first; IO errors get the same text attached.
      let step msg action = do
            debugFor pid msg
            liftIO $ rethrowAnnotateIOError (prefixFor pid ++ msg) action
      let expect msg = step msg (recvMsg master)
          deliver msg payload = step msg (sendMsg master payload)

      lockfile <- expect "Getting the lockfile of the client"
      debugFor pid ("Setting the lockfile to the final " ++ lockfile)
      toErrorBase $ update lockfile
      deliver "Confirming the client it can start" ""

      -- The remaining exchanges answer requests made by the executor.
      _ <- expect "Waiting for the job to ask for the job id"
      deliver "Writing job id to the client" jobIdText
      _ <- expect "Waiting for the job to ask for the lock file name"
      deliver "Writing the lock file name to the client" lockfile
      _ <- expect "Waiting for the job to ask for secret parameters"
      deliver "Writing secret parameters to the client" secretsText

      liftIO $ closeClient master
      return $ ForkJob (lockfile, pid)

    -- Child side: lock a fresh lock file, report it, wait for confirmation.
    runChild jid s = runProcess jid s P.jqueueExecutorPy lockAndReport
      where
        lockAndReport note jid' s' = do
          -- The file name combines the job id with the clock's seconds.
          TOD secs _ <- getClockTime
          lockfile <- P.livelockFile $
                        printf "job_%06d_%d" (fromJobId jid') secs
          _ <- note $ "Locking livelock file " ++ show lockfile
          fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock"
          _ <- note "Sending the lockfile name to the master process"
          sendMsg s' lockfile
          _ <- note "Waiting for the master process to confirm the lock"
          _ <- recvMsg s'
          -- The descriptor holding the lock must survive the exec.
          return fd
-- | Forks the process that runs the post hooks of a job.
--
-- The parent waits for the executor to ask for the job id, sends it, closes
-- its end of the channel and returns the child's process id. The child does
-- no handshake of its own before executing the post hooks script.
forkPostHooksProcess :: (FromString e, Show e)
                     => JobId -- ^ the job whose post hooks are to be run
                     -> ResultT e IO ProcessID
forkPostHooksProcess jid = do
  ForkPostHooks childPid <-
    forkProcessCatchErrors (runChild jid) debugFor converse
  return childPid
  where
    jobIdText = show (fromJobId jid)

    prefixFor pid = "[start:post_hooks:job-" ++ jobIdText ++ ",pid="
                    ++ show pid ++ "] "

    debugFor pid text = logDebug (prefixFor pid ++ text)

    -- Parent side of the conversation with the child process.
    converse pid master = do
      -- Every step is logged first; IO errors get the same text attached.
      let step msg action = do
            debugFor pid msg
            liftIO $ rethrowAnnotateIOError (prefixFor pid ++ msg) action
      let expect msg = step msg (recvMsg master)
          deliver msg payload = step msg (sendMsg master payload)

      _ <- expect "Waiting for the post hooks executor to ask for the job id"
      deliver "Writing job id to the client" jobIdText

      liftIO $ closeClient master
      return $ ForkPostHooks pid

    -- Child side: nothing to negotiate; descriptor 0 is named as the one to
    -- preserve.
    runChild jid' s = runProcess jid' s P.postHooksExecutorPy noHandshake
      where
        noHandshake _ _ _ = return (0 :: Fd)