module Ganeti.Curl.Multi where
import Control.Concurrent
import Control.Monad
import Data.IORef
import qualified Data.Map as Map
import Foreign.C.String
import Foreign.C.Types
import Foreign.Marshal
import Foreign.Ptr
import Foreign.Storable
import Network.Curl
import Ganeti.Curl.Internal
import Ganeti.Logging
data CurlM_
type CurlMH = Ptr CurlM_
type HandleMap = Map.Map CurlH (IORef CurlCode)
foreign import ccall
"curl_multi_init" curl_multi_init :: IO CurlMH
foreign import ccall
"curl_multi_cleanup" curl_multi_cleanup :: CurlMH -> IO CInt
foreign import ccall
"curl_multi_add_handle" curl_multi_add_handle :: CurlMH -> CurlH -> IO CInt
foreign import ccall
"curl_multi_remove_handle" curl_multi_remove_handle :: CurlMH -> CurlH ->
IO CInt
foreign import ccall
"curl_multi_perform" curl_multi_perform :: CurlMH -> Ptr CInt -> IO CInt
foreign import ccall
"curl_multi_info_read" curl_multi_info_read :: CurlMH -> Ptr CInt
-> IO (Ptr CurlMsg)
curlMultiAddHandle :: CurlMH -> Curl -> IO ()
curlMultiAddHandle multi easy = do
r <- curlPrim easy $ \_ x -> curl_multi_add_handle multi x
when (toMCode r /= CurlmOK) .
fail $ "Failed adding easy handle to multi handle: " ++ show r
curlMultiInfoRead :: CurlMH -> IO (Maybe CurlMsg, CInt)
curlMultiInfoRead multi =
alloca $ \ppending -> do
pmsg <- curl_multi_info_read multi ppending
pending <- peek ppending
msg <- if pmsg == nullPtr
then return Nothing
else Just `fmap` peek pmsg
return (msg, pending)
curlMultiPerform :: CurlMH -> IO (CurlMCode, CInt)
curlMultiPerform multi =
alloca $ \running -> do
mcode <- curl_multi_perform multi running
running' <- peek running
return (toMCode mcode, running')
pollDelayInterval :: Int
pollDelayInterval = 10000
writeHandle :: IORef [String] -> Ptr CChar -> CInt -> CInt -> Ptr () -> IO CInt
writeHandle bufref cstr sz nelems _ = do
let full_sz = sz * nelems
hs_str <- peekCStringLen (cstr, fromIntegral full_sz)
modifyIORef bufref (hs_str:)
return full_sz
readMessages :: CurlMH -> HandleMap -> IO ()
readMessages mh hmap = do
(cmsg, pending) <- curlMultiInfoRead mh
case cmsg of
Nothing -> return ()
Just (CurlMsg msg eh res) -> do
logDebug $ "Got msg! msg " ++ show msg ++ " res " ++ show res ++
", " ++ show pending ++ " messages left"
let cref = (Map.!) hmap eh
writeIORef cref res
_ <- curl_multi_remove_handle mh eh
when (pending > 0) $ readMessages mh hmap
performMulti :: CurlMH -> HandleMap -> CInt -> IO ()
performMulti mh hmap expected = do
(mcode, running) <- curlMultiPerform mh
delay <- case mcode of
CurlmCallMultiPerform -> return $ return ()
CurlmOK -> return $ threadDelay pollDelayInterval
code -> error $ "Received bad return code from" ++
"'curl_multi_perform': " ++ show code
logDebug $ "mcode: " ++ show mcode ++ ", remaining: " ++ show running
when (expected /= running) $ readMessages mh hmap
when (running > 0) $ delay >> performMulti mh hmap running
errorBuffer :: String
errorBuffer = replicate errorBufferSize '\0'
mallocErrorBuffer :: IO CString
mallocErrorBuffer = fst `fmap` newCStringLen errorBuffer
makeEasyHandle :: (IORef [String], Ptr CChar, ([CurlOption], URLString))
-> IO Curl
makeEasyHandle (f, e, (opts, url)) = do
h <- initialize
setopts h opts
setopts h [ CurlWriteFunction (writeHandle f)
, CurlErrorBuffer e
, CurlURL url
, CurlFailOnError True
, CurlNoSignal True
, CurlProxy ""
]
return h
execMultiCall :: [([CurlOption], String)] -> IO [(CurlCode, String)]
execMultiCall ous = do
errorbufs <- mapM (const mallocErrorBuffer) ous
outbufs <- mapM (\_ -> newIORef []) ous
ehandles <- mapM makeEasyHandle $ zip3 outbufs errorbufs ous
hmap <- foldM (\m h -> curlPrim h (\_ hnd -> do
ccode <- newIORef CurlOK
return $ Map.insert hnd ccode m
)) Map.empty ehandles
mh <- curl_multi_init
mapM_ (curlMultiAddHandle mh) ehandles
performMulti mh hmap (fromIntegral $ length ehandles)
mapM_ (\h -> curlPrim h (\_ _ -> return ())) ehandles
mh_cleanup <- toMCode `fmap` curl_multi_cleanup mh
when (mh_cleanup /= CurlmOK) .
logError $ "Non-OK return from multi_cleanup: " ++ show mh_cleanup
mapM (\(e, b, h) -> do
s <- peekCString e
free e
cref <- curlPrim h (\_ hnd -> return $ (Map.!) hmap hnd)
ccode <- readIORef cref
result <- if ccode == CurlOK
then (concat . reverse) `fmap` readIORef b
else return s
return (ccode, result)
) $ zip3 errorbufs outbufs ehandles