module Ganeti.Query.Exec
( isForkSupported
, forkJobProcess
) where
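
{- Overview of the handshake implemented in this module (master process
   vs. forked job process):

   * the master forks the job process over a pipe ('forkWithPipe');
   * the child creates and locks a per-job livelock file and sends its
     name to the master;
   * the master records that livelock file for the job and confirms;
   * the child reconnects the pipe to stdin/stdout and execs the Python
     job executor;
   * the executor then asks the master, over the same pipe, for the job
     id, the lock file name and the secret parameters before running
     the job.
-}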
import Prelude ()
import Ganeti.Prelude
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Lifted (threadDelay)
import Control.Exception (finally)
import Control.Monad
import Control.Monad.Error.Class (MonadError(..))
import qualified Data.Map as M
import Data.Maybe (listToMaybe, mapMaybe)
import System.Directory (getDirectoryContents)
import System.Environment
import System.IO.Error (tryIOError, annotateIOError, modifyIOError)
import System.Posix.Process
import System.Posix.IO
import System.Posix.Signals (sigABRT, sigKILL, sigTERM, signalProcess)
import System.Posix.Types (Fd, ProcessID)
import System.Time
import Text.JSON
import Text.Printf
import qualified AutoConf as AC
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.JQueue.Objects
import Ganeti.JSON (MaybeForJSON(..))
import Ganeti.Logging
import Ganeti.Logging.WriterLog
import Ganeti.OpCodes
import qualified Ganeti.Path as P
import Ganeti.Types
import Ganeti.UDSServer
import Ganeti.Utils
import Ganeti.Utils.Monad
import Ganeti.Utils.Random (delayRandom)
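
-- | Whether it is safe to fork new processes from this daemon: forking
-- is only supported when the non-threaded runtime is in use.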
isForkSupported :: IO Bool
isForkSupported = return $ not rtsSupportsBoundThreads
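
-- | Configuration of the pipe used to communicate with a forked job
-- process.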
connectConfig :: ConnectConfig
connectConfig = ConnectConfig { recvTmo = 30
, sendTmo = 30
}
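
-- | List the file descriptors currently open in this process, using
-- /proc/self/fd or /dev/fd where available; if neither can be read,
-- log the fact and return the empty list.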
listOpenFds :: (FromString e) => ResultT e IO [Fd]
listOpenFds = liftM filterReadable
$ liftIO (getDirectoryContents "/proc/self/fd") `orElse`
liftIO (getDirectoryContents "/dev/fd") `orElse`
([] <$ logInfo "Listing open file descriptors isn't\
\ supported by the system,\
\ not cleaning them up!")
where
filterReadable :: (Read a) => [String] -> [a]
filterReadable = mapMaybe (fmap fst . listToMaybe . reads)
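
-- | Run an IO action, annotating any 'IOError' it raises with the given
-- description to make the resulting message more informative.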
rethrowAnnotateIOError :: String -> IO a -> IO a
rethrowAnnotateIOError desc =
modifyIOError (\e -> annotateIOError e desc Nothing Nothing)
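
-- | The child side of the fork: create and lock the job's livelock file,
-- hand the communication pipe over to stdin/stdout, close every other
-- file descriptor and exec the Python job executor.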
runJobProcess :: JobId -> Client -> IO ()
runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
do
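    -- Close stderr first, so that nothing (for instance an exception
    -- raised while closing the superfluous descriptors below) gets
    -- written there.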
closeFd stdError
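
    -- Log messages are deliberately discarded in the forked child; the
    -- 'logLater' calls below only document what is going on.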
let logLater _ = return ()
logLater $ "Forking a new process for job " ++ show (fromJobId jid)
(TOD ts _) <- getClockTime
lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts
logLater $ "Locking livelock file " ++ show lockfile
fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock file"
logLater "Sending the lockfile name to the master process"
sendMsg s lockfile
logLater "Waiting for the master process to confirm the lock"
_ <- recvMsg s
logLater "Closing the client"
(clFdR, clFdW) <- clientToFd s
logLater "Reconnecting the file descriptors to stdin/out"
_ <- dupTo clFdR stdInput
_ <- dupTo clFdW stdOutput
logLater "Closing the old file descriptors"
closeFd clFdR
closeFd clFdW
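
    -- Close every remaining descriptor except stdin, stdout, stderr and
    -- the livelock fd.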
fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds
logLater $ "Closing every superfluous file descriptor: " ++ show fds
mapM_ (tryIOError . closeFd) fds
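
    -- Make sure the Python executor sees the right PYTHONPATH and the
    -- current debug setting.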
use_debug <- isDebugMode
env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
. M.insert "PYTHONPATH" AC.versionedsharedir
. M.fromList)
`liftM` getEnvironment
execPy <- P.jqueueExecutorPy
logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
++ " with PYTHONPATH=" ++ AC.versionedsharedir
() <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
(Just $ M.toList env)
failError $ "Failed to execute " ++ AC.pythonPath ++ " " ++ execPy
filterSecretParameters :: [QueuedOpCode] -> [MaybeForJSON (JSObject
(Private JSValue))]
filterSecretParameters =
map (MaybeForJSON . fmap revealValInJSObject
. getSecretParams) . mapMaybe (transformOpCode . qoInput)
where
transformOpCode :: InputOpCode -> Maybe OpCode
transformOpCode inputCode =
case inputCode of
ValidOpCode moc -> Just (metaOpCode moc)
_ -> Nothing
getSecretParams :: OpCode -> Maybe (JSObject (Secret JSValue))
getSecretParams opcode =
case opcode of
(OpInstanceCreate {opOsparamsSecret = x}) -> x
(OpInstanceReinstall {opOsparamsSecret = x}) -> x
(OpTestOsParams {opOsparamsSecret = x}) -> x
_ -> Nothing
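
-- | Fork a child process connected to the parent by a pipe: the parent
-- keeps the master end as a 'Client', the child runs the given action on
-- the other end; each side closes the end it does not need.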
forkWithPipe :: ConnectConfig -> (Client -> IO ()) -> IO (ProcessID, Client)
forkWithPipe conf childAction = do
(master, child) <- pipeClient conf
pid <- finally
(forkProcess (closeClient master >> childAction child))
$ closeClient child
return (pid, master)
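
-- | Fork a process to run the given job, retrying the fork and the
-- initial handshake a limited number of times on failure.  The given
-- callback is used to record the job's current livelock file (first the
-- luxi daemon's own, later the job's final one).  Returns the livelock
-- file of the job together with the child's process id.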
forkJobProcess :: (FromString e, Show e)
=> QueuedJob
-> FilePath
-> (FilePath -> ResultT e IO ())
-> ResultT e IO (FilePath, ProcessID)
forkJobProcess job luxiLivelock update = do
let jidStr = show . fromJobId . qjId $ job
let secretParams = encodeStrict . filterSecretParameters . qjOps $ job
logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
++ " for job " ++ jidStr
update luxiLivelock
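
  -- Each attempt below is run inside a 'WriterLogT', so its log messages
  -- are collected and only emitted once the attempt has finished.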
let execWriterLogInside = ResultT . execWriterLogT . runResultT
retryErrorN C.luxidRetryForkCount
$ \tryNo -> execWriterLogInside $ do
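    -- Before every retry after the first attempt, wait a random time
    -- bounded exponentially in the attempt number.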
    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
(pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess
. qjId $ job)
let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
logDebugJob = logDebug . (jobLogPrefix ++)
logDebugJob "Forked a new process"
let killIfAlive [] = return ()
killIfAlive (sig : sigs) = do
logDebugJob "Getting the status of the process"
status <- tryError . liftIO $ getProcessStatus False True pid
case status of
Left e -> logDebugJob $ "Job process already gone: " ++ show e
Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
Right Nothing -> do
logDebugJob $ "Child process running, killing by " ++ show sig
liftIO $ signalProcess sig pid
unless (null sigs) $ do
threadDelay 100000
killIfAlive sigs
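
    -- On failure, close our end of the pipe and make sure the child does
    -- not keep running.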
let onError = do
logDebugJob "Closing the pipe to the client"
withErrorLogAt WARNING "Closing the communication pipe failed"
(liftIO (closeClient master)) `orElse` return ()
killIfAlive [sigTERM, sigABRT, sigKILL]
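
    -- Perform the handshake with the child; if any step fails, clean up
    -- via 'onError' and re-throw the error.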
flip catchError (\e -> onError >> throwError e)
$ do
let annotatedIO msg k = do
logDebugJob msg
liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
let recv msg = annotatedIO msg (recvMsg master)
send msg x = annotatedIO msg (sendMsg master x)
lockfile <- recv "Getting the lockfile of the client"
logDebugJob $ "Setting the lockfile to the final " ++ lockfile
toErrorBase $ update lockfile
send "Confirming the client it can start" ""
_ <- recv "Waiting for the job to ask for the job id"
send "Writing job id to the client" jidStr
_ <- recv "Waiting for the job to ask for the lock file name"
send "Writing the lock file name to the client" lockfile
_ <- recv "Waiting for the job to ask for secret parameters"
send "Writing secret parameters to the client" secretParams
liftIO $ closeClient master
return (lockfile, pid)