1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
59 sighupReceived = [False]
60 lusExecuting = [0]
61
62 _OP_PREFIX = "Op"
63 _LU_PREFIX = "LU"
64
65
67 """Exception to report timeouts on acquiring locks.
68
69 """
70
71
95
96
98 """Class with lock acquire timeout strategy.
99
100 """
101 __slots__ = [
102 "_timeouts",
103 "_random_fn",
104 "_time_fn",
105 ]
106
107 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
108
def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Prepares a fresh sequence of per-attempt timeouts.

    @param _time_fn: Time function, replaceable for unittests
    @param _random_fn: Random number generator, replaceable for unittests

    """
    object.__init__(self)

    # Injected helpers; the defaults are the stdlib implementations
    self._random_fn = _random_fn
    self._time_fn = _time_fn

    # Iterator over the class-level list of per-attempt timeouts
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
121
123 """Returns the timeout for the next attempt.
124
@rtype: float or None
@return: the next per-attempt timeout with a random variation applied, or
  C{None} once the per-attempt timeouts are exhausted

125 """
126 try:
127 timeout = self._timeouts.next()
128 except StopIteration:
129
# Iterator exhausted: report "no timeout" to the caller
130 timeout = None
131
132 if timeout is not None:
133
134
# Shift the timeout by a random offset taken from a window that is 10% of
# the timeout wide and centred on it (assumes _random_fn returns a value
# in [0, 1), like the default random.random)
135 variation_range = timeout * 0.1
136 timeout += ((self._random_fn() * variation_range) -
137 (variation_range * 0.5))
138
139 return timeout
140
141
143 """Base class for OpCode execution callbacks.
144
145 """
147 """Called when we are about to execute the LU.
148
149 This function is called when we're about to start the lu's Exec() method,
150 that is, after we have acquired all locks.
151
152 """
153
155 """Called when we are about to reset an LU to retry again.
156
157 This function is called after PrepareRetry successfully completed.
158
159 """
160
162 """Sends feedback from the LU code to the end-user.
163
164 """
165
167 """Returns current priority or C{None}.
168
This base implementation always returns C{None}.

169 """
170 return None
171
173 """Submits jobs for processing.
174
175 See L{jqueue.JobQueue.SubmitManyJobs}.
176
This base implementation always raises C{NotImplementedError}.

177 """
178 raise NotImplementedError
179
180
182 """Computes the LU name for a given OpCode name.
183
@type opname: string
@param opname: OpCode name; must start with the opcode prefix ("Op")
@rtype: string
@return: C{opname} with the opcode prefix replaced by the LU prefix ("LU")

