{-| Balancing task of the maintenance daemon.

This module carries out the automated balancing done by the
maintenance daemon. The actual balancing algorithm is imported
from htools.

-}
{-

Copyright (C) 2015 Google Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-}

module Ganeti.MaintD.Balance
  ( balanceTask
  ) where

import Control.Arrow ((***), (&&&))
import Control.Exception.Lifted (bracket)
import Control.Monad (liftM, unless, when)
import Control.Monad.IO.Class (liftIO)
import Data.IORef (IORef)
import qualified Data.IntMap as IntMap
import qualified Data.Set as Set
import qualified Data.Map as Map
import Data.Maybe (mapMaybe, isJust)
import qualified Data.Traversable as Traversable
import System.IO.Error (tryIOError)
import Text.Printf (printf)

import Ganeti.BasicTypes ( ResultT, mkResultT, mkResultT'
                         , GenericResult(..), Result)
import Ganeti.Cpu.Types (emptyCPUavgload, CPUavgload(..))
import Ganeti.HTools.AlgorithmParams (AlgorithmOptions(..), defaultOptions)
import qualified Ganeti.HTools.Backend.MonD as MonD
import qualified Ganeti.HTools.Cluster as Cluster
import qualified Ganeti.HTools.Cluster.Metrics as Metrics
import qualified Ganeti.HTools.Cluster.Utils as ClusterUtils
import qualified Ganeti.HTools.Container as Container
import qualified Ganeti.HTools.Instance as Instance
import qualified Ganeti.HTools.Node as Node
import Ganeti.JQueue (currentTimestamp)
import Ganeti.JQueue.Objects (Timestamp)
import Ganeti.Jobs (submitJobs)
import Ganeti.HTools.Types ( zeroUtil, DynUtil(cpuWeight), addUtil, subUtil
                           , MoveJob, iPolicyMemoryRatio)
import Ganeti.Logging.Lifted (logDebug)
import Ganeti.MaintD.MemoryState ( MemoryState, getEvacuated
                                 , addEvacuated, rmEvacuated)
import Ganeti.MaintD.Utils (annotateOpCode)
import qualified Ganeti.Luxi as L
import Ganeti.OpCodes (MetaOpCode)
import qualified Ganeti.Path as Path
import qualified Ganeti.Query.Language as Qlang
import Ganeti.Types (JobId)
import Ganeti.Utils (logAndBad)

-- * Collection of dynamic load data

-- | All dynamic load reports collected for one node.
data AllReports = AllReports { rTotal :: MonD.Report      -- ^ total CPU load
                             , rIndividual :: MonD.Report -- ^ per-instance CPU loads
                             , rMem :: MonD.Report        -- ^ per-instance memory (RSS) usage
                             }

-- | Empty report. It describes an idle node and is used as the
-- default value for nodes marked as offline.
emptyReports :: AllReports
emptyReports = AllReports (MonD.CPUavgloadReport emptyCPUavgload)
                          (MonD.InstanceCpuReport Map.empty)
                          (MonD.InstanceRSSReport Map.empty)
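
-- Note that the empty per-instance maps mean that 'findInstanceLoad'
-- simply finds no data for instances on offline nodes, a situation
-- 'updateCPUInstance' explicitly tolerates.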

-- | Query a node, unless it is offline, and return all its load
-- reports. For offline nodes, return the empty report.
queryNode :: Node.Node -> ResultT String IO AllReports
queryNode node = do
  let getReport dc = mkResultT
                     . liftM (maybe (Bad $ "Failed collecting "
                                           ++ MonD.dName dc
                                           ++ " from " ++ Node.name node) Ok
                              . MonD.mkReport dc)
                     $ MonD.fromCurl dc node
  if Node.offline node
    then return emptyReports
    else do
      total <- getReport MonD.totalCPUCollector
      xeninstances <- getReport MonD.xenCPUCollector
      rssinstances <- getReport MonD.kvmRSSCollector
      return $ AllReports total xeninstances rssinstances

-- | Get a map with the live load data for all nodes; for offline nodes
-- the empty report is assumed.
queryLoad :: Node.List -> ResultT String IO (Container.Container AllReports)
queryLoad = Traversable.mapM queryNode

-- | Ask luxid about the hypervisors used. As, at the moment, we only
-- have specialised CPU collectors for Xen, we're only interested in which
-- instances run under the Xen hypervisor.
getXenInstances :: ResultT String IO (Set.Set String)
getXenInstances = do
  let query = L.Query (Qlang.ItemTypeOpCode Qlang.QRInstance)
              ["name", "hypervisor"] Qlang.EmptyFilter
  luxiSocket <- liftIO Path.defaultQuerySocket
  raw <- bracket (mkResultT . liftM (either (Bad . show) Ok)
                   . tryIOError $ L.getLuxiClient luxiSocket)
                 (liftIO . L.closeClient)
                 $ mkResultT' . L.callMethod query
  answer <- L.extractArray raw >>= mapM (mapM L.fromJValWithStatus)
  let getXen [name, hv] | hv `elem` ["xen-pvm", "xen-hvm"] = [name]
      getXen _ = []
  return $ Set.fromList (answer >>= getXen)
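
-- As a sketch of the expected reply (with hypothetical data): luxid
-- answers with rows such as @[["inst1", "xen-pvm"], ["inst2", "kvm"]]@,
-- of which 'getXen' keeps only the Xen instance names, here @"inst1"@.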

-- | Look up the load of an instance, given by name, in a report.
findInstanceLoad :: String -> AllReports -> Maybe Double
findInstanceLoad name r | MonD.InstanceCpuReport m <- rIndividual r =
  Map.lookup name m
findInstanceLoad _ _ = Nothing

-- | Update the CPU load of one instance based on the reports.
-- Fail if instance CPU load is not (yet) available. However, do
-- accept missing load data for instances on offline nodes, as well
-- as old load data for recently migrated instances.
updateCPUInstance :: Node.List
                  -> Container.Container AllReports
                  -> Set.Set String
                  -> [String]
                  -> Instance.Instance
                  -> Result Instance.Instance
updateCPUInstance nl reports xeninsts evacuated inst =
  let name = Instance.name inst
      nidx = Instance.pNode inst
  in if name `Set.member` xeninsts
    then let onNodeLoad = findInstanceLoad name (Container.find nidx reports)
             allLoads = mapMaybe (findInstanceLoad name)
                          $ Container.elems reports
         in case () of
           _ | Just load <- onNodeLoad ->
                 return $ inst { Instance.util = zeroUtil { cpuWeight = load } }
           _ | (load:_) <- allLoads ->
                 return $ inst { Instance.util = zeroUtil { cpuWeight = load } }
           _ | Node.offline $ Container.find nidx nl ->
                 return $ inst { Instance.util = zeroUtil }
           _ | Instance.name inst `elem` evacuated ->
                 return $ inst { Instance.util = zeroUtil }
           _ -> fail $ "Xen CPU data unavailable for " ++ name
    else let rep = rTotal $ Container.find nidx reports
         in case rep of MonD.CPUavgloadReport (CPUavgload _ _ ndload) ->
                          let w = ndload * fromIntegral (Instance.vcpus inst)
                                  / (fromIntegral . Node.uCpu
                                       $ Container.find nidx nl)
                          in return $ inst { Instance.util =
                                                zeroUtil { cpuWeight = w }}
                        _ -> fail $ "CPU data unavailable for node of " ++ name
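
-- As a worked example for the non-Xen branch (hypothetical numbers): an
-- instance with 2 vCPUs on a node reporting a total CPU load of 1.6 and
-- a 'Node.uCpu' of 8 is assigned a cpuWeight of 1.6 * 2 / 8 = 0.4.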

-- | Update CPU usage data based on the collected reports. That is, get the
-- CPU usage of all instances from the reports and also update the nodes
-- accordingly.
updateCPULoad :: (Node.List, Instance.List)
              -> Container.Container AllReports
              -> Set.Set String
              -> [String]
              -> Result (Node.List, Instance.List)
updateCPULoad (nl, il) reports xeninsts evacuated = do
  il' <- Traversable.mapM (updateCPUInstance nl reports xeninsts evacuated) il
  let addNodeUtil n delta = n { Node.utilLoad = addUtil (Node.utilLoad n) delta
                              , Node.utilLoadForth =
                                  addUtil (Node.utilLoadForth n) delta
                              }
  let updateNodeUtil nnl inst_old inst_new =
        let delta = subUtil (Instance.util inst_new) $ Instance.util inst_old
            nidx = Instance.pNode inst_old
            n = Container.find nidx nnl
            n' = addNodeUtil n delta
        in Container.add nidx n' nnl
  let nl' = foldl (\nnl i -> updateNodeUtil nnl (Container.find i il)
                               $ Container.find i il') nl $ Container.keys il
  return (nl', il')
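
-- As an illustration (hypothetical numbers): if an instance's measured
-- cpuWeight rises from 0.1 to 0.3, the delta of 0.2 is added to both
-- 'Node.utilLoad' and 'Node.utilLoadForth' of its primary node.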

-- | For an instance, given by name, check whether an individual load
-- report is available again.
cleanUpEvacuation :: IORef MemoryState
                  -> Instance.List
                  -> Container.Container AllReports
                  -> String
                  -> IO ()
cleanUpEvacuation memstate il reports name = do
  let insts = filter ((==) name . Instance.name) $ Container.elems il
  case insts of
    [] -> do
            logDebug $ "Instnace " ++ name ++ "no longer on the cluster"
            rmEvacuated memstate name
    inst:_ -> do
                 let nidx = Instance.pNode inst
                 when (isJust . findInstanceLoad name
                         $ Container.find nidx reports) $ do
                   logDebug $ "Load data for " ++ name ++ " available again"
                   rmEvacuated memstate name

-- * Balancing

-- | Transform an instance move into a submittable job.
moveToJob :: Timestamp -> (Node.List, Instance.List) -> MoveJob -> [MetaOpCode]
moveToJob now (nl, il) (_, idx, move, _) =
  let opCodes = Cluster.iMoveToJob nl il idx move
  in map (annotateOpCode "auto-balancing the cluster" now) opCodes

-- | Iteratively improve a cluster by repeatedly applying
-- 'Cluster.tryBalance'.
iterateBalance :: AlgorithmOptions
               -> Cluster.Table -- ^ the starting table
               -> [MoveJob]     -- ^ current command list
               -> [MoveJob]     -- ^ resulting commands
iterateBalance opts ini_tbl cmds =
  let Cluster.Table ini_nl ini_il _ _ = ini_tbl
      m_next_tbl = Cluster.tryBalance opts ini_tbl
  in case m_next_tbl of
    Just next_tbl@(Cluster.Table _ _ _ plc@(curplc:_)) ->
      let (idx, _, _, move, _) = curplc
          plc_len = length plc
          (_, cs) = Cluster.printSolutionLine ini_nl ini_il 1 1 curplc plc_len
          afn = Cluster.involvedNodes ini_il curplc
          cmds' = (afn, idx, move, cs):cmds
      in iterateBalance opts next_tbl cmds'
    _ -> cmds
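
-- Note that the recursion stops as soon as 'Cluster.tryBalance' no longer
-- finds an improving move, and that moves are accumulated head-first, so
-- the most recently found move is at the front of the returned list.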

-- | List instances evacuated in a move job, if any.
evacuatedInsts :: (Node.List, Instance.List)
               -> MoveJob
               -> [String]
evacuatedInsts (nl, il) (_, idx, _, _) =
  let inst = Container.find idx il
      node = Container.find (Instance.pNode inst) nl
  in [Instance.name inst | Node.offline node]
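
-- Such an instance counts as evacuated because the move job will take it
-- off its offline primary node; 'balanceGroup' records these instances so
-- that 'updateCPUInstance' tolerates their temporarily missing load data.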

-- | Balance a single group, restricted to the allowed nodes and
-- minimal gain.
balanceGroup :: IORef MemoryState
             -> Set.Set String
             -> L.Client
             -> Set.Set Int
             -> Double
             -> (Int, (Node.List, Instance.List))
             -> ResultT String IO [JobId]
balanceGroup memstate xens client allowedNodes threshold (gidx, (nl, il)) = do
  logDebug $ printf "Balancing group %d, %d nodes, %d instances." gidx
               (Container.size nl) (Container.size il)
  let ini_cv = Metrics.compCV nl
      ini_tbl = Cluster.Table nl il ini_cv []
      opts = defaultOptions { algAllowedNodes = Just allowedNodes
                            , algMinGain = threshold
                            , algMinGainLimit = 10 * threshold
                            }
      cmds = iterateBalance opts ini_tbl []
      tasks = take 1 $ Cluster.splitJobs cmds
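      -- Note: only the first group of jobs is submitted here; any further
      -- improvements are left to later rounds of the maintenance daemon.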
  logDebug $ "First task group: " ++ show tasks
  now <- liftIO currentTimestamp
  let jobs = tasks >>= map (moveToJob now (nl, il))
      evacs = filter (`Set.member` xens)
              (concat tasks >>= evacuatedInsts (nl, il))
  if null jobs
    then return []
    else do
      unless (null evacs) $ do
        logDebug $ "Evacuation of instances " ++ show evacs
        liftIO $ addEvacuated memstate evacs
      jids <- liftIO $ submitJobs jobs client
      case jids of
        Bad e -> mkResultT . logAndBad
                   $ "Failure submitting balancing jobs: " ++ e
        Ok jids' -> return jids'

-- * Memory balancing

-- | Decide the weight that dynamic memory utilization should have
-- based on the memory over-commitment ratio. This function is likely
-- to change once more experience with memory over-committed clusters
-- is gained.
weightFromMemRatio :: Double -> Double
weightFromMemRatio f = (0.0 `max` (f - 1)) * 5.0
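
-- For example, a memory over-commitment ratio of 1.5 yields a weight of
-- (1.5 - 1) * 5.0 = 2.5, while ratios of at most 1.0 yield a weight of 0.0.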

-- | Apply the memory data to the cluster data.
useMemData :: Double
           -> Container.Container AllReports
           -> (Node.List, Instance.List)
           -> ResultT String IO (Node.List, Instance.List)
useMemData ratio allreports (nl, il) = do
  logDebug "Taking dynamic memory data into account"
  let memoryReports =
        map (flip Container.find nl *** rMem) $ IntMap.toList allreports
  mkResultT . return . liftM (MonD.scaleMemoryWeight (weightFromMemRatio ratio))
    $ MonD.useInstanceRSSData memoryReports (nl, il)

-- * Interface function

-- | Carry out all the needed balancing, based on live utilisation data, only
-- touching the available nodes. Only carry out balancing steps where the gain
-- is above the threshold.
balanceTask :: IORef MemoryState
            -> (Node.List, Instance.List) -- ^ current cluster configuration
            -> Set.Set Int -- ^ node indices on which actions may be taken
            -> Double -- ^ threshold for improvement
            -> ResultT String IO [JobId] -- ^ jobs submitted
balanceTask memstate (nl, il) okNodes threshold = do
  logDebug "Collecting dynamic load values"
  evacuated <- getEvacuated memstate
  logDebug $ "Not expecting load data from: " ++ show evacuated
  reports <- queryLoad nl
  xenInstances <- getXenInstances
  (nl', il') <- mkResultT . return
                  $ updateCPULoad (nl, il) reports xenInstances evacuated
  liftIO $ mapM_ (cleanUpEvacuation memstate il reports) evacuated
  let memoryOvercommitment =
        maximum . (0.0:) . map (iPolicyMemoryRatio . Node.iPolicy)
        $ IntMap.elems nl
  logDebug $ "Memory over-commitment ratio is " ++ show memoryOvercommitment
  (nl'', il'') <- if memoryOvercommitment > 1.0
                    then useMemData memoryOvercommitment reports (nl', il')
                    else return (nl', il')
  logDebug . (++) "Dynamic node load: " . show
    . map (Node.name &&& Node.utilLoad) $ Container.elems nl''
  let ngroups = ClusterUtils.splitCluster nl'' il''
  luxiSocket <- liftIO Path.defaultQuerySocket
  bracket (liftIO $ L.getLuxiClient luxiSocket) (liftIO . L.closeClient) $ \c ->
    liftM concat $ mapM (balanceGroup memstate xenInstances c okNodes threshold)
                        ngroups