1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""Module implementing the logic behind the cluster operations.

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
# Both values below are one-element lists so that they can be updated in
# place (C{name[0] = ...}) without rebinding the module attribute.

# Flag polled and cleared while waiting for locks (see _RequestAndWait);
# presumably set by a SIGHUP handler installed elsewhere.
sighupReceived = [False]

# Number of logical units currently between the start of Exec() and the end
# of their post hooks; maintained by _ExecLU.
lusExecuting = [0]

# Class name prefixes used to map an opcode name to its logical unit name
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

# LU classes that must not hold the node allocation lock (checked by
# _VerifyLocks)
_NODE_ALLOC_WHITELIST = frozenset([])

# LU classes that must use a different sharing mode for the node allocation
# lock than for the node/node resource locks (checked by _VerifyLocks)
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
77
78
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised by the lock acquisition helpers of this module when a request is
  still pending after the allowed time.

  """
83
84
108
109
111 """Class with lock acquire timeout strategy.
112
113 """
114 __slots__ = [
115 "_timeouts",
116 "_random_fn",
117 "_time_fn",
118 ]
119
120 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
121
def __init__(self, _time_fn=time.time, _random_fn=random.random):
  """Initializes this class.

  @param _time_fn: Time function for unittests
  @param _random_fn: Random number generator for unittests; expected to
    return a float, used to vary each timeout

  """
  object.__init__(self)

  # Single-use iterator over the class-level table of per-attempt timeouts
  self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
  self._time_fn = _time_fn
  self._random_fn = _random_fn
134
# NOTE(review): the "def" line is absent from the reviewed text; the method
# name was restored from the docstring wording -- confirm against callers.
def NextAttempt(self):
  """Returns the timeout for the next attempt.

  @rtype: float or None
  @return: the next configured timeout, varied randomly by up to +/- 5%, or
    C{None} once all configured timeouts have been used

  """
  try:
    timeout = next(self._timeouts)
  except StopIteration:
    # No more timeouts configured
    timeout = None

  if timeout is not None:
    # Add a small variation (-/+ 5%) to the timeout so that concurrent
    # waiters do not all retry at exactly the same moment
    variation_range = timeout * 0.1
    timeout += ((self._random_fn() * variation_range) -
                (variation_range * 0.5))

  return timeout
153
154
class OpExecCbBase(object):
  """Base class for OpCode execution callbacks.

  All notification hooks are no-ops by default; subclasses override the ones
  they are interested in.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def NotifyRetry(self):
    """Called when we are about to reset an LU to retry again.

    This function is called after PrepareRetry successfully completed.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self):
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: always, in this base class

    """
    raise NotImplementedError
192
193
# NOTE(review): the "def" line is absent from the reviewed text; the name was
# restored from the docstring and the parameter from the body -- confirm.
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode class name; must start with L{_OP_PREFIX}
  @rtype: string
  @return: C{opname} with the leading L{_OP_PREFIX} replaced by
    L{_LU_PREFIX}

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
202
203
211
212
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  The debug level is always copied, the priority only if the destination has
  none, the comment only if the destination has none, and the source's reason
  trail is appended to the destination's.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    # Build a fresh list so the destination never shares a list object with
    # the source opcode
    dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