184 """
# NOTE(review): the prefix check is an assert, so it is skipped under -O
185 assert opname.startswith(_OP_PREFIX), \
186 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
187
188 return _LU_PREFIX + opname[len(_OP_PREFIX):]
189
190
198
199
def _SetBaseOpParams(src, defcomment, dst):
    """Propagates basic parameters from one opcode to another.

    The debug level is always taken over when the source has one. The
    priority is only taken over when the destination has none, and the
    comment only when the destination's is empty. The reason trail of the
    destination is extended by the source's.

    @type src: L{opcodes.OpCode}
    @param src: Source opcode
    @type defcomment: string
    @param defcomment: Comment to specify if not already given
    @type dst: L{opcodes.OpCode}
    @param dst: Destination opcode

    """
    if hasattr(src, "debug_level"):
        dst.debug_level = src.debug_level

    # An explicitly set destination priority wins over the source's
    dst_lacks_priority = getattr(dst, "priority", None) is None
    if dst_lacks_priority and hasattr(src, "priority"):
        dst.priority = src.priority

    existing_comment = getattr(dst, opcodes_base.COMMENT_ATTR, None)
    if not existing_comment:
        dst.comment = defcomment

    if hasattr(src, constants.OPCODE_REASON):
        inherited = getattr(src, constants.OPCODE_REASON, [])
        # Work on a copy so the destination's original list is not mutated
        dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
        dst.reason.extend(inherited)
224
225
def _ProcessResult(submit_fn, op, result):
    """Examines opcode result.

    If necessary, additional processing on the result is done: when the
    result is a L{cmdlib.ResultWithJobs}, the contained jobs are submitted
    and the value returned by C{submit_fn} is stored in the result
    dictionary under C{constants.JOB_IDS_KEY}.

    @param submit_fn: callable taking the list of jobs to submit
    @param op: the opcode that produced the result; its basic parameters
        are copied to every opcode of the submitted jobs
    @param result: the value returned by the LU
    @return: C{result} unchanged, or for L{cmdlib.ResultWithJobs} its
        additional return values extended by the job submission result

    """
    if isinstance(result, cmdlib.ResultWithJobs):
        # Copy basic parameters to every opcode of every job. An explicit
        # loop is used instead of map(), which was only called for its side
        # effects and would do nothing where map() is lazy.
        default_comment = "Submitted by %s" % op.OP_ID
        for job_op in itertools.chain(*result.jobs):
            _SetBaseOpParams(op, default_comment, job_op)

        # Hand the jobs over for processing
        job_submission = submit_fn(result.jobs)

        # From here on only the additional return values are of interest
        result = result.other

        assert constants.JOB_IDS_KEY not in result, (
            "Key '%s' found in additional return values" %
            constants.JOB_IDS_KEY)

        result[constants.JOB_IDS_KEY] = job_submission

    return result
250
251
def _FailingSubmitManyJobs(_):
    """Stand-in for L{OpExecCbBase.SubmitManyJobs} that always fails.

    Used as job submission function when no callbacks are available.

    @raise errors.ProgrammerError: always

    """
    message = ("Opcodes processed without callbacks (e.g."
               " queries) can not submit jobs")
    raise errors.ProgrammerError(message)
258
259
def _LockList(names):
    """Normalizes a lock name specification to a list.

    @type names: list or string or NoneType
    @param names: Lock names
    @rtype: a list of strings
    @return: L{locking.ALL_SET} if that was passed in; a one-element list
        if 'names' is a single string; otherwise a new list built from the
        iterable 'names'

    """
    # The "all locks" marker is passed through untouched
    if names == locking.ALL_SET:
        return names

    # A single name must not be split into its characters by list()
    if isinstance(names, basestring):
        return [names]

    return list(names)
277
278
def _CheckSecretParameters(op):
    """Verifies that no secret OS parameter is still redacted.

    @param op: the opcode to inspect; opcodes without (or with empty)
        C{osparams_secret} are accepted as they are
    @raise errors.OpPrereqError: if any secret parameter's value equals
        C{constants.REDACTED}

    """
    if not hasattr(op, "osparams_secret"):
        return

    secret_params = op.osparams_secret
    if not secret_params:
        return

    for param_name in secret_params:
        if secret_params[param_name].Get() == constants.REDACTED:
            raise errors.OpPrereqError(
                "Please re-submit secret parameters to job.",
                errors.ECODE_INVAL)
288
289
291 """Object which runs OpCodes"""
292 DISPATCH_TABLE = _ComputeDispatchTable()
293
def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: if false, any attempt to use locks raises
        L{errors.ProgrammerError}

    """
    # State that does not depend on the context
    self._ec_id = ec_id
    self._enable_locks = enable_locks
    self._cbs = None
    self._hm = None
    self.hmclass = hooksmaster.HooksMaster
    self.wconfd = wconfd

    # The configuration is needed to obtain the RPC runner
    config = context.GetConfig(ec_id)
    self.cfg = config
    self.rpc = context.GetRpc(config)
    self._wconfdcontext = context.GetWConfdContext(ec_id)
312
def _CheckLocksEnabled(self):
    """Ensures that this processor is allowed to use locks.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if self._enable_locks:
        return

    raise errors.ProgrammerError("Attempted to use disabled locks")
321
323 """Request locks from WConfD and wait for them to be granted.
324
The request is sent with the priority reported by the callbacks, or with
C{constants.OP_PRIO_DEFAULT} if there are no callbacks or they report
C{None}.

