{-| Client side of the LUXI protocol.

Requests are encoded as JSON objects, sent over a Unix domain socket and
terminated by an end-of-message character; replies are read back the same
way and validated before being handed to the caller.

-}
module Ganeti.Luxi
( LuxiOp(..)
, Client
, getClient
, closeClient
, callMethod
, submitManyJobs
, queryJobsStatus
) where
import Control.Exception (bracketOnError)
import Control.Monad
import Data.IORef
import System.Timeout

import qualified Network.Socket as S
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
import Text.JSON.Types

import Ganeti.HTools.Utils
import Ganeti.HTools.Types
import Ganeti.Jobs (JobStatus)
import Ganeti.OpCodes (OpCode)
-- | Runs an IO action under a time limit expressed in seconds.
--
-- The limit is converted to microseconds for 'timeout'. If the action does
-- not complete in time, the computation fails (through 'fail') with a
-- message that names the operation via @descr@; otherwise the action's own
-- result is returned unchanged.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action =
  timeout (secs * 1000000) action >>=
    maybe (fail $ "Timeout in " ++ descr) return
-- | The requests that can be sent over a LUXI connection.
--
-- Every constructor carries the values that end up in the @args@ field of
-- the encoded request.
data LuxiOp
  = QueryInstances [String] [String] Bool
    -- ^ Names, fields and a lock flag
  | QueryNodes [String] [String] Bool
    -- ^ Names, fields and a lock flag
  | QueryGroups [String] [String] Bool
    -- ^ Names, fields and a lock flag
  | QueryJobs [Int] [String]
    -- ^ Job IDs and fields
  | QueryExports [String] Bool
    -- ^ Nodes and a lock flag
  | QueryConfigValues [String]
    -- ^ Fields
  | QueryClusterInfo
    -- ^ Carries no arguments
  | QueryTags String String
    -- ^ Kind and name
  | SubmitJob [OpCode]
    -- ^ The opcodes of a single job
  | SubmitManyJobs [[OpCode]]
    -- ^ One opcode list per job
  | WaitForJobChange Int [String] JSValue JSValue Int
    -- ^ Five positional arguments, passed through in order
  | ArchiveJob Int
    -- ^ A job ID
  | AutoArchiveJobs Int Int
    -- ^ Two integer arguments, passed through as a pair
  | CancelJob Int
    -- ^ A job ID
  | SetDrainFlag Bool
    -- ^ The new flag value
  | SetWatcherPause Double
    -- ^ A duration
  deriving (Show, Read)
-- | Maps a request to the method name placed in the encoded message.
--
-- The name is always identical to the constructor name; the constructor's
-- arguments are ignored.
strOfOp :: LuxiOp -> String
strOfOp op = case op of
  QueryInstances {}    -> "QueryInstances"
  QueryNodes {}        -> "QueryNodes"
  QueryGroups {}       -> "QueryGroups"
  QueryJobs {}         -> "QueryJobs"
  QueryExports {}      -> "QueryExports"
  QueryConfigValues {} -> "QueryConfigValues"
  QueryClusterInfo {}  -> "QueryClusterInfo"
  QueryTags {}         -> "QueryTags"
  SubmitJob {}         -> "SubmitJob"
  SubmitManyJobs {}    -> "SubmitManyJobs"
  WaitForJobChange {}  -> "WaitForJobChange"
  ArchiveJob {}        -> "ArchiveJob"
  AutoArchiveJobs {}   -> "AutoArchiveJobs"
  CancelJob {}         -> "CancelJob"
  SetDrainFlag {}      -> "SetDrainFlag"
  SetWatcherPause {}   -> "SetWatcherPause"
-- | The end-of-message separator: the character with code 3 (ASCII ETX).
--
-- It is appended to every outgoing message and used to split incoming data.
eOM :: Char
eOM = '\ETX'
-- | The keys that appear in LUXI request and response objects.
data MsgKeys
  = Method   -- ^ Name of the requested method
  | Args     -- ^ Arguments of the request
  | Success  -- ^ Status flag of a response
  | Result   -- ^ Payload of a response