237
238
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: for a
  L{cmdlib.ResultWithJobs} the contained jobs are submitted and the result is
  replaced by its C{other} dictionary extended with the submission result.

  @type submit_fn: callable
  @param submit_fn: Function called with the list of jobs to submit
  @type op: L{opcodes.OpCode}
  @param op: Opcode that produced the result; source of the base parameters
    copied to every submitted opcode
  @param result: Value returned by the LU
  @return: C{result} unchanged, or the dictionary described above

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of every job.
    # An explicit loop is used because this is done purely for side effects.
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain.from_iterable(result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
263
264
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job submission function when an opcode is executed without
  callbacks.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
271
272
# NOTE(review): the "def" line is absent from the reviewed text; the defaults
# of the two whitelist parameters are assumed to be the module-level
# whitelists -- confirm.
def _VerifyLocks(lu, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  All checks are assertions, hence the function is a no-op when Python runs
  with optimizations enabled.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @param _mode_whitelist: LU classes required to use a different sharing mode
    for the node allocation lock
  @param _nal_whitelist: LU classes required not to hold the node allocation
    lock

  """
  if not __debug__:
    return

  allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
  have_nal = locking.NAL in allocset

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # Only the modes declared by the LU are checked here, not the modes of
    # the locks actually held
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET:
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
308
309
def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' argument is an iterable, a list of it;
      if it's a string, make it a one-element list;
      if L{locking.ALL_SET}, L{locking.ALL_SET}

  """
  if names == locking.ALL_SET:
    return names
  elif isinstance(names, basestring):
    # A string is iterable too, so it must be checked before list() is used
    return [names]
  else:
    # Always a new list, so callers may sort or extend it freely
    return list(names)
327
328
330 """Object which runs OpCodes"""
331 DISPATCH_TABLE = _ComputeDispatchTable()
332
def __init__(self, context, ec_id, enable_locks=True):
  """Constructor for Processor

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier
  @type enable_locks: bool
  @param enable_locks: Whether this processor may acquire locks; if false,
    any attempt to use locks raises L{errors.ProgrammerError}

  """
  self.context = context
  self._ec_id = ec_id
  # Execution callbacks; only set while an opcode is being executed
  self._cbs = None
  self.cfg = context.GetConfig(ec_id)
  self.rpc = context.GetRpc(self.cfg)
  self.hmclass = hooksmaster.HooksMaster
  self._enable_locks = enable_locks
  # The wconfd module itself; clients are created on demand via Client()
  self.wconfd = wconfd
  self._wconfdcontext = context.GetWConfdContext(ec_id)
351
def _CheckLocksEnabled(self):
  """Checks if locking is enabled.

  @raise errors.ProgrammerError: In case locking is not enabled

  """
  if not self._enable_locks:
    raise errors.ProgrammerError("Attempted to use disabled locks")
360
def _RequestAndWait(self, request, timeout):
  """Request locks from WConfD and wait for them to be granted.

  The request is registered with WConfD and, if it is not granted
  immediately, the function polls every 0.05s until either the SIGHUP flag
  is set and WConfD reports no pending request, or the timeout expires.

  @type request: list
  @param request: the lock request to be sent to WConfD
  @type timeout: float
  @param timeout: the time to wait for the request to be granted
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time; in this case, locks still might be acquired or a request
    pending.

  """
  logging.debug("Trying %ss to request %s for %s",
                timeout, request, self._wconfdcontext)
  if self._cbs:
    priority = self._cbs.CurrentPriority()
  else:
    priority = None

  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  # A signal that arrived before we even asked cannot be about this request
  if sighupReceived[0]:
    logging.warning("Ignoring unexpected SIGHUP")
  sighupReceived[0] = False

  # Request locks
  self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                          request)
  pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

  if pending:
    def _HasPending():
      # Only query WConfD once a signal has been seen; until then the
      # request is assumed to be still pending
      if sighupReceived[0]:
        return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
      else:
        return True

    pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

    signal = sighupReceived[0]

    if pending:
      # Final check, in case the request was granted without a signal
      pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending and signal:
      logging.warning("Ignoring unexpected SIGHUP")
    # NOTE(review): indentation was lost in the reviewed text; the flag is
    # assumed to be reset unconditionally after waiting -- confirm.
    sighupReceived[0] = False

  logging.debug("Finished trying. Pending: %s", pending)
  if pending:
    raise LockAcquireTimeout()
414
def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                  opportunistic_count=1, request_only=False):
  """Acquires locks via the Ganeti lock manager.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the locks should be acquired in shared mode
  @type opportunistic: bool
  @param opportunistic: Whether to acquire opportunistically
  @type timeout: None or float
  @param timeout: Timeout for acquiring the locks
  @type opportunistic_count: int
  @param opportunistic_count: Passed to WConfD for opportunistic acquisition;
    per the log message, the minimum number of locks to obtain
  @type request_only: bool
  @param request_only: do not acquire the locks, just return the request
  @rtype: list
  @return: an empty list if there is nothing to lock; the request (a list of
    C{[lockname, mode]} pairs) if C{request_only} is set; the value returned
    by WConfD for opportunistic acquisition; otherwise the list of
    C{"levelname/name"} lock names requested
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time; in this case, locks still might be acquired or a request
    pending.

  """
  self._CheckLocksEnabled()

  if self._cbs:
    priority = self._cbs.CurrentPriority()
  else:
    priority = None

  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  if names == locking.ALL_SET:
    if opportunistic:
      # Opportunistic acquisition needs the individual lock names
      expand_fns = {
        locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
        locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
        locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
        locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
        locking.LEVEL_NODE: self.cfg.GetNodeList,
        locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
        locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
        }
      names = expand_fns[level]()
    else:
      names = locking.LOCKSET_NAME

  names = _LockList(names)

  # Locks of the same level are always requested in lexicographic order
  names.sort()

  levelname = locking.LEVEL_NAMES[level]

  if not names:
    logging.debug("Acquiring no locks for (%s) at level %s",
                  self._wconfdcontext, levelname)
    return []

  locks = ["%s/%s" % (levelname, lock) for lock in names]

  if shared:
    mode = "shared"
  else:
    mode = "exclusive"
  request = [[lock, mode] for lock in locks]

  if request_only:
    logging.debug("Lock request for level %s is %s", level, request)
    return request

  self.cfg.OutDate()

  if timeout is None:
    # Without a timeout the locks are requested unconditionally; the
    # "opportunistic" flag is deliberately ignored in this branch.
    logging.info("Definitely requesting %s for %s",
                 request, self._wconfdcontext)

    # Request the locks one at a time, in lock order, waiting for each to be
    # granted before asking for the next one
    for r in request:
      logging.debug("Definite request %s for %s", r, self._wconfdcontext)
      self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                              [r])
      while True:
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        if not pending:
          break
        # Randomized delay between polls
        time.sleep(10.0 * random.random())

  elif opportunistic:
    logging.debug("For %ss trying to opportunistically acquire"
                  " at least %d of %s for %s.",
                  timeout, opportunistic_count, locks, self._wconfdcontext)
    # Retry every 2s until a non-empty list of locks is returned
    locks = utils.SimpleRetry(
      lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
      2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
    logging.debug("Managed to get the following locks: %s", locks)
    if locks == []:
      raise LockAcquireTimeout()
  else:
    self._RequestAndWait(request, timeout)

  return locks
518
def _ExecLU(self, lu):
  """Logical Unit execution sequence.

  Runs the prerequisite check, the pre hooks, the LU itself and the post
  hooks, in that order.

  @param lu: Logical unit to execute, with all its locks already held
  @return: the LU result after processing by L{_ProcessResult} and the
    post-hook callback, or C{lu.dry_run_result} in dry-run mode

  """
  write_count = self.cfg.write_count
  lu.cfg.OutDate()
  lu.CheckPrereq()

  hm = self.BuildHooksManager(lu)
  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
  lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                   self.Log, None)

  if getattr(lu.op, "dry_run", False):
    # In dry-run mode neither Exec() nor the post hooks are run, and no
    # configuration update is triggered
    self.LogInfo("dry-run mode requested, not actually executing"
                 " the operation")
    return lu.dry_run_result

  if self._cbs:
    submit_mj_fn = self._cbs.SubmitManyJobs
  else:
    submit_mj_fn = _FailingSubmitManyJobs

  lusExecuting[0] += 1
  try:
    result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
    h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
    result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                              self.Log, result)
  finally:
    # Runs on failure too, so the counter stays balanced and a configuration
    # change made before the error is still announced
    lusExecuting[0] -= 1
    if write_count != self.cfg.write_count:
      hm.RunConfigUpdate()

  return result