325 @type request: list
326 @param request: the lock request to be sent to WConfD
327 @type timeout: float
328 @param timeout: the time to wait for the request to be granted
329 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
330 amount of time; in this case, locks still might be acquired or a request
331 pending.
332
333 """
334 logging.debug("Trying %ss to request %s for %s",
335 timeout, request, self._wconfdcontext)
336 if self._cbs:
337 priority = self._cbs.CurrentPriority()
338 else:
339 priority = None
340
341 if priority is None:
342 priority = constants.OP_PRIO_DEFAULT
343
344
# A SIGHUP flag left over from before this request is logged and cleared
345 if sighupReceived[0]:
346 logging.warning("Ignoring unexpected SIGHUP")
347 sighupReceived[0] = False
348
349
# Send the request, then ask WConfD whether it is still pending
350 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
351 request)
352 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
353
354 if pending:
# Poll function: WConfD is only queried once the SIGHUP flag is set;
# without the flag the request is reported as still pending
355 def _HasPending():
356 if sighupReceived[0]:
357 return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
358 else:
359 return True
360
# NOTE(review): presumably retries _HasPending every 0.05s until it returns
# False or the timeout elapses - confirm against utils.SimpleRetry
361 pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)
362
363 signal = sighupReceived[0]
364
# Before giving up, ask WConfD directly once more
365 if pending:
366 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
367
368 if pending and signal:
369 logging.warning("Ignoring unexpected SIGHUP")
370 sighupReceived[0] = False
371
372 logging.debug("Finished trying. Pending: %s", pending)
373 if pending:
374 raise LockAcquireTimeout()
375
376 - def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
377 opportunistic_count=1, request_only=False):
378 """Acquires locks via the Ganeti lock manager.
379
380 @type level: int
381 @param level: Lock level
382 @type names: list or string
383 @param names: Lock names
384 @type shared: bool
385 @param shared: Whether the locks should be acquired in shared mode
386 @type opportunistic: bool
387 @param opportunistic: Whether to acquire opportunistically
388 @type timeout: None or float
389 @param timeout: Timeout for acquiring the locks
@type opportunistic_count: int
@param opportunistic_count: passed to WConfD's
  C{GuardedOpportunisticLockUnion} when acquiring opportunistically
390 @type request_only: bool
391 @param request_only: do not acquire the locks, just return the request
392 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
393 amount of time; in this case, locks still might be acquired or a request
394 pending.
@rtype: list
@return: an empty list if there are no names; the request (a list of
  C{[lock, mode]} pairs) if C{request_only} is set; otherwise the
  "level/name" lock names or, for opportunistic acquisition, the value
  returned by C{GuardedOpportunisticLockUnion}
395
396 """
397 self._CheckLocksEnabled()
398
399 if self._cbs:
400 priority = self._cbs.CurrentPriority()
401 else:
402 priority = None
403
404 if priority is None:
405 priority = constants.OP_PRIO_DEFAULT
406
# "All locks" is expanded to the names known to the configuration when
# acquiring opportunistically; otherwise the lockset name is used instead
407 if names == locking.ALL_SET:
408 if opportunistic:
409 expand_fns = {
410 locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
411 locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
412 locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
413 locking.LEVEL_NODE: self.cfg.GetNodeList,
414 locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
415 locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
416 }
417 names = expand_fns[level]()
418 else:
419 names = locking.LOCKSET_NAME
420
421 names = _LockList(names)
422
423
# NOTE(review): presumably sorted to keep a stable request order - confirm
424 names.sort()
425
426 levelname = locking.LEVEL_NAMES[level]
427
# Lock names sent to WConfD have the form "<level name>/<lock name>"
428 locks = ["%s/%s" % (levelname, lock) for lock in list(names)]
429
430 if not names:
431 logging.debug("Acquiring no locks for (%s) at level %s",
432 self._wconfdcontext, levelname)
433 return []
434
435 if shared:
436 request = [[lock, "shared"] for lock in locks]
437 else:
438 request = [[lock, "exclusive"] for lock in locks]
439
440 if request_only:
441 logging.debug("Lock request for level %s is %s", level, request)
442 return request
443
444 self.cfg.OutDate()
445
446 if timeout is None:
447
448
449
# Without a timeout every lock is requested on its own and WConfD is polled,
# sleeping a random time of up to 10 seconds between polls.
# NOTE(review): indentation is not visible here; whether the polling loop
# runs once per lock or once after all requests needs to be confirmed
450 logging.info("Definitely requesting %s for %s",
451 request, self._wconfdcontext)
452
453
454 for r in request:
455 logging.debug("Definite request %s for %s", r, self._wconfdcontext)
456 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
457 [r])
458 while True:
459 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
460 if not pending:
461 break
462 time.sleep(10.0 * random.random())
463
464 elif opportunistic:
465 logging.debug("For %ss trying to opportunistically acquire"
466 " at least %d of %s for %s.",
467 timeout, opportunistic_count, locks, self._wconfdcontext)
# NOTE(review): presumably retried every 2s until a non-empty list comes
# back or the timeout elapses - confirm against utils.SimpleRetry
468 locks = utils.SimpleRetry(
469 lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
470 2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
471 logging.debug("Managed to get the following locks: %s", locks)
# Getting no lock at all counts as a timeout
472 if locks == []:
473 raise LockAcquireTimeout()
474 else:
475 self._RequestAndWait(request, timeout)
476
477 return locks
478
480 """Logical Unit execution sequence.
481
The LU's prerequisites are checked, the pre-phase hooks are run (first the
global ones, then the regular ones) and their results passed to the LU.
Unless a dry run was requested, the LU is then executed, the post-phase
hooks are run and their results passed to the LU as well.

