module Ganeti.Query.Exec
( isForkSupported
, forkJobProcess
) where
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Lifted (threadDelay)
import Control.Exception (finally, onException)
import Control.Monad
import Control.Monad.Error
import Data.Functor
import qualified Data.Map as M
import Data.Maybe (listToMaybe, mapMaybe)
import System.Directory (getDirectoryContents)
import System.Environment
import System.IO.Error (tryIOError, annotateIOError, modifyIOError)
import System.Posix.Process
import System.Posix.IO
import System.Posix.Signals (sigABRT, sigKILL, sigTERM, signalProcess)
import System.Posix.Types (Fd, ProcessID)
import System.Time
import Text.Printf

import qualified AutoConf as AC
import Ganeti.BasicTypes
import qualified Ganeti.Constants as C
import Ganeti.Logging
import Ganeti.Logging.WriterLog
import qualified Ganeti.Path as P
import Ganeti.Types
import Ganeti.UDSServer
import Ganeti.Utils
import Ganeti.Utils.Monad
import Ganeti.Utils.Random (delayRandom)
-- | Reports whether job processes may be started with 'forkProcess'.
--
-- Answers 'True' exactly when the runtime system does not support bound
-- threads (i.e. the program is not linked against the threaded RTS).
isForkSupported :: IO Bool
isForkSupported
  | rtsSupportsBoundThreads = return False
  | otherwise = return True
-- | Connection settings used for the pipe to a forked job process:
-- both the send and the receive timeout are set to 30 (in the units
-- 'ConnectConfig' uses, presumably seconds — TODO confirm).
connectConfig :: ConnectConfig
connectConfig =
  ConnectConfig
    { sendTmo = 30
    , recvTmo = 30
    }
-- | Lists the file descriptors currently open in this process.
--
-- Reads the entries of @\/proc\/self\/fd@, falling back to @\/dev\/fd@.
-- If both listings fail, an info message is logged and an empty list is
-- returned. Directory entries that 'reads' cannot parse (such as @.@ and
-- @..@) are skipped.
listOpenFds :: (Error e) => ResultT e IO [Fd]
listOpenFds = do
    entries <- liftIO (getDirectoryContents "/proc/self/fd")
               `orElse` liftIO (getDirectoryContents "/dev/fd")
               `orElse` ([] <$ logInfo "Listing open file descriptors isn't\
                                       \ supported by the system,\
                                       \ not cleaning them up!")
    return (parseFds entries)
  where
    -- Keep the first parse of every entry that starts with a readable value.
    parseFds :: (Read a) => [String] -> [a]
    parseFds = mapMaybe (fmap fst . listToMaybe . reads)
-- | Runs an IO action; if it throws an 'IOError', the error is re-thrown
-- with its location set to the given description. The error's handle and
-- file path are left unchanged.
rethrowAnnotateIOError :: String -> IO a -> IO a
rethrowAnnotateIOError desc action = modifyIOError withDesc action
  where
    withDesc err = annotateIOError err desc Nothing Nothing
-- | Code executed in the forked child process of a job.
--
-- Steps, in order:
--
-- 1. Close stderr.
-- 2. Take a livelock file named after the job id and the current time.
-- 3. Send its path to the parent over the pipe and wait for a reply.
-- 4. Move the pipe's descriptors onto stdin/stdout.
-- 5. Close every other descriptor above 2 except the livelock's.
-- 6. Replace the process image with the Python job executor via
--    'executeFile'.
--
-- Reaching the final statement means the exec did not happen, which is
-- reported through 'failError'. Errors pass through 'withErrorLogAt' at
-- 'CRITICAL' level, tagged with the job id.
runJobProcess :: JobId -> Client -> IO ()
runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $ do
  -- Close stderr first so nothing gets written there from now on
  -- (e.g. by exceptions raised while closing descriptors below).
  closeFd stdError

  -- Log messages are intentionally discarded in the child; the calls are
  -- kept only to document each step.
  let logLater :: String -> IO ()
      logLater _ = return ()
      jobNum = fromJobId jid

  logLater $ "Forking a new process for job " ++ show jobNum
  TOD secs _ <- getClockTime
  lockPath <- P.livelockFile $ printf "job_%06d_%d" jobNum secs
  logLater $ "Locking livelock file " ++ show lockPath
  lockFd <- annotateResult "Can't lock the livelock file"
              =<< lockFile lockPath

  -- Announce the livelock to the parent and wait for its acknowledgement.
  logLater "Sending the lockfile name to the master process"
  sendMsg s lockPath
  logLater "Waiting for the master process to confirm the lock"
  void $ recvMsg s

  -- From here on the pipe serves as the child's stdin/stdout.
  logLater "Closing the client"
  (readFd, writeFd) <- clientToFd s
  logLater "Reconnecting the file descriptors to stdin/out"
  void $ dupTo readFd stdInput
  void $ dupTo writeFd stdOutput
  logLater "Closing the old file descriptors"
  mapM_ closeFd [readFd, writeFd]

  -- Close every other descriptor except stdin/out/err and the livelock;
  -- failures to close are ignored.
  openFds <- toErrorBase listOpenFds
  let extraFds = [ d | d <- openFds, d /= lockFd, d > 2 ]
  logLater $ "Closing every superfluous file descriptor: " ++ show extraFds
  forM_ extraFds $ tryIOError . closeFd

  -- Inherit the environment, forcing PYTHONPATH and GNT_DEBUG.
  debugFlag <- isDebugMode
  inherited <- M.fromList <$> getEnvironment
  let childEnv = M.insert "GNT_DEBUG" (if debugFlag then "1" else "0")
               . M.insert "PYTHONPATH" AC.versionedsharedir
               $ inherited
  pyExecutor <- P.jqueueExecutorPy
  logLater $ "Executing " ++ AC.pythonPath ++ " " ++ pyExecutor
             ++ " with PYTHONPATH=" ++ AC.versionedsharedir
  executeFile AC.pythonPath True [pyExecutor, show jobNum]
              (Just $ M.toList childEnv) :: IO ()
  failError $ "Failed to execute " ++ AC.pythonPath ++ " " ++ pyExecutor