558
561
def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
  """Execute a Logical Unit, with the needed locks.

  This is a recursive function that starts locking the given level, and
  proceeds up, till there are no more locks to acquire. Then it executes the
  given LU and its opcodes.

  @param lu: Logical unit to execute
  @type level: int
  @param level: Lock level to process in this call
  @type calc_timeout: callable
  @param calc_timeout: Function returning the remaining time for acquiring
    locks, or C{None} for no timeout
  @type pending: list or None
  @param pending: Lock requests collected at lower levels but not yet sent
  @raise LockAcquireTimeout: In case locks couldn't be acquired in time, or
    the LU ran temporarily out of resources and was prepared for a retry

  """
  pending = pending or []
  logging.debug("Looking at locks of level %s, still need to obtain %s",
                level, pending)
  adding_locks = level in lu.add_locks
  acquiring_locks = level in lu.needed_locks

  if level not in locking.LEVELS:
    # Past the last lock level: flush outstanding requests and run the LU
    if pending:
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    logging.debug("Finished acquiring locks")

    _VerifyLocks(lu)

    if self._cbs:
      self._cbs.NotifyStart()

    try:
      result = self._ExecLU(lu)
    except errors.OpPrereqError as err:
      (_, ecode) = err.args
      if ecode != errors.ECODE_TEMP_NORES:
        raise
      logging.debug("Temporarily out of resources; will retry internally")
      try:
        lu.PrepareRetry(self.Log)
        if self._cbs:
          self._cbs.NotifyRetry()
      except errors.OpRetryNotSupportedError:
        logging.debug("LU does not know how to retry.")
        raise err
      # Reported as a lock timeout so that the caller retries the opcode
      raise LockAcquireTimeout()
    except AssertionError as err:
      # It is not known which phase (prereq, exec) failed, but reporting the
      # location is better than an exception without any information
      (_, _, tb) = sys.exc_info()
      err_info = traceback.format_tb(tb)
      del tb
      logging.exception("Detected AssertionError")
      raise errors.OpExecError("Internal assertion error: please report"
                               " this as a bug.\nError message: '%s';"
                               " location:\n%s" % (str(err), err_info[-1]))
    return result

  # Determine up front whether the acquisition is opportunistic
  opportunistic = lu.opportunistic_locks[level]

  dont_collate = lu.dont_collate_locks[level]

  if dont_collate and pending:
    # This level must not be merged with requests of earlier levels
    self._RequestAndWait(pending, calc_timeout())
    lu.cfg.OutDate()
    lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
    pending = []

  if adding_locks and opportunistic:
    raise NotImplementedError("Can't opportunistically acquire locks when"
                              " adding new ones")

  if adding_locks and acquiring_locks and \
     lu.needed_locks[level] == locking.ALL_SET:
    raise NotImplementedError("Can't request all locks of a certain level"
                              " and add new locks")

  if adding_locks or acquiring_locks:
    self._CheckLocksEnabled()

    lu.DeclareLocks(level)
    share = lu.share_locks[level]
    opportunistic_count = lu.opportunistic_locks_count[level]

    try:
      if acquiring_locks:
        needed_locks = _LockList(lu.needed_locks[level])
      else:
        needed_locks = []

      if adding_locks:
        needed_locks.extend(_LockList(lu.add_locks[level]))

      timeout = calc_timeout()
      if timeout is not None and not opportunistic:
        # Only collect the request; it is sent together with later levels
        pending = pending + self._AcquireLocks(level, needed_locks, share,
                                               opportunistic, timeout,
                                               request_only=True)
      else:
        if pending:
          self._RequestAndWait(pending, calc_timeout())
          lu.cfg.OutDate()
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
          pending = []
        self._AcquireLocks(level, needed_locks, share, opportunistic,
                           timeout,
                           opportunistic_count=opportunistic_count)
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

      result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                   pending=pending)
    finally:
      # Locks of this level are released however the deeper levels ended
      levelname = locking.LEVEL_NAMES[level]
      logging.debug("Freeing locks at level %s for %s",
                    levelname, self._wconfdcontext)
      self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
  else:
    result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)

  return result