@param lu: the logical unit to execute
@return: the LU's C{dry_run_result} for a dry run, otherwise the processed
  result as returned by the LU's post-phase hooks callback

482 """
# Remembered to detect configuration writes made during the execution
483 write_count = self.cfg.write_count
484 lu.cfg.OutDate()
485 lu.CheckPrereq()
486
487 self._hm = self.BuildHooksManager(lu)
488 try:
489
490 self._hm.RunPhase(constants.HOOKS_PHASE_PRE, is_global=True)
491 h_results = self._hm.RunPhase(constants.HOOKS_PHASE_PRE)
492 except Exception, err:
493
494
495
# The LU is told about any pre-hook failure before the error is passed on
496 lu.HooksAbortCallBack(constants.HOOKS_PHASE_PRE, self.Log, err)
497
498
# NOTE(review): under Python 2 "raise err" starts a new traceback at this
# line; a bare "raise" would keep the original one - consider changing
499 raise err
500 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
501 self.Log, None)
502
503 if getattr(lu.op, "dry_run", False):
504
505
506
# Dry run: stop after prerequisites and pre-hooks, before Exec()
507 self.LogInfo("dry-run mode requested, not actually executing"
508 " the operation")
509 return lu.dry_run_result
510
# Without callbacks, job submission is replaced by a function that fails
511 if self._cbs:
512 submit_mj_fn = self._cbs.SubmitManyJobs
513 else:
514 submit_mj_fn = _FailingSubmitManyJobs
515
# Module-level counter of LUs currently inside Exec()
516 lusExecuting[0] += 1
517 try:
518 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
519 h_results = self._hm.RunPhase(constants.HOOKS_PHASE_POST)
520 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
521 self.Log, result)
522 finally:
523
524 lusExecuting[0] -= 1
# The config-update hook is only run if the configuration was written
525 if write_count != self.cfg.write_count:
526 self._hm.RunConfigUpdate()
527
528 return result
529
533
535 """Execute a Logical Unit, with the needed locks.
536
537 This is a recursive function that starts locking the given level, and
538 proceeds up, till there are no more locks to acquire. Then it executes the
539 given LU and its opcodes.
540
@param lu: the logical unit to lock for and execute
@param level: the lock level to handle in this call
@param calc_timeout: function returning the time left for acquiring locks
@param pending: lock requests collected so far but not yet sent to WConfD
@raise LockAcquireTimeout: if locks could not be acquired in time, and
  also after the LU was prepared for a retry following a temporary lack
  of resources
@raise errors.OpExecError: if the LU execution hit an C{AssertionError}
@return: the result of the LU execution