-- | Gives the literal JSON key used on the wire for a message key.
strOfKey :: MsgKeys -> String
strOfKey key = case key of
  Method  -> "method"
  Args    -> "args"
  Success -> "success"
  Result  -> "result"
-- | The state of an open LUXI connection.
data Client = Client
  { socket :: S.Socket
    -- ^ The socket the messages travel over
  , rbuf :: IORef String
    -- ^ Data already received but not yet returned as part of a message
  }
-- | Opens a LUXI client over the Unix domain socket at the given path.
--
-- The connection attempt is limited to 'connTimeout' seconds through
-- 'withTimeout'. Should connecting fail or time out, the freshly created
-- socket is closed before the exception propagates, so a failed attempt
-- does not leak a descriptor. On success the client is returned with an
-- empty receive buffer; the caller is responsible for 'closeClient'.
getClient :: String -> IO Client
getClient path =
  -- bracketOnError releases the socket only when the body throws; on the
  -- success path ownership passes to the returned Client.
  bracketOnError (S.socket S.AF_UNIX S.Stream S.defaultProtocol) S.sClose $
    \s -> do
      withTimeout connTimeout "creating luxi connection" $
        S.connect s (S.SockAddrUnix path)
      rf <- newIORef ""
      return Client { socket = s, rbuf = rf }
-- | Closes the socket that belongs to the given client.
closeClient :: Client -> IO ()
closeClient client = S.sClose (socket client)
-- | Sends one message, followed by the end-of-message marker.
--
-- A single send may transmit only part of the data, so the unsent remainder
-- is re-sent until nothing is left. Each individual send is limited to
-- 'queryTimeout' seconds.
sendMsg :: Client -> String -> IO ()
sendMsg s buf = go (buf ++ [eOM])
  where
    go pending = do
      sent <- withTimeout queryTimeout "sending luxi message" $
                S.send (socket s) pending
      -- Anything short of the full buffer means a partial write: retry
      -- with whatever was not accepted.
      when (sent /= length pending) $ go (drop sent pending)
-- | Returns the next complete message from the connection.
--
-- Data left over from a previous read is consulted first. Only when it does
-- not already hold an end-of-message marker is the socket read, in chunks
-- of up to 4096 characters and with each read limited to 'queryTimeout'
-- seconds. Whatever follows the marker is stored back into the client's
-- buffer for the next call.
recvMsg :: Client -> IO String
recvMsg s = do
  buffered <- readIORef (rbuf s)
  (msg, leftover) <-
    case break (eOM ==) buffered of
      (_, [])         -> readMore buffered
      (done, _ : rest) -> return (done, rest)
  writeIORef (rbuf s) leftover
  return msg
  where
    -- Keep reading until a chunk contains the marker; the accumulator holds
    -- everything received so far for the current message.
    readMore acc = do
      chunk <- withTimeout queryTimeout "reading luxi response" $
                 S.recv (socket s) 4096
      case break (eOM ==) chunk of
        (part, [])       -> readMore (acc ++ part)
        (part, _ : rest) -> return (acc ++ part, rest)
-- | Serialises the arguments of a request into their JSON form.
--
-- Job IDs are rendered with 'show', i.e. they are sent as strings rather
-- than as numbers.
opToArgs :: LuxiOp -> JSValue
opToArgs op = case op of
  QueryInstances names fields lock -> J.showJSON (names, fields, lock)
  QueryNodes names fields lock     -> J.showJSON (names, fields, lock)
  QueryGroups names fields lock    -> J.showJSON (names, fields, lock)
  QueryJobs ids fields             -> J.showJSON (map show ids, fields)
  QueryExports nodes lock          -> J.showJSON (nodes, lock)
  QueryConfigValues fields         -> J.showJSON fields
  QueryClusterInfo                 -> J.showJSON ()
  QueryTags kind name              -> J.showJSON (kind, name)
  SubmitJob opcodes                -> J.showJSON opcodes
  SubmitManyJobs jobs              -> J.showJSON jobs
  WaitForJobChange a b c d e       ->
    JSArray [ J.showJSON a, J.showJSON b, J.showJSON c
            , J.showJSON d, J.showJSON e ]
  ArchiveJob jid                   -> J.showJSON (show jid)
  AutoArchiveJobs a b              -> J.showJSON (a, b)
  CancelJob jid                    -> J.showJSON (show jid)
  SetDrainFlag flag                -> J.showJSON flag
  SetWatcherPause duration         -> J.showJSON [duration]