686
687
def _CheckLUResult(self, op, result):
  """Check the LU result against the contract in the opcode.

  @param op: Opcode whose C{OP_RESULT} attribute is either C{None} (no check)
    or a callable returning whether a result is acceptable
  @param result: Result returned by the logical unit
  @raise errors.OpResultError: If the result is rejected and the opcode is
    not running in dry-run mode

  """
  resultcheck_fn = op.OP_RESULT
  if not (resultcheck_fn is None or resultcheck_fn(result)):
    logging.error("Expected opcode result matching %s, got %s",
                  resultcheck_fn, result)
    if not getattr(op, "dry_run", False):
      # In dry-run mode a mismatch is only logged, as the dry-run result is
      # not required to satisfy the check
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))
702
# NOTE(review): the "def" line is absent from the reviewed text; the name and
# the default of "timeout" were restored from the docstring -- confirm
# against callers.
def ExecOpCode(self, op, cbs, timeout=None):
  """Execute an opcode.

  @type op: an OpCode instance
  @param op: the opcode to be executed
  @type cbs: L{OpExecCbBase}
  @param cbs: Runtime callbacks
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks, None for no timeout
  @return: the result of the logical unit
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time
  @raise errors.ProgrammerError: If C{op} is not an opcode, or if the opcode
    requires the BGL while locks are disabled
  @raise errors.OpCodeUnknown: If no logical unit is registered for C{op}

  """
  if not isinstance(op, opcodes.OpCode):
    raise errors.ProgrammerError("Non-opcode instance passed"
                                 " to ExecOpcode (%s)" % type(op))

  lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
  if lu_class is None:
    raise errors.OpCodeUnknown("Unknown opcode")

  if timeout is None:
    calc_timeout = lambda: None
  else:
    calc_timeout = utils.RunningTimeout(timeout, False).Remaining

  self._cbs = cbs
  try:
    if self._enable_locks:
      # Acquire the BGL exclusively if the LU requires it and in shared mode
      # otherwise, so that it cannot run concurrently with an exclusive LU
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, False, calc_timeout())
    elif lu_class.REQ_BGL:
      raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                   " disabled" % op.OP_ID)

    lu = lu_class(self, op, self.context, self.cfg, self.rpc,
                  self._wconfdcontext, self.wconfd)
    lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
    lu.ExpandNames()
    assert lu.needed_locks is not None, "needed_locks not set by LU"

    try:
      result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                   calc_timeout)
    finally:
      if self._ec_id:
        self.cfg.DropECReservations(self._ec_id)
  finally:
    try:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
    finally:
      # Never keep callbacks of a finished opcode, even if freeing failed
      self._cbs = None

  self._CheckLUResult(op, result)

  return result