541 """
542 pending = pending or []
543 logging.debug("Looking at locks of level %s, still need to obtain %s",
544 level, pending)
545 adding_locks = level in lu.add_locks
546 acquiring_locks = level in lu.needed_locks
547
# Past the last lock level: send whatever is still pending, then run the LU
548 if level not in locking.LEVELS:
549 if pending:
550 self._RequestAndWait(pending, calc_timeout())
551 lu.cfg.OutDate()
552 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
553 pending = []
554
555 logging.debug("Finished acquiring locks")
556
557 if self._cbs:
558 self._cbs.NotifyStart()
559
560 try:
561 result = self._ExecLU(lu)
562 except errors.OpPrereqError, err:
# Only a temporary lack of resources is retried; anything else propagates.
# NOTE(review): the unpacking assumes the error carries exactly two args
563 (_, ecode) = err.args
564 if ecode != errors.ECODE_TEMP_NORES:
565 raise
566 logging.debug("Temporarily out of resources; will retry internally")
567 try:
568 lu.PrepareRetry(self.Log)
569 if self._cbs:
570 self._cbs.NotifyRetry()
571 except errors.OpRetryNotSupportedError:
572 logging.debug("LU does not know how to retry.")
573 raise err
# The retry is signalled to the caller as a lock acquisition timeout
574 raise LockAcquireTimeout()
575 except AssertionError, err:
576
577
578
# Assertion failures are reported as execution errors carrying the message
# and the last entry of the formatted traceback
579 (_, _, tb) = sys.exc_info()
580 err_info = traceback.format_tb(tb)
581 del tb
582 logging.exception("Detected AssertionError")
583 raise errors.OpExecError("Internal assertion error: please report"
584 " this as a bug.\nError message: '%s';"
585 " location:\n%s" % (str(err), err_info[-1]))
586 return result
587
588
589 opportunistic = lu.opportunistic_locks[level]
590
591 dont_collate = lu.dont_collate_locks[level]
592
# Levels marked as "don't collate" get the pending requests sent first
593 if dont_collate and pending:
594 self._RequestAndWait(pending, calc_timeout())
595 lu.cfg.OutDate()
596 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
597 pending = []
598
599 if adding_locks and opportunistic:
600
601
602
603 raise NotImplementedError("Can't opportunistically acquire locks when"
604 " adding new ones")
605
606 if adding_locks and acquiring_locks and \
607 lu.needed_locks[level] == locking.ALL_SET:
608
609
610 raise NotImplementedError("Can't request all locks of a certain level"
611 " and add new locks")
612
613 if adding_locks or acquiring_locks:
614 self._CheckLocksEnabled()
615
616 lu.DeclareLocks(level)
617 share = lu.share_locks[level]
618 opportunistic_count = lu.opportunistic_locks_count[level]
619
620 try:
621 if acquiring_locks:
622 needed_locks = _LockList(lu.needed_locks[level])
623 else:
624 needed_locks = []
625
626 if adding_locks:
627 needed_locks.extend(_LockList(lu.add_locks[level]))
628
# With a timeout and no opportunistic acquisition the request for this level
# is only added to the pending ones; otherwise the pending requests are sent
# first and this level's locks are acquired right away
629 timeout = calc_timeout()
630 if timeout is not None and not opportunistic:
631 pending = pending + self._AcquireLocks(level, needed_locks, share,
632 opportunistic, timeout,
633 request_only=True)
634 else:
635 if pending:
636 self._RequestAndWait(pending, calc_timeout())
637 lu.cfg.OutDate()
638 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
639 pending = []
640 self._AcquireLocks(level, needed_locks, share, opportunistic,
641 timeout,
642 opportunistic_count=opportunistic_count)
643 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
644
# Continue with the next level
645 result = self._LockAndExecLU(lu, level + 1, calc_timeout,
646 pending=pending)
647 finally:
# The locks of this level are freed whether or not the execution succeeded
648 levelname = locking.LEVEL_NAMES[level]
649 logging.debug("Freeing locks at level %s for %s",
650 levelname, self._wconfdcontext)
651 self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
652 else:
# Nothing to lock at this level
653 result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)
654
655 return result
656
657
659 """Check the LU result against the contract in the opcode.
660
@param op: the opcode; its C{OP_RESULT} is either C{None} (no check) or a
  callable returning whether a result is acceptable
@param result: the LU result to check
@raise errors.OpResultError: if the result is not accepted and the opcode
  was not run in dry-run mode

661 """
662 resultcheck_fn = op.OP_RESULT
663 if not (resultcheck_fn is None or resultcheck_fn(result)):
664 logging.error("Expected opcode result matching %s, got %s",
665 resultcheck_fn, result)
666 if not getattr(op, "dry_run", False):
667
668
669
# For dry runs a mismatch is only logged, not raised
670 raise errors.OpResultError("Opcode result does not match %s: %s" %
671 (resultcheck_fn, utils.Truncate(result, 80)))
672
674 """Execute an opcode.
675
676 @param op: the opcode to be executed
677 @param lu_class: the LU class implementing the current opcode
678 @param calc_timeout: The function calculating the time remaining
679 to acquire all locks, None for no timeout
680 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
681 amount of time
@raise errors.ProgrammerError: if the LU requires the BGL but locks are
  disabled
