module Ganeti.Query.Exec
( forkJobProcess
) where
import Control.Concurrent.Lifted (threadDelay)
import Control.Monad
import Control.Monad.Error
import qualified Data.Map as M
import Data.Maybe (mapMaybe, fromJust)
import System.Environment
import System.IO.Error (annotateIOError, modifyIOError)
import System.IO
import System.Process
import System.Posix.Process
import System.Posix.Signals (sigABRT, sigKILL, sigTERM, signalProcess)
import System.Posix.Types (ProcessID)
import Text.JSON
import qualified AutoConf as AC
import Ganeti.BasicTypes
import Ganeti.JQueue.Objects
import Ganeti.JSON (MaybeForJSON(..))
import Ganeti.Logging
import Ganeti.Logging.WriterLog
import Ganeti.OpCodes
import qualified Ganeti.Path as P
import Ganeti.Types
import Ganeti.UDSServer
import Ganeti.Compat (getPid')
-- | Connection settings for the pipe shared with a spawned job process
-- (see 'spawnJobProcess'). Both the send and the receive timeout are 30,
-- in whatever unit 'ConnectConfig' uses (presumably seconds; confirm in
-- "Ganeti.UDSServer").
connectConfig :: ConnectConfig
connectConfig =
  ConnectConfig
    { sendTmo = 30
    , recvTmo = 30
    }
-- | Runs an 'IO' action unchanged. If it raises an 'IOError', the error's
-- location is replaced by the given description (handle and file name are
-- left as they were) and the error is re-raised.
rethrowAnnotateIOError :: String -> IO a -> IO a
rethrowAnnotateIOError desc = modifyIOError annotate
  where
    annotate err = annotateIOError err desc Nothing Nothing
-- | Spawns the Python job executor for the given job.
--
-- The executor is run as @AC.pythonPath <P.jqueueExecutorPy> <job id>@.
-- It inherits the current environment, with two variables overridden:
--
-- * @GNT_DEBUG@ is set to @"1"@ or @"0"@ according to 'isDebugMode'.
-- * @PYTHONPATH@ is set to 'AC.versionedsharedir'.
--
-- The child's stdin and stdout are wired to the child end of a
-- 'pipeClient' pair, and its stderr is inherited.
--
-- Returns the child's process id and the master end of the pipe. Any
-- 'IOError' raised here is logged at 'CRITICAL' (prefixed with the job id)
-- by 'withErrorLogAt' and re-raised.
spawnJobProcess :: JobId -> IO (ProcessID, Client)
spawnJobProcess jid = withErrorLogAt CRITICAL (show jid) $ do
  use_debug <- isDebugMode
  env_ <- (M.toList . M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
           . M.insert "PYTHONPATH" AC.versionedsharedir
           . M.fromList)
          `liftM` getEnvironment
  execPy <- P.jqueueExecutorPy
  logDebug $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
             ++ " with PYTHONPATH=" ++ AC.versionedsharedir
  (master, child) <- pipeClient connectConfig
  let (rh, wh) = clientToHandle child
  -- The child communicates with us over its stdin/stdout. Handles passed
  -- via 'UseHandle' are closed in this (parent) process by 'createProcess'.
  let jobProc = (proc AC.pythonPath [execPy, show (fromJobId jid)])
        { std_in = UseHandle rh
        , std_out = UseHandle wh
        , std_err = Inherit
        , env = Just env_
        , close_fds = True
        }
  (_, _, _, hchild) <- createProcess jobProc
  mpid <- getPid' hchild
  -- Previously this was 'fromJust', which deferred the failure to whoever
  -- first forced the pid and bypassed 'withErrorLogAt' (an 'ErrorCall' is
  -- not an 'IOError'). Failing eagerly with an 'IOError' gets the job id
  -- logged and releases our end of the pipe.
  -- NOTE(review): 'getPid'' presumably returns Nothing once the child has
  -- already exited and been reaped; confirm in "Ganeti.Compat".
  case mpid of
    Just pid -> return (pid, master)
    Nothing -> do
      closeClient master
      ioError . userError $ "Could not obtain the pid of the process for job "
                            ++ show (fromJobId jid)
-- | Extracts the secret OS parameters from a job's opcodes, revealing them
-- so they can be serialized for the job process.
--
-- * Opcodes whose input is not a 'ValidOpCode' are dropped entirely.
-- * Every remaining opcode yields exactly one entry, so positions line up
--   with the valid opcodes.
-- * The entry wraps 'Nothing' when the opcode is not one of
--   'OpInstanceCreate', 'OpInstanceReinstall' or 'OpTestOsParams', or when
--   its 'opOsparamsSecret' field is empty.
filterSecretParameters :: [QueuedOpCode] -> [MaybeForJSON (JSObject
                                            (Private JSValue))]
filterSecretParameters queued =
  [ MaybeForJSON (fmap revealValInJSObject (secretsOf (metaOpCode meta)))
  | ValidOpCode meta <- map qoInput queued  -- non-matching inputs are skipped
  ]
  where
    secretsOf :: OpCode -> Maybe (JSObject (Secret JSValue))
    secretsOf (OpInstanceCreate    { opOsparamsSecret = s }) = s
    secretsOf (OpInstanceReinstall { opOsparamsSecret = s }) = s
    secretsOf (OpTestOsParams      { opOsparamsSecret = s }) = s
    secretsOf _                                              = Nothing
-- | Forks a Python process for the given job and performs the start-up
-- handshake with it.
--
-- The steps below run in this order, and the order is part of the
-- protocol with the child:
--
-- 1. Calls @update@ with @luxiLivelock@ as a temporary lockfile.
-- 2. Spawns the job process via 'spawnJobProcess'.
-- 3. Receives the lockfile path from the child and calls @update@ with it.
-- 4. Sends an empty message confirming that the child may start.
-- 5. Waits for the child to ask for the secret parameters, then sends the
--    JSON-encoded secret parameters of the job's opcodes.
-- 6. Closes the master end of the pipe.
--
-- If any step after the spawn fails, the cleanup runs before the original
-- error is re-thrown:
--
-- * The pipe is closed; a failure to close it is only logged.
-- * The child, while it still appears to be running, is sent SIGTERM,
--   SIGABRT and then SIGKILL, with 0.1s between attempts.
--
-- Returns the lockfile path reported by the child and the child's pid.
forkJobProcess :: (Error e, Show e)
               => QueuedJob
                  -- ^ the job to start
               -> FilePath
                  -- ^ temporary lockfile, used until the child reports
                  -- its own
               -> (FilePath -> ResultT e IO ())
                  -- ^ callback invoked with the lockfile to set for the job
               -> ResultT e IO (FilePath, ProcessID)
forkJobProcess job luxiLivelock update = do
  let jidStr = show . fromJobId . qjId $ job
  -- Encoded up front; only sent once the child asks for it (step 5).
  let secretParams = encodeStrict . filterSecretParameters . qjOps $ job
  logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
             ++ " for job " ++ jidStr
  update luxiLivelock
  -- The remainder runs inside a WriterLogT layer. NOTE(review): presumably
  -- its log messages are collected and re-emitted when execWriterLogT
  -- finishes; confirm in "Ganeti.Logging.WriterLog".
  ResultT . execWriterLogT . runResultT $ do
    (pid, master) <- liftIO $ spawnJobProcess (qjId job)
    let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
        logDebugJob = logDebug . (jobLogPrefix ++)
    logDebugJob "Forked a new process"
    -- Tries each signal in turn for as long as the child still runs.
    -- getProcessStatus False True: non-blocking (WNOHANG), also reporting
    -- stopped children (WUNTRACED). Nothing means still running; Just s
    -- means a status was collected; an error means the child is gone.
    let killIfAlive [] = return ()
        killIfAlive (sig : sigs) = do
          logDebugJob "Getting the status of the process"
          status <- tryError . liftIO $ getProcessStatus False True pid
          case status of
            Left e -> logDebugJob $ "Job process already gone: " ++ show e
            Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
            Right Nothing -> do
              logDebugJob $ "Child process running, killing by " ++ show sig
              liftIO $ signalProcess sig pid
              unless (null sigs) $ do
                -- wait 0.1s (threadDelay takes microseconds) before checking
                -- again and escalating to the next signal
                threadDelay 100000
                killIfAlive sigs
    -- Cleanup for a failed handshake: close our end of the pipe
    -- (best-effort, failure only logged), then make sure the child dies.
    let onError = do
          logDebugJob "Closing the pipe to the client"
          withErrorLogAt WARNING "Closing the communication pipe failed"
              (liftIO (closeClient master)) `orElse` return ()
          killIfAlive [sigTERM, sigABRT, sigKILL]
    flip catchError (\e -> onError >> throwError e)
      $ do
        -- Log each protocol step, and tag any IOError it raises with the
        -- job prefix and the step description.
        let annotatedIO msg k = do
              logDebugJob msg
              liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
        let recv msg = annotatedIO msg (recvMsg master)
            send msg x = annotatedIO msg (sendMsg master x)
        lockfile <- recv "Getting the lockfile of the client"
        logDebugJob $ "Setting the lockfile to the final " ++ lockfile
        toErrorBase $ update lockfile
        send "Confirming the client it can start" ""
        -- From here on the child is running the job; the reply content of
        -- this request is not inspected.
        _ <- recv "Waiting for the job to ask for secret parameters"
        send "Writing secret parameters to the client" secretParams
        liftIO $ closeClient master
        return (lockfile, pid)