1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34 import itertools
35
36 from ganeti import opcodes
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
50 """Exception to report timeouts on acquiring locks.
51
52 """
53
78
81 """Class with lock acquire timeout strategy.
82
83 """
84 __slots__ = [
85 "_timeouts",
86 "_random_fn",
87 "_time_fn",
88 ]
89
90 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
92 - def __init__(self, _time_fn=time.time, _random_fn=random.random):
93 """Initializes this class.
94
95 @param _time_fn: Time function for unittests
96 @param _random_fn: Random number generator for unittests
97
98 """
99 object.__init__(self)
100
101 self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
102 self._time_fn = _time_fn
103 self._random_fn = _random_fn
104
106 """Returns the timeout for the next attempt.
107
108 """
109 try:
110 timeout = self._timeouts.next()
111 except StopIteration:
112
113 timeout = None
114
115 if timeout is not None:
116
117
118 variation_range = timeout * 0.1
119 timeout += ((self._random_fn() * variation_range) -
120 (variation_range * 0.5))
121
122 return timeout
123
126 """Base class for OpCode execution callbacks.
127
128 """
130 """Called when we are about to execute the LU.
131
132 This function is called when we're about to start the lu's Exec() method,
133 that is, after we have acquired all locks.
134
135 """
136
138 """Sends feedback from the LU code to the end-user.
139
140 """
141
143 """Check whether job has been cancelled.
144
145 """
146
148 """Submits jobs for processing.
149
150 See L{jqueue.JobQueue.SubmitManyJobs}.
151
152 """
153 raise NotImplementedError
154
157 """Computes the LU name for a given OpCode name.
158
159 """
160 assert opname.startswith(_OP_PREFIX), \
161 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163 return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
173
176 """Copies basic opcode parameters.
177
178 @type src: L{opcodes.OpCode}
179 @param src: Source opcode
180 @type defcomment: string
181 @param defcomment: Comment to specify if not already given
182 @type dst: L{opcodes.OpCode}
183 @param dst: Destination opcode
184
185 """
186 if hasattr(src, "debug_level"):
187 dst.debug_level = src.debug_level
188
189 if (getattr(dst, "priority", None) is None and
190 hasattr(src, "priority")):
191 dst.priority = src.priority
192
193 if not getattr(dst, opcodes.COMMENT_ATTR, None):
194 dst.comment = defcomment
195
198 """Examines opcode result.
199
200 If necessary, additional processing on the result is done.
201
202 """
203 if isinstance(result, cmdlib.ResultWithJobs):
204
205 map(compat.partial(_SetBaseOpParams, op,
206 "Submitted by %s" % op.OP_ID),
207 itertools.chain(*result.jobs))
208
209
210 job_submission = submit_fn(result.jobs)
211
212
213 result = result.other
214
215 assert constants.JOB_IDS_KEY not in result, \
216 "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
217
218 result[constants.JOB_IDS_KEY] = job_submission
219
220 return result
221
224 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
225
226 """
227 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
228 " queries) can not submit jobs")
229
232 """Function to convert RPC results to the format expected by HooksMaster.
233
234 @type rpc_results: dict(node: L{rpc.RpcResult})
235 @param rpc_results: RPC results
236 @rtype: dict(node: (fail_msg, offline, hooks_results))
237 @return: RPC results unpacked according to the format expected by
238         L{mcpu.HooksMaster}
239
240 """
241 return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
242 for (node, rpc_res) in rpc_results.items())
243
246 """Object which runs OpCodes"""
247 DISPATCH_TABLE = _ComputeDispatchTable()
248
249 - def __init__(self, context, ec_id, enable_locks=True):
250 """Constructor for Processor
251
252 @type context: GanetiContext
253 @param context: global Ganeti context
254 @type ec_id: string
255 @param ec_id: execution context identifier
256
257 """
258 self.context = context
259 self._ec_id = ec_id
260 self._cbs = None
261 self.rpc = context.rpc
262 self.hmclass = HooksMaster
263 self._enable_locks = enable_locks
264
266 """Checks if locking is enabled.
267
268 @raise errors.ProgrammerError: In case locking is not enabled
269
270 """
271 if not self._enable_locks:
272 raise errors.ProgrammerError("Attempted to use disabled locks")
273
274 - def _AcquireLocks(self, level, names, shared, timeout, priority):
275 """Acquires locks via the Ganeti lock manager.
276
277 @type level: int
278 @param level: Lock level
279 @type names: list or string
280 @param names: Lock names
281 @type shared: bool
282 @param shared: Whether the locks should be acquired in shared mode
283 @type timeout: None or float
284 @param timeout: Timeout for acquiring the locks
285 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
286 amount of time
287
288 """
289 self._CheckLocksEnabled()
290
291 if self._cbs:
292 self._cbs.CheckCancel()
293
294 acquired = self.context.glm.acquire(level, names, shared=shared,
295 timeout=timeout, priority=priority)
296
297 if acquired is None:
298 raise LockAcquireTimeout()
299
300 return acquired
301
303 """Logical Unit execution sequence.
304
305 """
306 write_count = self.context.cfg.write_count
307 lu.CheckPrereq()
308
309 hm = self.BuildHooksManager(lu)
310 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
311 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
312 self.Log, None)
313
314 if getattr(lu.op, "dry_run", False):
315
316
317
318 self.LogInfo("dry-run mode requested, not actually executing"
319 " the operation")
320 return lu.dry_run_result
321
322 if self._cbs:
323 submit_mj_fn = self._cbs.SubmitManyJobs
324 else:
325 submit_mj_fn = _FailingSubmitManyJobs
326
327 try:
328 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
329 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
330 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
331 self.Log, result)
332 finally:
333
334 if write_count != self.context.cfg.write_count:
335 hm.RunConfigUpdate()
336
337 return result
338
341
343 """Execute a Logical Unit, with the needed locks.
344
345 This is a recursive function that starts locking the given level, and
346 proceeds up, till there are no more locks to acquire. Then it executes the
347 given LU and its opcodes.
348
349 """
350 adding_locks = level in lu.add_locks
351 acquiring_locks = level in lu.needed_locks
352 if level not in locking.LEVELS:
353 if self._cbs:
354 self._cbs.NotifyStart()
355
356 result = self._ExecLU(lu)
357
358 elif adding_locks and acquiring_locks:
359
360
361 raise NotImplementedError("Can't declare locks to acquire when adding"
362 " others")
363
364 elif adding_locks or acquiring_locks:
365 self._CheckLocksEnabled()
366
367 lu.DeclareLocks(level)
368 share = lu.share_locks[level]
369
370 try:
371 assert adding_locks ^ acquiring_locks, \
372 "Locks must be either added or acquired"
373
374 if acquiring_locks:
375
376 needed_locks = lu.needed_locks[level]
377
378 self._AcquireLocks(level, needed_locks, share,
379 calc_timeout(), priority)
380 else:
381
382 add_locks = lu.add_locks[level]
383 lu.remove_locks[level] = add_locks
384
385 try:
386 self.context.glm.add(level, add_locks, acquired=1, shared=share)
387 except errors.LockError:
388 logging.exception("Detected lock error in level %s for locks"
389 " %s, shared=%s", level, add_locks, share)
390 raise errors.OpPrereqError(
391 "Couldn't add locks (%s), most likely because of another"
392 " job who added them first" % add_locks,
393 errors.ECODE_NOTUNIQUE)
394
395 try:
396 result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
397 finally:
398 if level in lu.remove_locks:
399 self.context.glm.remove(level, lu.remove_locks[level])
400 finally:
401 if self.context.glm.is_owned(level):
402 self.context.glm.release(level)
403
404 else:
405 result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
406
407 return result
408
  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # calc_timeout yields the remaining overall lock-acquisition budget on
    # every call; with no timeout it always answers None (block forever)
    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock: exclusively if this LU class requires
        # it, shared otherwise so that concurrent non-BGL opcodes can run
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(),
                           priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          # Recursively lock each level below the cluster level and run the LU
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
                                       priority)
        finally:
          # Always drop execution-context reservations, even on failure
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release the Big Ganeti Lock if it was acquired above
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      # Callbacks are only valid for the duration of this opcode
      self._cbs = None

    # Validate the result against the opcode's declared result check, if any
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # In dry-run mode LUs may legitimately return partial results
        # (e.g. None), so the result check is only logged, not enforced
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

    return result
481
482 - def Log(self, *args):
483 """Forward call to feedback callback function.
484
485 """
486 if self._cbs:
487 self._cbs.Feedback(*args)
488
489 - def LogStep(self, current, total, message):
490 """Log a change in LU execution progress.
491
492 """
493 logging.debug("Step %d/%d %s", current, total, message)
494 self.Log("STEP %d/%d %s" % (current, total, message))
495
497 """Log a warning to the logs and the user.
498
499 The optional keyword argument is 'hint' and can be used to show a
500 hint to the user (presumably related to the warning). If the
501 message is empty, it will not be printed at all, allowing one to
502 show only a hint.
503
504 """
505 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
506 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
507 if args:
508 message = message % tuple(args)
509 if message:
510 logging.warning(message)
511 self.Log(" - WARNING: %s" % message)
512 if "hint" in kwargs:
513 self.Log(" Hint: %s" % kwargs["hint"])
514
515 - def LogInfo(self, message, *args):
523
525 """Returns the current execution context ID.
526
527 """
528 if not self._ec_id:
529 raise errors.ProgrammerError("Tried to use execution context id when"
530 " not set")
531 return self._ec_id
532
535 - def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
536 hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
537 master_name=None):
538 """Base class for hooks masters.
539
540 This class invokes the execution of hooks according to the behaviour
541 specified by its parameters.
542
543 @type opcode: string
544 @param opcode: opcode of the operation to which the hooks are tied
545 @type hooks_path: string
546 @param hooks_path: prefix of the hooks directories
547 @type nodes: 2-tuple of lists
548 @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
549 run and nodes on which post-hooks must be run
550 @type hooks_execution_fn: function that accepts the following parameters:
551 (node_list, hooks_path, phase, environment)
552 @param hooks_execution_fn: function that will execute the hooks; can be
553 None, indicating that no conversion is necessary.
554 @type hooks_results_adapt_fn: function
555 @param hooks_results_adapt_fn: function that will adapt the return value of
556 hooks_execution_fn to the format expected by RunPhase
557 @type build_env_fn: function that returns a dictionary having strings as
558 keys
559 @param build_env_fn: function that builds the environment for the hooks
560 @type log_fn: function that accepts a string
561 @param log_fn: logging function
562 @type htype: string or None
563 @param htype: None or one of L{constants.HTYPE_CLUSTER},
564 L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
565 @type cluster_name: string
566 @param cluster_name: name of the cluster
567 @type master_name: string
568 @param master_name: name of the master
569
570 """
571 self.opcode = opcode
572 self.hooks_path = hooks_path
573 self.hooks_execution_fn = hooks_execution_fn
574 self.hooks_results_adapt_fn = hooks_results_adapt_fn
575 self.build_env_fn = build_env_fn
576 self.log_fn = log_fn
577 self.htype = htype
578 self.cluster_name = cluster_name
579 self.master_name = master_name
580
581 self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
582 (self.pre_nodes, self.post_nodes) = nodes
583
585 """Compute the environment and the target nodes.
586
587 Based on the opcode and the current node list, this builds the
588 environment for the hooks and the target node list for the run.
589
590 """
591 if phase == constants.HOOKS_PHASE_PRE:
592 prefix = "GANETI_"
593 elif phase == constants.HOOKS_PHASE_POST:
594 prefix = "GANETI_POST_"
595 else:
596 raise AssertionError("Unknown phase '%s'" % phase)
597
598 env = {}
599
600 if self.hooks_path is not None:
601 phase_env = self.build_env_fn()
602 if phase_env:
603 assert not compat.any(key.upper().startswith(prefix)
604 for key in phase_env)
605 env.update(("%s%s" % (prefix, key), value)
606 for (key, value) in phase_env.items())
607
608 if phase == constants.HOOKS_PHASE_PRE:
609 assert compat.all((key.startswith("GANETI_") and
610 not key.startswith("GANETI_POST_"))
611 for key in env)
612
613 elif phase == constants.HOOKS_PHASE_POST:
614 assert compat.all(key.startswith("GANETI_POST_") for key in env)
615 assert isinstance(self.pre_env, dict)
616
617
618 assert not compat.any(key.startswith("GANETI_POST_")
619 for key in self.pre_env)
620 env.update(self.pre_env)
621 else:
622 raise AssertionError("Unknown phase '%s'" % phase)
623
624 return env
625
626 - def _RunWrapper(self, node_list, hpath, phase, phase_env):
627 """Simple wrapper over self.callfn.
628
629 This method fixes the environment before executing the hooks.
630
631 """
632 env = {
633 "PATH": constants.HOOKS_PATH,
634 "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
635 "GANETI_OP_CODE": self.opcode,
636 "GANETI_DATA_DIR": constants.DATA_DIR,
637 "GANETI_HOOKS_PHASE": phase,
638 "GANETI_HOOKS_PATH": hpath,
639 }
640
641 if self.htype:
642 env["GANETI_OBJECT_TYPE"] = self.htype
643
644 if self.cluster_name is not None:
645 env["GANETI_CLUSTER"] = self.cluster_name
646
647 if self.master_name is not None:
648 env["GANETI_MASTER"] = self.master_name
649
650 if phase_env:
651 env = utils.algo.JoinDisjointDicts(env, phase_env)
652
653
654 env = dict([(str(key), str(val)) for key, val in env.iteritems()])
655
656 assert compat.all(key == "PATH" or key.startswith("GANETI_")
657 for key in env)
658
659 return self.hooks_execution_fn(node_list, hpath, phase, env)
660
662 """Run all the scripts for a phase.
663
664 This is the main function of the HookMaster.
665 It executes self.hooks_execution_fn, and after running
666 self.hooks_results_adapt_fn on its results it expects them to be in the form
667     {node_name: (fail_msg, [(script, result, output), ...])}.
668
669 @param phase: one of L{constants.HOOKS_PHASE_POST} or
670 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
671 @param nodes: overrides the predefined list of nodes for the given phase
672 @return: the processed results of the hooks multi-node rpc call
673 @raise errors.HooksFailure: on communication failure to the nodes
674 @raise errors.HooksAbort: on failure of one of the hooks
675
676 """
677 if phase == constants.HOOKS_PHASE_PRE:
678 if nodes is None:
679 nodes = self.pre_nodes
680 env = self.pre_env
681 elif phase == constants.HOOKS_PHASE_POST:
682 if nodes is None:
683 nodes = self.post_nodes
684 env = self._BuildEnv(phase)
685 else:
686 raise AssertionError("Unknown phase '%s'" % phase)
687
688 if not nodes:
689
690
691
692 return
693
694 results = self._RunWrapper(nodes, self.hooks_path, phase, env)
695 if not results:
696 msg = "Communication Failure"
697 if phase == constants.HOOKS_PHASE_PRE:
698 raise errors.HooksFailure(msg)
699 else:
700 self.log_fn(msg)
701 return results
702
703 converted_res = results
704 if self.hooks_results_adapt_fn:
705 converted_res = self.hooks_results_adapt_fn(results)
706
707 errs = []
708 for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
709 if offline:
710 continue
711
712 if fail_msg:
713 self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
714 continue
715
716 for script, hkr, output in hooks_results:
717 if hkr == constants.HKR_FAIL:
718 if phase == constants.HOOKS_PHASE_PRE:
719 errs.append((node_name, script, output))
720 else:
721 if not output:
722 output = "(no output)"
723 self.log_fn("On %s script %s failed, output: %s" %
724 (node_name, script, output))
725
726 if errs and phase == constants.HOOKS_PHASE_PRE:
727 raise errors.HooksAbort(errs)
728
729 return results
730
732 """Run the special configuration update hook
733
734 This is a special hook that runs only on the master after each
735 top-level LI if the configuration has been updated.
736
737 """
738 phase = constants.HOOKS_PHASE_POST
739 hpath = constants.HOOKS_NAME_CFGUPDATE
740 nodes = [self.master_name]
741 self._RunWrapper(nodes, hpath, phase, self.pre_env)
742
743 @staticmethod
745 if lu.HPATH is None:
746 nodes = (None, None)
747 else:
748 nodes = map(frozenset, lu.BuildHooksNodes())
749
750 master_name = cluster_name = None
751 if lu.cfg:
752 master_name = lu.cfg.GetMasterNode()
753 cluster_name = lu.cfg.GetClusterName()
754
755 return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
756 _RpcResultsToHooksResults, lu.BuildHooksEnv,
757 lu.LogWarning, lu.HTYPE, cluster_name, master_name)
758