761
def Log(self, *args):
  """Forward call to feedback callback function.

  Does nothing if no callbacks are currently set.

  @param args: Arguments passed unchanged to the C{Feedback} callback

  """
  if self._cbs:
    self._cbs.Feedback(*args)
768
def LogStep(self, current, total, message):
  """Log a change in LU execution progress.

  @type current: int
  @param current: Number of the step being started
  @type total: int
  @param total: Total number of steps
  @type message: string
  @param message: Description of the step

  """
  logging.debug("Step %d/%d %s", current, total, message)
  self.Log("STEP %d/%d %s" % (current, total, message))
775
def LogWarning(self, message, *args, **kwargs):
  """Log a warning to the logs and the user.

  The optional keyword argument is 'hint' and can be used to show a
  hint to the user (presumably related to the warning). If the
  message is empty, it will not be printed at all, allowing one to
  show only a hint.

  @type message: string
  @param message: Warning text; used as a format string if C{args} are given
  @param args: Values for the format string

  """
  assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
    "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
  if args:
    message = message % tuple(args)
  if message:
    logging.warning(message)
    self.Log(" - WARNING: %s" % message)
  if "hint" in kwargs:
    self.Log(" Hint: %s" % kwargs["hint"])
794
795 - def LogInfo(self, message, *args):
803
# NOTE(review): the "def" line is absent from the reviewed text; the method
# name was restored from the docstring wording -- confirm against callers.
def GetECId(self):
  """Returns the current execution context ID.

  @rtype: string
  @return: the execution context identifier given to the constructor
  @raise errors.ProgrammerError: If no execution context identifier is set

  """
  if not self._ec_id:
    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
  return self._ec_id
812