1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
# Process-wide flags held in single-element lists so that they can be updated
# in place (e.g. "sighupReceived[0] = False", "lusExecuting[0] += 1") without
# a "global" statement.
# NOTE(review): presumably also written/read from outside this module (a
# SIGHUP handler, a status reporter) -- verify against callers.
sighupReceived = [False]
# Number of logical units currently executing
lusExecuting = [0]

# Class-name prefixes linking an opcode ("Op...") to its logical unit ("LU...")
_OP_PREFIX, _LU_PREFIX = "Op", "LU"
64
65
class LockAcquireTimeout(Exception):
  """Raised when requested locks could not be obtained in time.

  When this is raised, the lock request may still be pending or some of the
  locks may already have been granted. It is also raised after an LU that
  ran out of resources temporarily has been prepared for a retry.

  """
70
71
95
96
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out the precomputed per-attempt timeouts from
  C{_TIMEOUT_PER_ATTEMPT}, each with a small random variation, and C{None}
  once they have all been used.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests (stored; not used by the
      methods of this class)
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: the next precomputed timeout, varied randomly by up to +/-5%,
      or C{None} once all precomputed timeouts have been used

    """
    # The builtin next() works on Python 2.6+ and 3 (unlike the
    # Python-2-only .next() method) and its default replaces the
    # StopIteration handling.
    timeout = next(self._timeouts, None)

    if timeout is not None:
      # Shift the timeout uniformly within [-5%, +5%) of its value;
      # presumably so that concurrent waiters do not retry in lockstep.
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
140
141
class OpExecCbBase(object):
  """Base class for OpCode execution callbacks.

  The processor calls these methods while running an opcode. This base
  implementation ignores all notifications and reports no priority. It
  refuses to submit jobs.

  """
  def NotifyStart(self):
    """Hook invoked right before an LU's Exec() method runs.

    By then every lock the LU needs has been acquired.

    """
    pass

  def NotifyRetry(self):
    """Hook invoked when an LU is about to be reset for another attempt.

    Only called after the LU's PrepareRetry has completed successfully.

    """
    pass

  def Feedback(self, *args):
    """Relays feedback produced by LU code to the end-user.

    """
    pass

  def CurrentPriority(self):
    """Returns the current priority, or C{None} if there is none.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
179
180
def _LUNameForOpName(opname):
  """Derives a logical unit class name from an opcode class name.

  The leading C{_OP_PREFIX} is replaced by C{_LU_PREFIX}, so with this
  module's prefixes C{"OpFoo"} becomes C{"LUFoo"}.

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  base_name = opname[len(_OP_PREFIX):]
  return "".join([_LU_PREFIX, base_name])
189
190
198
199
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Only inherit the priority when the destination has none of its own
  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  # Write through the same attribute name that is read, instead of a
  # hard-coded "comment"
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    setattr(dst, opcodes_base.COMMENT_ATTR, defcomment)

  if hasattr(src, constants.OPCODE_REASON):
    # Build a fresh list (dst's existing list is not modified in place),
    # with dst's own reason entries first, followed by src's
    reason = list(getattr(dst, constants.OPCODE_REASON, []))
    reason.extend(getattr(src, constants.OPCODE_REASON, []))
    setattr(dst, constants.OPCODE_REASON, reason)
224
225
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If the result is a L{cmdlib.ResultWithJobs}:
    - the jobs it carries are submitted through C{submit_fn};
    - its C{other} dictionary is returned, with the submission result
      added under C{constants.JOB_IDS_KEY}.

  Any other result is returned unchanged.

  @param submit_fn: callable receiving C{result.jobs} (an iterable of jobs,
    each an iterable of opcodes)
  @param op: the opcode that produced C{result}; its base parameters are
    copied to every submitted opcode
  @param result: the value returned by the LU

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters to all submitted opcodes. This is done purely for
    # its side effects, so use a loop: map() would build a throwaway list on
    # Python 2 and, being lazy, do nothing at all on Python 3.
    set_params = compat.partial(_SetBaseOpParams, op,
                                "Submitted by %s" % op.OP_ID)
    for job_op in itertools.chain(*result.jobs):
      set_params(job_op)

    job_submission = submit_fn(result.jobs)

    # The LU's additional return values are extended in place
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
250
251
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} that always fails.

  Used when an opcode is processed without callbacks, in which case no jobs
  can be submitted.

  @raise errors.ProgrammerError: always

  """
  msg = ("Opcodes processed without callbacks (e.g."
         " queries) can not submit jobs")
  raise errors.ProgrammerError(msg)