-- | Forks a child process connected to the current one by a pipe.
--
-- Creates a pair of pipe clients with 'pipeClient'. The child closes the
-- parent's end and runs @childAction@ on its own end. The parent always
-- closes the child's end and keeps its own.
--
-- If 'forkProcess' throws, the parent's end is closed as well before the
-- exception propagates. This prevents one leaked pipe per failed attempt
-- when the caller retries the fork.
--
-- Returns the child's process id and the parent's end of the pipe.
forkWithPipe :: ConnectConfig -> (Client -> IO ()) -> IO (ProcessID, Client)
forkWithPipe conf childAction = do
  (master, child) <- pipeClient conf
  let runChild = closeClient master >> childAction child
  pid <- (forkProcess runChild `onException` closeClient master)
           `finally` closeClient child
  return (pid, master)
-- | Forks a new process that runs the given job and performs the start-up
-- handshake with it over a pipe.
--
-- The job's livelock file is first set to the daemon's own livelock via
-- @update@. A child running 'runJobProcess' is then forked with
-- 'forkWithPipe', and the following handshake is performed:
--
--   1. receive the child's livelock file path and pass it to @update@;
--   2. send an empty confirmation so the child can continue;
--   3. answer the next request with the job id;
--   4. answer the next request with the livelock file path.
--
-- Each fork-and-handshake attempt runs under 'retryErrorN' with
-- 'C.luxidRetryForkCount'. From the second attempt on, a random delay is
-- taken first. If an attempt fails:
--
--   * the pipe is closed;
--   * the child, if still running, is sent SIGTERM, SIGABRT and SIGKILL in
--     turn, with 0.1s pauses between checks;
--   * the error is re-thrown.
--
-- Returns the child's livelock file path and its process id.
forkJobProcess :: (Error e, Show e)
               => JobId
                  -- ^ the job to process
               -> FilePath
                  -- ^ the daemon's own livelock file
               -> (FilePath -> ResultT e IO ())
                  -- ^ callback invoked with the livelock file to record
                  -- for the job
               -> ResultT e IO (FilePath, ProcessID)
forkJobProcess jid luxiLivelock update = do
  let jidStr = show . fromJobId $ jid

  logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
             ++ " for job " ++ jidStr
  update luxiLivelock

  -- Each attempt's log messages are collected via 'execWriterLogT'.
  let execWriterLogInside = ResultT . execWriterLogT . runResultT
  retryErrorN C.luxidRetryForkCount
               $ \tryNo -> execWriterLogInside $ do
    -- Randomised exponential back-off before each retry: the upper bound
    -- (presumably microseconds, per the constant's name) doubles with
    -- every attempt. The exponent is @tryNo - 1@.
    let maxWaitUS = 2 ^ (tryNo - 1) * C.luxidRetryForkStepUS
    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)

    (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess jid)

    let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
        logDebugJob = logDebug . (jobLogPrefix ++)

    logDebugJob "Forked a new process"

    -- Escalate through the given signals while the child is still running.
    -- The status is polled without blocking.
    let killIfAlive [] = return ()
        killIfAlive (sig : sigs) = do
          logDebugJob "Getting the status of the process"
          status <- tryError . liftIO $ getProcessStatus False True pid
          case status of
            Left e -> logDebugJob $ "Job process already gone: " ++ show e
            Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
            Right Nothing -> do
              logDebugJob $ "Child process running, killing by " ++ show sig
              liftIO $ signalProcess sig pid
              unless (null sigs) $ do
                threadDelay 100000 -- wait for 0.1s and check again
                killIfAlive sigs

    -- Cleanup for a failed attempt: close our end of the pipe (a failure
    -- there is only logged) and make sure the child does not survive.
    let onError = do
          logDebugJob "Closing the pipe to the client"
          withErrorLogAt WARNING "Closing the communication pipe failed"
              (liftIO (closeClient master)) `orElse` return ()
          killIfAlive [sigTERM, sigABRT, sigKILL]

    flip catchError (\e -> onError >> throwError e) $ do
      let annotatedIO msg k = do
            logDebugJob msg
            liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
      let recv msg = annotatedIO msg (recvMsg master)
          send msg x = annotatedIO msg (sendMsg master x)

      lockfile <- recv "Getting the lockfile of the client"

      logDebugJob $ "Setting the lockfile to the final " ++ lockfile
      toErrorBase $ update lockfile
      send "Confirming the client it can start" ""

      -- After the confirmation the child execs the Python executor
      -- (see 'runJobProcess'), which issues the remaining requests.
      _ <- recv "Waiting for the job to ask for the job id"
      send "Writing job id to the client" jidStr

      _ <- recv "Waiting for the job to ask for the lock file name"
      send "Writing the lock file name to the client" lockfile

      liftIO $ closeClient master
      return (lockfile, pid)