@return: the result of executing the LU
682
683 """
684 try:
685 if self._enable_locks:
686
687
688
# The BGL is acquired in shared mode unless the LU class requires it
689 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
690 not lu_class.REQ_BGL, False, calc_timeout())
691 elif lu_class.REQ_BGL:
692 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
693 " disabled" % op.OP_ID)
694
695 lu = lu_class(self, op, self.cfg, self.rpc,
696 self._wconfdcontext, self.wconfd)
697 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
698 _CheckSecretParameters(op)
699 lu.ExpandNames()
700 assert lu.needed_locks is not None, "needed_locks not set by LU"
701
# Locking continues at the level after the cluster level
702 try:
703 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
704 calc_timeout)
705 finally:
# Reservations of this execution context are dropped in any case
706 if self._ec_id:
707 self.cfg.DropECReservations(self._ec_id)
708 finally:
# Cluster-level locks are freed and the callbacks forgotten
709 self.wconfd.Client().FreeLocksLevel(
710 self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
711 self._cbs = None
712
713 return result
714
716 """Execute an opcode.
717
718 @type op: an OpCode instance
719 @param op: the opcode to be executed
720 @type cbs: L{OpExecCbBase}
721 @param cbs: Runtime callbacks
722 @type timeout: float or None
723 @param timeout: Maximum time to acquire all locks, None for no timeout
724 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
725 amount of time
@raise errors.ProgrammerError: if C{op} is not an opcode instance
@raise errors.OpCodeUnknown: if no LU class is registered for the opcode
@raise errors.OpResultError: if the result does not match the opcode's
  result contract
@return: the result of the opcode execution
726
727 """
728 if not isinstance(op, opcodes.OpCode):
729 raise errors.ProgrammerError("Non-opcode instance passed"
730 " to ExecOpcode (%s)" % type(op))
731
732 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
733 if lu_class is None:
734 raise errors.OpCodeUnknown("Unknown opcode")
735
# Without a timeout the remaining time is always reported as None
736 if timeout is None:
737 calc_timeout = lambda: None
738 else:
739 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
740
741 self._cbs = cbs
742 try:
743 result = self._PrepareLockListsAndExecLU(op, lu_class, calc_timeout)
744
745
746
747
# On success the global post hooks are run through the hooks manager, if
# one exists
748 if self._hm is not None:
749 self._hm.RunPhase(constants.HOOKS_PHASE_POST, is_global=True,
750 post_status=constants.POST_HOOKS_STATUS_SUCCESS)
751 except:
752
# On any failure the global post hooks are run with the error status and
# the exception is re-raised unchanged
753 hooksmaster.ExecGlobalPostHooks(op.OP_ID, self.cfg.GetMasterNodeName(),
754 self.rpc.call_hooks_runner,
755 logging.warning,
756 self.cfg.GetClusterName(),
757 self.cfg.GetMasterNode(), self.GetECId(),
758 constants.POST_HOOKS_STATUS_ERROR)
759 raise
760
761 self._CheckLUResult(op, result)
762 return result
763
def Log(self, *args):
    """Relays feedback to the callbacks' feedback function.

    Without callbacks the feedback is silently dropped.

    """
    callbacks = self._cbs
    if not callbacks:
        return

    callbacks.Feedback(*args)
770
def LogStep(self, current, total, message):
    """Reports progress of the LU execution.

    The step is written to the debug log and sent as feedback.

    """
    progress = (current, total, message)
    logging.debug("Step %d/%d %s", *progress)
    self.Log("STEP %d/%d %s" % progress)
777
779 """Log a warning to the logs and the user.
780
781 The optional keyword argument is 'hint' and can be used to show a
782 hint to the user (presumably related to the warning). If the
783 message is empty, it will not be printed at all, allowing one to
784 show only a hint.
785
If positional arguments are given, the message is used as a format string
for them.

786 """
# NOTE(review): the keyword check is an assert, so it is skipped under -O
787 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
788 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
789 if args:
790 message = message % tuple(args)
791 if message:
792 logging.warning(message)
793 self.Log(" - WARNING: %s" % message)
# The hint is only sent as feedback, not written to the log
794 if "hint" in kwargs:
795 self.Log(" Hint: %s" % kwargs["hint"])
796
797 - def LogInfo(self, message, *args):
805
def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID is set

    """
    ec_id = self._ec_id
    if ec_id:
        return ec_id

    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
814