258
259
def _LockList(names):
  """Normalises a lock name specification to a list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: C{names} itself if it equals L{locking.ALL_SET}; a one-element
    list if it is a single string; otherwise a new list built from the
    iterable

  """
  if names == locking.ALL_SET:
    return names

  # A bare string is one lock name, not an iterable of characters
  if isinstance(names, basestring):
    return [names]

  return list(names)
277
278
def _CheckSecretParameters(op):
  """Rejects opcodes whose secret OS parameters are placeholders.

  @raise errors.OpPrereqError: if any value in C{op.osparams_secret} holds
    C{constants.REDACTED} (read via the value's C{Get()}) instead of real
    data

  """
  secrets = op.osparams_secret if hasattr(op, "osparams_secret") else None
  if not secrets:
    return

  if any(secrets[name].Get() == constants.REDACTED for name in secrets):
    raise errors.OpPrereqError("Please re-submit secret parameters to job.",
                               errors.ECODE_INVAL)
288
289
class Processor(object):
  """Object which runs OpCodes.

  Each opcode class is mapped to its logical unit through C{DISPATCH_TABLE}.

  """
  DISPATCH_TABLE = _ComputeDispatchTable()
293
294 - def __init__(self, context, ec_id, enable_locks=True):
295 """Constructor for Processor
296
297 @type context: GanetiContext
298 @param context: global Ganeti context
299 @type ec_id: string
300 @param ec_id: execution context identifier
301
302 """
303 self._ec_id = ec_id
304 self._cbs = None
305 self.cfg = context.GetConfig(ec_id)
306 self.rpc = context.GetRpc(self.cfg)
307 self.hmclass = hooksmaster.HooksMaster
308 self._enable_locks = enable_locks
309 self.wconfd = wconfd
310 self._wconfdcontext = context.GetWConfdContext(ec_id)
311
313 """Checks if locking is enabled.
314
315 @raise errors.ProgrammerError: In case locking is not enabled
316
317 """
318 if not self._enable_locks:
319 raise errors.ProgrammerError("Attempted to use disabled locks")
320
322 """Request locks from WConfD and wait for them to be granted.
323
324 @type request: list
325 @param request: the lock request to be sent to WConfD
326 @type timeout: float
327 @param timeout: the time to wait for the request to be granted
328 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
329 amount of time; in this case, locks still might be acquired or a request
330 pending.
331
332 """
333 logging.debug("Trying %ss to request %s for %s",
334 timeout, request, self._wconfdcontext)
335 if self._cbs:
336 priority = self._cbs.CurrentPriority()
337 else:
338 priority = None
339
340 if priority is None:
341 priority = constants.OP_PRIO_DEFAULT
342
343
344 if sighupReceived[0]:
345 logging.warning("Ignoring unexpected SIGHUP")
346 sighupReceived[0] = False
347
348
349 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
350 request)
351 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
352
353 if pending:
354 def _HasPending():
355 if sighupReceived[0]:
356 return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
357 else:
358 return True
359
360 pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)
361
362 signal = sighupReceived[0]
363
364 if pending:
365 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
366
367 if pending and signal:
368 logging.warning("Ignoring unexpected SIGHUP")
369 sighupReceived[0] = False
370
371 logging.debug("Finished trying. Pending: %s", pending)
372 if pending:
373 raise LockAcquireTimeout()
374
  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type opportunistic_count: int
    @param opportunistic_count: minimum number of locks to obtain when
      acquiring opportunistically
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @rtype: list
    @return: the lock request if C{request_only} is set; otherwise the lock
      identifiers that were requested, or, when acquiring opportunistically,
      the value returned by WConfD for the locks obtained; an empty list if
      there is nothing to lock
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time; in this case, locks still might be acquired or a request
      pending.

    """
    self._CheckLocksEnabled()

    # Requests carry the priority reported by the callbacks, if any
    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        # Expand "all locks" into the concrete names currently known to the
        # configuration for this level
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        # Request the level-wide name locking.LOCKSET_NAME instead of
        # individual locks
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # Sort so the request is built in a deterministic (lexicographic) order
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    # Lock identifiers used in the WConfD requests: "<level name>/<lock name>"
    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    # NOTE(review): presumably forces the configuration to be re-read once
    # the locks are held -- confirm against ConfigWriter.OutDate
    self.cfg.OutDate()

    if timeout is None:
      # No timeout: register the locks one at a time, polling each until
      # WConfD no longer reports a pending request, sleeping a random
      # 0-10 seconds between polls
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)

      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      # Retry (delay 2.0, up to `timeout`) until GuardedOpportunisticLockUnion
      # returns a non-empty result; an empty one means nothing was obtained
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks
477
479 """Logical Unit execution sequence.
480
481 """
482 write_count = self.cfg.write_count
483 lu.cfg.OutDate()
484 lu.CheckPrereq()
485
486 hm = self.BuildHooksManager(lu)
487 try:
488 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
489 except Exception, err:
490
491
492
493 lu.HooksAbortCallBack(constants.HOOKS_PHASE_PRE, self.Log, err)
494
495
496 raise err
497 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
498 self.Log, None)
499
500 if getattr(lu.op, "dry_run", False):
501
502
503
504 self.LogInfo("dry-run mode requested, not actually executing"
505 " the operation")
506 return lu.dry_run_result
507
508 if self._cbs:
509 submit_mj_fn = self._cbs.SubmitManyJobs
510 else:
511 submit_mj_fn = _FailingSubmitManyJobs
512
513 lusExecuting[0] += 1
514 try:
515 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
516 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
517 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
518 self.Log, result)
519 finally:
520
521 lusExecuting[0] -= 1
522 if write_count != self.cfg.write_count:
523 hm.RunConfigUpdate()
524
525 return result
526
529
531 """Execute a Logical Unit, with the needed locks.
532
533 This is a recursive function that starts locking the given level, and
534 proceeds up, till there are no more locks to acquire. Then it executes the
535 given LU and its opcodes.
536
537 """
538 pending = pending or []
539 logging.debug("Looking at locks of level %s, still need to obtain %s",
540 level, pending)
541 adding_locks = level in lu.add_locks
542 acquiring_locks = level in lu.needed_locks
543
544 if level not in locking.LEVELS:
545 if pending:
546 self._RequestAndWait(pending, calc_timeout())
547 lu.cfg.OutDate()
548 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
549 pending = []
550
551 logging.debug("Finished acquiring locks")
552
553 if self._cbs:
554 self._cbs.NotifyStart()
555
556 try:
557 result = self._ExecLU(lu)
558 except errors.OpPrereqError, err:
559 (_, ecode) = err.args
560 if ecode != errors.ECODE_TEMP_NORES:
561 raise
562 logging.debug("Temporarily out of resources; will retry internally")
563 try:
564 lu.PrepareRetry(self.Log)
565 if self._cbs:
566 self._cbs.NotifyRetry()
567 except errors.OpRetryNotSupportedError:
568 logging.debug("LU does not know how to retry.")
569 raise err
570 raise LockAcquireTimeout()
571 except AssertionError, err:
572
573
574
575 (_, _, tb) = sys.exc_info()
576 err_info = traceback.format_tb(tb)
577 del tb
578 logging.exception("Detected AssertionError")
579 raise errors.OpExecError("Internal assertion error: please report"
580 " this as a bug.\nError message: '%s';"
581 " location:\n%s" % (str(err), err_info[-1]))
582 return result
583
584
585 opportunistic = lu.opportunistic_locks[level]
586
587 dont_collate = lu.dont_collate_locks[level]
588
589 if dont_collate and pending:
590 self._RequestAndWait(pending, calc_timeout())
591 lu.cfg.OutDate()
592 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
593 pending = []
594
595 if adding_locks and opportunistic:
596
597
598
599 raise NotImplementedError("Can't opportunistically acquire locks when"
600 " adding new ones")
601
602 if adding_locks and acquiring_locks and \
603 lu.needed_locks[level] == locking.ALL_SET:
604
605
606 raise NotImplementedError("Can't request all locks of a certain level"
607 " and add new locks")
608
609 if adding_locks or acquiring_locks:
610 self._CheckLocksEnabled()
611
612 lu.DeclareLocks(level)
613 share = lu.share_locks[level]
614 opportunistic_count = lu.opportunistic_locks_count[level]
615
616 try:
617 if acquiring_locks:
618 needed_locks = _LockList(lu.needed_locks[level])
619 else:
620 needed_locks = []
621
622 if adding_locks:
623 needed_locks.extend(_LockList(lu.add_locks[level]))
624
625 timeout = calc_timeout()
626 if timeout is not None and not opportunistic:
627 pending = pending + self._AcquireLocks(level, needed_locks, share,
628 opportunistic, timeout,
629 request_only=True)
630 else:
631 if pending:
632 self._RequestAndWait(pending, calc_timeout())
633 lu.cfg.OutDate()
634 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
635 pending = []
636 self._AcquireLocks(level, needed_locks, share, opportunistic,
637 timeout,
638 opportunistic_count=opportunistic_count)
639 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
640
641 result = self._LockAndExecLU(lu, level + 1, calc_timeout,
642 pending=pending)
643 finally:
644 levelname = locking.LEVEL_NAMES[level]
645 logging.debug("Freeing locks at level %s for %s",
646 levelname, self._wconfdcontext)
647 self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
648 else:
649 result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)
650
651 return result
652
653
655 """Check the LU result against the contract in the opcode.
656
657 """
658 resultcheck_fn = op.OP_RESULT
659 if not (resultcheck_fn is None or resultcheck_fn(result)):
660 logging.error("Expected opcode result matching %s, got %s",
661 resultcheck_fn, result)
662 if not getattr(op, "dry_run", False):
663
664
665
666 raise errors.OpResultError("Opcode result does not match %s: %s" %
667 (resultcheck_fn, utils.Truncate(result, 80)))
668
670 """Execute an opcode.
671
672 @type op: an OpCode instance
673 @param op: the opcode to be executed
674 @type cbs: L{OpExecCbBase}
675 @param cbs: Runtime callbacks
676 @type timeout: float or None
677 @param timeout: Maximum time to acquire all locks, None for no timeout
678 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
679 amount of time
680
681 """
682 if not isinstance(op, opcodes.OpCode):
683 raise errors.ProgrammerError("Non-opcode instance passed"
684 " to ExecOpcode (%s)" % type(op))
685
686 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
687 if lu_class is None:
688 raise errors.OpCodeUnknown("Unknown opcode")
689
690 if timeout is None:
691 calc_timeout = lambda: None
692 else:
693 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
694
695 self._cbs = cbs
696 try:
697 if self._enable_locks:
698
699
700
701 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
702 not lu_class.REQ_BGL, False, calc_timeout())
703 elif lu_class.REQ_BGL:
704 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
705 " disabled" % op.OP_ID)
706
707 lu = lu_class(self, op, self.cfg, self.rpc,
708 self._wconfdcontext, self.wconfd)
709 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
710 _CheckSecretParameters(op)
711 lu.ExpandNames()
712 assert lu.needed_locks is not None, "needed_locks not set by LU"
713
714 try:
715 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
716 calc_timeout)
717 finally:
718 if self._ec_id:
719 self.cfg.DropECReservations(self._ec_id)
720 finally:
721 self.wconfd.Client().FreeLocksLevel(
722 self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
723 self._cbs = None
724
725 self._CheckLUResult(op, result)
726
727 return result
728
729 - def Log(self, *args):
730 """Forward call to feedback callback function.
731
732 """
733 if self._cbs:
734 self._cbs.Feedback(*args)
735
736 - def LogStep(self, current, total, message):
737 """Log a change in LU execution progress.
738
739 """
740 logging.debug("Step %d/%d %s", current, total, message)
741 self.Log("STEP %d/%d %s" % (current, total, message))
742
744 """Log a warning to the logs and the user.
745
746 The optional keyword argument is 'hint' and can be used to show a
747 hint to the user (presumably related to the warning). If the
748 message is empty, it will not be printed at all, allowing one to
749 show only a hint.
750
751 """
752 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
753 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
754 if args:
755 message = message % tuple(args)
756 if message:
757 logging.warning(message)
758 self.Log(" - WARNING: %s" % message)
759 if "hint" in kwargs:
760 self.Log(" Hint: %s" % kwargs["hint"])
761
762 - def LogInfo(self, message, *args):
770
772 """Returns the current execution context ID.
773
774 """
775 if not self._ec_id:
776 raise errors.ProgrammerError("Tried to use execution context id when"
777 " not set")
778 return self._ec_id
779