1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
# Flag polled and reset by Processor._RequestAndWait while waiting for locks.
# It is a one-element list so that it can be mutated in place without a
# "global" statement.
# NOTE(review): the code that sets the flag is not part of this section --
# presumably a SIGHUP handler installed elsewhere; confirm.
sighupReceived = [False]
# Counter incremented while a logical unit is inside its Exec/post-hook phase
# (see Processor._ExecLU); a one-element list for the same reason as above.
lusExecuting = [0]

# Class name prefixes of opcodes and of logical units
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
64
65
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """
70
71
95
96
98 """Class with lock acquire timeout strategy.
99
100 """
101 __slots__ = [
102 "_timeouts",
103 "_random_fn",
104 "_time_fn",
105 ]
106
107 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
108
def __init__(self, _time_fn=time.time, _random_fn=random.random):
  """Initializes this class.

  @param _time_fn: Time function for unittests
  @param _random_fn: Random number generator for unittests

  """
  object.__init__(self)

  # Iterator over the per-attempt timeouts; each attempt consumes one entry
  self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
  self._time_fn = _time_fn
  self._random_fn = _random_fn
121
def NextAttempt(self):
  """Returns the timeout for the next attempt.

  @rtype: float or None
  @return: the next timeout of the schedule with a random variation of at
    most -/+ 5% applied, or C{None} once the schedule is exhausted

  """
  try:
    # Use the next() builtin rather than the iterator's ".next()" method; it
    # behaves the same on Python 2.6+ and keeps working on Python 3
    timeout = next(self._timeouts)
  except StopIteration:
    # No more timeouts left in the schedule
    timeout = None

  if timeout is not None:
    # Add a small variation (-/+ 5%) to the timeout so that concurrent
    # callers using the same schedule do not all wake up at the same time
    variation_range = timeout * 0.1
    timeout += ((self._random_fn() * variation_range) -
                (variation_range * 0.5))

  return timeout
140
141
143 """Base class for OpCode execution callbacks.
144
145 """
def NotifyStart(self):
  """Called when we are about to execute the LU.

  This function is called when we're about to start the lu's Exec() method,
  that is, after we have acquired all locks.

  The base implementation does nothing.

  """
153
def NotifyRetry(self):
  """Called when we are about to reset an LU to retry again.

  This function is called after PrepareRetry successfully completed.

  The base implementation does nothing.

  """
160
def Feedback(self, *args):
  """Sends feedback from the LU code to the end-user.

  The base implementation discards the feedback.

  @param args: feedback arguments, as passed to L{Processor.Log}

  """
165
def CurrentPriority(self):
  """Returns current priority or C{None}.

  The base implementation always returns C{None}; callers then fall back to
  the default opcode priority.

  """
  return None
171
def SubmitManyJobs(self, jobs):
  """Submits jobs for processing.

  See L{jqueue.JobQueue.SubmitManyJobs}.

  @param jobs: the jobs to submit
  @raise NotImplementedError: always; must be overridden by subclasses

  """
  raise NotImplementedError
179
180
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: opcode class name; must start with L{_OP_PREFIX}
  @rtype: string
  @return: C{opname} with the L{_OP_PREFIX} prefix replaced by L{_LU_PREFIX}

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
189
190
198
199
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # The debug level is always inherited from the source opcode
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # The priority is inherited only if the destination has none of its own
  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  # NOTE(review): the check reads the attribute named by
  # opcodes_base.COMMENT_ATTR while the assignment targets "comment";
  # presumably both refer to the same attribute -- confirm.
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  # The destination's reason trail is kept and the source's is appended to
  # it; a new list is built so that the source's list is not shared
  if hasattr(src, constants.OPCODE_REASON):
    dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
224
225
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @type submit_fn: callable
  @param submit_fn: function called with the jobs to submit; its return
    value is stored in the result under L{constants.JOB_IDS_KEY}
  @param op: the opcode which produced the result
  @param result: the value returned by the logical unit
  @return: C{result} itself, unless it is a L{cmdlib.ResultWithJobs}, in
    which case its additional return values extended by the job submission
    result are returned

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of every job.
    # This is an explicit loop on purpose: map() called only for its side
    # effects builds a throw-away list on Python 2 and, being lazy, would
    # not run at all on Python 3.
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain.from_iterable(result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
250
251
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job submission function when an opcode is executed without
  callbacks.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
258
259
def _LockList(names):
  """If 'names' is a string, make it a single-element list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: if 'names' argument is an iterable, a list of it;
    if it's a string, make it a one-element list;
    if L{locking.ALL_SET}, L{locking.ALL_SET}

  """
  if names == locking.ALL_SET:
    # The "all locks" marker is passed through unchanged
    return names

  # basestring: this module targets Python 2, where it covers both str and
  # unicode
  if isinstance(names, basestring):
    return [names]

  # Always a new list, so callers may sort or extend the result in place
  return list(names)
277
278
280 """Check if secret parameters are expected, but missing.
281
282 """
283 if hasattr(op, "osparams_secret") and op.osparams_secret:
284 for secret_param in op.osparams_secret:
285 if op.osparams_secret[secret_param].Get() == constants.REDACTED:
286 raise errors.OpPrereqError("Please re-submit secret parameters to job.",
287 errors.ECODE_INVAL)
288
289
291 """Object which runs OpCodes"""
292 DISPATCH_TABLE = _ComputeDispatchTable()
293
def __init__(self, context, ec_id, enable_locks=True):
  """Constructor for Processor

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier
  @type enable_locks: bool
  @param enable_locks: whether this processor may acquire locks; if false,
    any attempt to acquire locks raises L{errors.ProgrammerError}

  """
  self._ec_id = ec_id
  # Runtime callbacks; only set while an opcode is being executed
  self._cbs = None
  self.cfg = context.GetConfig(ec_id)
  self.rpc = context.GetRpc(self.cfg)
  self.hmclass = hooksmaster.HooksMaster
  self._enable_locks = enable_locks
  # The wconfd module itself is stored; clients are created per call via
  # self.wconfd.Client()
  self.wconfd = wconfd
  self._wconfdcontext = context.GetWConfdContext(ec_id)
311
313 """Checks if locking is enabled.
314
315 @raise errors.ProgrammerError: In case locking is not enabled
316
317 """
318 if not self._enable_locks:
319 raise errors.ProgrammerError("Attempted to use disabled locks")
320
def _RequestAndWait(self, request, timeout):
  """Request locks from WConfD and wait for them to be granted.

  @type request: list
  @param request: the lock request to be sent to WConfD
  @type timeout: float
  @param timeout: the time to wait for the request to be granted
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time; in this case, locks still might be acquired or a request
    pending.

  """
  logging.debug("Trying %ss to request %s for %s",
                timeout, request, self._wconfdcontext)
  if self._cbs:
    priority = self._cbs.CurrentPriority()
  else:
    priority = None

  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  # A signal is only expected after the request has been sent, so a flag
  # which is already set at this point is stale; clear it
  if sighupReceived[0]:
    logging.warning("Ignoring unexpected SIGHUP")
  sighupReceived[0] = False

  # Request locks
  self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                          request)
  pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

  if pending:
    def _HasPending():
      # WConfD is only asked again once a SIGHUP has been noticed; until
      # then the request is assumed to be still pending
      if sighupReceived[0]:
        return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
      else:
        return True

    # NOTE(review): utils.SimpleRetry presumably calls _HasPending every
    # 0.05s until it returns False or the timeout expires, and returns the
    # last value -- confirm against its documentation.
    pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

    signal = sighupReceived[0]

    # After a timeout, ask WConfD one final time for the authoritative state
    if pending:
      pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending and signal:
      logging.warning("Ignoring unexpected SIGHUP")
    sighupReceived[0] = False

  logging.debug("Finished trying. Pending: %s", pending)
  if pending:
    raise LockAcquireTimeout()
374
def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                  opportunistic_count=1, request_only=False):
  """Acquires locks via the Ganeti lock manager.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the locks should be acquired in shared mode
  @type opportunistic: bool
  @param opportunistic: Whether to acquire opportunistically
  @type timeout: None or float
  @param timeout: Timeout for acquiring the locks; with C{None} the locks
    are requested one by one and waited for until granted, even if
    C{opportunistic} is set
  @type opportunistic_count: int
  @param opportunistic_count: passed to C{GuardedOpportunisticLockUnion}
    when acquiring opportunistically (logged as the minimum number of locks
    to obtain)
  @type request_only: bool
  @param request_only: do not acquire the locks, just return the request
  @rtype: list
  @return: an empty list if there is nothing to lock; the request (a list
    of C{[lockname, mode]} pairs) if C{request_only} is set; the value
    returned by the opportunistic acquisition if C{opportunistic} is set
    and a timeout is given; otherwise the qualified C{level/name} names
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time; in this case, locks still might be acquired or a request
    pending.

  """
  self._CheckLocksEnabled()

  if self._cbs:
    priority = self._cbs.CurrentPriority()
  else:
    priority = None

  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  if names == locking.ALL_SET:
    if opportunistic:
      # An opportunistic acquisition needs the individual lock names, so
      # "all" is expanded from the configuration
      expand_fns = {
        locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
        locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
        locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
        locking.LEVEL_NODE: self.cfg.GetNodeList,
        locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
        locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
        }
      names = expand_fns[level]()
    else:
      names = locking.LOCKSET_NAME

  names = _LockList(names)

  # Locks of the same level are requested in lexicographic order
  names.sort()

  levelname = locking.LEVEL_NAMES[level]

  locks = ["%s/%s" % (levelname, lock) for lock in names]

  if not names:
    logging.debug("Acquiring no locks for (%s) at level %s",
                  self._wconfdcontext, levelname)
    return []

  if shared:
    request = [[lock, "shared"] for lock in locks]
  else:
    request = [[lock, "exclusive"] for lock in locks]

  if request_only:
    logging.debug("Lock request for level %s is %s", level, request)
    return request

  self.cfg.OutDate()

  if timeout is None:
    # Without a timeout the locks are requested unconditionally; whether
    # the caller wanted an opportunistic acquisition no longer matters
    logging.info("Definitely requesting %s for %s",
                 request, self._wconfdcontext)
    # Request the locks sequentially, in lock order, waiting for each one
    # to be granted before asking for the next
    for r in request:
      logging.debug("Definite request %s for %s", r, self._wconfdcontext)
      self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                              [r])
      while True:
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        if not pending:
          break
        # Randomized sleep of up to 10 seconds between polls
        time.sleep(10.0 * random.random())

  elif opportunistic:
    logging.debug("For %ss trying to opportunistically acquire"
                  " at least %d of %s for %s.",
                  timeout, opportunistic_count, locks, self._wconfdcontext)
    # NOTE(review): utils.SimpleRetry presumably repeats the call every 2s
    # until the result is a non-empty list or the timeout expires -- confirm
    locks = utils.SimpleRetry(
      lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
      2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
    logging.debug("Managed to get the following locks: %s", locks)
    if locks == []:
      raise LockAcquireTimeout()
  else:
    self._RequestAndWait(request, timeout)

  return locks
477
def _ExecLU(self, lu):
  """Logical Unit execution sequence.

  Checks the prerequisites, runs the pre-hooks, executes the LU and runs
  the post-hooks.

  @param lu: the logical unit to execute
  @return: the LU's C{dry_run_result} if the opcode requests a dry run,
    otherwise the processed execution result as returned by the LU's
    post-phase hooks callback

  """
  # Remember the write counter to detect configuration changes made while
  # the LU was running
  write_count = self.cfg.write_count
  lu.cfg.OutDate()
  lu.CheckPrereq()

  hm = self.BuildHooksManager(lu)
  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
  lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                   self.Log, None)

  if getattr(lu.op, "dry_run", False):
    # In this mode the LU is not executed and no post-hooks are run
    self.LogInfo("dry-run mode requested, not actually executing"
                 " the operation")
    return lu.dry_run_result

  if self._cbs:
    submit_mj_fn = self._cbs.SubmitManyJobs
  else:
    submit_mj_fn = _FailingSubmitManyJobs

  lusExecuting[0] += 1
  try:
    result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
    h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
    result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                              self.Log, result)
  finally:
    # Runs on failure too: the counter must be decremented and a changed
    # configuration reported to the hooks in any case
    lusExecuting[0] -= 1
    if write_count != self.cfg.write_count:
      hm.RunConfigUpdate()

  return result
517
520
def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
  """Execute a Logical Unit, with the needed locks.

  This is a recursive function that starts locking the given level, and
  proceeds up, till there are no more locks to acquire. Then it executes the
  given LU and its opcodes.

  @param lu: the logical unit to execute
  @type level: int
  @param level: the lock level to handle in this call
  @type calc_timeout: callable
  @param calc_timeout: function returning the remaining time for acquiring
    locks, or C{None} for no timeout
  @type pending: list or None
  @param pending: lock requests collected at lower levels which have not
    been sent to WConfD yet
  @return: the result of executing the LU
  @raise LockAcquireTimeout: if locks could not be acquired in time, or if
    the LU ran temporarily out of resources and was prepared for a retry

  """
  pending = pending or []
  logging.debug("Looking at locks of level %s, still need to obtain %s",
                level, pending)
  adding_locks = level in lu.add_locks
  acquiring_locks = level in lu.needed_locks

  if level not in locking.LEVELS:
    # All levels have been handled: send whatever is still outstanding and
    # execute the LU
    if pending:
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    logging.debug("Finished acquiring locks")

    if self._cbs:
      self._cbs.NotifyStart()

    try:
      result = self._ExecLU(lu)
    except errors.OpPrereqError as err:
      (_, ecode) = err.args
      if ecode != errors.ECODE_TEMP_NORES:
        raise
      logging.debug("Temporarily out of resources; will retry internally")
      try:
        lu.PrepareRetry(self.Log)
        if self._cbs:
          self._cbs.NotifyRetry()
      except errors.OpRetryNotSupportedError:
        logging.debug("LU does not know how to retry.")
        raise err
      # NOTE(review): presumably reported as a lock timeout so that the
      # caller runs the whole opcode again -- confirm against the callers
      raise LockAcquireTimeout()
    except AssertionError as err:
      # It is not known in which phase the assertion failed, so at least
      # the location of the failure is included in the error message
      (_, _, tb) = sys.exc_info()
      err_info = traceback.format_tb(tb)
      del tb
      logging.exception("Detected AssertionError")
      raise errors.OpExecError("Internal assertion error: please report"
                               " this as a bug.\nError message: '%s';"
                               " location:\n%s" % (str(err), err_info[-1]))
    return result

  # Determine up front whether the locks of this level are acquired
  # opportunistically
  opportunistic = lu.opportunistic_locks[level]

  dont_collate = lu.dont_collate_locks[level]

  # Levels marked as not to be collated must not be merged with requests
  # from lower levels, so those are sent first
  if dont_collate and pending:
    self._RequestAndWait(pending, calc_timeout())
    lu.cfg.OutDate()
    lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
    pending = []

  if adding_locks and opportunistic:
    raise NotImplementedError("Can't opportunistically acquire locks when"
                              " adding new ones")

  if adding_locks and acquiring_locks and \
     lu.needed_locks[level] == locking.ALL_SET:
    raise NotImplementedError("Can't request all locks of a certain level"
                              " and add new locks")

  if adding_locks or acquiring_locks:
    self._CheckLocksEnabled()

    lu.DeclareLocks(level)
    share = lu.share_locks[level]
    opportunistic_count = lu.opportunistic_locks_count[level]

    try:
      if acquiring_locks:
        needed_locks = _LockList(lu.needed_locks[level])
      else:
        needed_locks = []

      if adding_locks:
        needed_locks.extend(_LockList(lu.add_locks[level]))

      timeout = calc_timeout()
      if timeout is not None and not opportunistic:
        # Only collect the request; it is sent together with the requests
        # of the following levels
        pending = pending + self._AcquireLocks(level, needed_locks, share,
                                               opportunistic, timeout,
                                               request_only=True)
      else:
        # Acquisitions without timeout and opportunistic ones are done
        # immediately, after sending everything collected so far
        if pending:
          self._RequestAndWait(pending, calc_timeout())
          lu.cfg.OutDate()
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
          pending = []
        self._AcquireLocks(level, needed_locks, share, opportunistic,
                           timeout,
                           opportunistic_count=opportunistic_count)
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

      result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                   pending=pending)
    finally:
      # The locks of this level are released on success and on failure
      levelname = locking.LEVEL_NAMES[level]
      logging.debug("Freeing locks at level %s for %s",
                    levelname, self._wconfdcontext)
      self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
  else:
    result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)

  return result
643
644
646 """Check the LU result against the contract in the opcode.
647
648 """
649 resultcheck_fn = op.OP_RESULT
650 if not (resultcheck_fn is None or resultcheck_fn(result)):
651 logging.error("Expected opcode result matching %s, got %s",
652 resultcheck_fn, result)
653 if not getattr(op, "dry_run", False):
654
655
656
657 raise errors.OpResultError("Opcode result does not match %s: %s" %
658 (resultcheck_fn, utils.Truncate(result, 80)))
659
def ExecOpCode(self, op, cbs, timeout=None):
  """Execute an opcode.

  @type op: an OpCode instance
  @param op: the opcode to be executed
  @type cbs: L{OpExecCbBase}
  @param cbs: Runtime callbacks
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks, None for no timeout
  @return: the result of the logical unit implementing the opcode
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time
  @raise errors.ProgrammerError: if C{op} is not an opcode, or if the
    opcode requires the BGL while locks are disabled
  @raise errors.OpCodeUnknown: if no logical unit is registered for C{op}

  """
  if not isinstance(op, opcodes.OpCode):
    raise errors.ProgrammerError("Non-opcode instance passed"
                                 " to ExecOpcode (%s)" % type(op))

  lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
  if lu_class is None:
    raise errors.OpCodeUnknown("Unknown opcode")

  if timeout is None:
    calc_timeout = lambda: None
  else:
    # The overall timeout is shared by all lock acquisitions; every call
    # yields the time which is left
    calc_timeout = utils.RunningTimeout(timeout, False).Remaining

  self._cbs = cbs
  try:
    if self._enable_locks:
      # Acquire the BGL exclusively if this LU requires it, and in shared
      # mode otherwise
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, False, calc_timeout())
    elif lu_class.REQ_BGL:
      raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                   " disabled" % op.OP_ID)

    lu = lu_class(self, op, self.cfg, self.rpc,
                  self._wconfdcontext, self.wconfd)
    lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
    _CheckSecretParameters(op)
    lu.ExpandNames()
    assert lu.needed_locks is not None, "needed_locks not set by LU"

    try:
      result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                   calc_timeout)
    finally:
      # Reservations made for this execution context are dropped whether
      # or not the LU succeeded
      if self._ec_id:
        self.cfg.DropECReservations(self._ec_id)
  finally:
    # Always release the cluster-level locks and detach the callbacks
    self.wconfd.Client().FreeLocksLevel(
      self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
    self._cbs = None

  self._CheckLUResult(op, result)

  return result
719
def Log(self, *args):
  """Forward call to feedback callback function.

  Nothing happens if no callbacks are set.

  @param args: arguments passed unchanged to the C{Feedback} callback

  """
  if self._cbs:
    self._cbs.Feedback(*args)
726
def LogStep(self, current, total, message):
  """Log a change in LU execution progress.

  @type current: int
  @param current: number of the current step
  @type total: int
  @param total: total number of steps
  @type message: string
  @param message: description of the step

  """
  logging.debug("Step %d/%d %s", current, total, message)
  self.Log("STEP %d/%d %s" % (current, total, message))
733
def LogWarning(self, message, *args, **kwargs):
  """Log a warning to the logs and the user.

  The optional keyword argument is 'hint' and can be used to show a
  hint to the user (presumably related to the warning). If the
  message is empty, it will not be printed at all, allowing one to
  show only a hint.

  @type message: string
  @param message: the warning; used as a format string if positional
    arguments are given
  @param args: values interpolated into C{message}

  """
  assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
    "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
  if args:
    message = message % tuple(args)
  if message:
    logging.warning(message)
    self.Log(" - WARNING: %s" % message)
  if "hint" in kwargs:
    self.Log(" Hint: %s" % kwargs["hint"])
752
753 - def LogInfo(self, message, *args):
761
def GetECId(self):
  """Returns the current execution context ID.

  @rtype: string
  @return: the execution context identifier given to the constructor
  @raise errors.ProgrammerError: if no execution context ID is set

  """
  if not self._ec_id:
    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
  return self._ec_id
770