-- | Encodes a request as the JSON text that goes on the wire.
--
-- The resulting object has two entries: the method name and the serialised
-- arguments. The end-of-message marker is not part of the result.
buildCall :: LuxiOp  -- ^ The request to encode
          -> String  -- ^ The serialised JSON object
buildCall lo = encodeStrict (toJSObject entries)
  where
    entries :: [(String, JSValue)]
    entries =
      [ (strOfKey Method, JSString (toJSString (strOfOp lo)))
      , (strOfKey Args, opToArgs lo)
      ]
-- | Parses a raw reply and extracts its payload.
--
-- The reply must be a JSON object. Its success flag decides how the result
-- entry is treated: on success it is returned as is, otherwise it is read
-- as a string and turned into a failure carrying that message.
validateResult :: String -> Result JSValue
validateResult s = do
  reply <- fromJResult "Parsing LUXI response" (decodeStrict s)
  let fields = J.fromJSObject (reply :: JSObject JSValue)
  ok <- fromObj fields (strOfKey Success)
  let payloadKey = strOfKey Result
  if ok
    then fromObj fields payloadKey
    else fromObj fields payloadKey >>= fail
-- | Performs one request/response round trip.
--
-- The request is encoded and sent, then a single reply is read and
-- validated. Protocol-level failures are reported in the returned 'Result'.
callMethod :: LuxiOp -> Client -> IO (Result JSValue)
callMethod method s = do
  sendMsg s (buildCall method)
  validateResult `fmap` recvMsg s
-- | Submits several jobs in one request and returns their job IDs.
--
-- The reply must be an array with one @[status, text]@ pair per job. A true
-- status yields the text as the job ID; a false status, or an entry of any
-- other shape, makes the whole result a failure.
submitManyJobs :: Client -> [[OpCode]] -> IO (Result [String])
submitManyJobs s jobs = parseReply `fmap` callMethod (SubmitManyJobs jobs) s
  where
    -- Interprets the overall reply.
    parseReply (Bad msg) = Bad msg
    parseReply (Ok (JSArray entries)) = mapM parseEntry entries
    parseReply other = Bad ("Cannot parse response from Ganeti: " ++ show other)

    -- Interprets the entry that belongs to a single submitted job.
    parseEntry (JSArray [JSBool True, JSString jid]) = Ok (fromJSString jid)
    parseEntry (JSArray [JSBool False, JSString err]) = Bad (fromJSString err)
    parseEntry _ = Bad "Unknown result from the master daemon"
-- | Queries the status of the given jobs.
--
-- The job IDs are given as strings and must each parse as an integer. A
-- malformed ID makes the function return 'Bad' without contacting the
-- server, instead of raising a @read@ exception while the request is being
-- built. The reply is expected to hold one list per job whose first element
-- is the status; an empty list is reported as a missing status field.
queryJobsStatus :: Client -> [String] -> IO (Result [JobStatus])
queryJobsStatus s jids =
  case mapM parseJobId jids of
    Bad msg -> return (Bad msg)
    Ok ids -> do
      rval <- callMethod (QueryJobs ids ["status"]) s
      return $ case rval of
        Bad x -> Bad x
        Ok y -> case J.readJSON y :: J.Result [[JobStatus]] of
          J.Ok vals -> mapM firstStatus vals
          J.Error x -> Bad x
  where
    -- Total counterpart of 'read': accepts exactly the inputs 'read' would
    -- (including surrounding whitespace) and reports the rest as 'Bad'.
    parseJobId :: String -> Result Int
    parseJobId str =
      case [jid | (jid, rest) <- reads str, ("", "") <- lex rest] of
        [jid] -> Ok jid
        _     -> Bad ("Invalid job ID '" ++ str ++ "'")

    -- Picks the single requested field out of a job's row.
    firstStatus :: [JobStatus] -> Result JobStatus
    firstStatus (st : _) = Ok st
    firstStatus []       = Bad "Missing job status field"