1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
# Single-element list used as a mutable module-level flag. The lock
# acquisition code in this module reads it and resets it to False.
# NOTE(review): presumably set by a SIGHUP handler elsewhere -- confirm
sighupReceived = [False]

# Single-element list used as a mutable module-level counter. It is
# incremented right before a logical unit's Exec() is called and decremented
# once Exec() and the post-phase hooks have finished (or failed).
lusExecuting = [0]

# Class name prefixes of opcodes and logical units; the "Op" prefix of an
# opcode name is replaced by "LU" to obtain the logical unit's name
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

# NOTE(review): presumably the LUs that are allowed not to acquire the node
# allocation lock (cf. the "_nal_whitelist" check in the lock verification
# code) -- confirm
_NODE_ALLOC_WHITELIST = frozenset()

# NOTE(review): presumably the LUs that are allowed to acquire the node
# allocation lock in a different mode than the node/node resource locks (cf.
# the "_mode_whitelist" check in the lock verification code) -- confirm
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
77
78
80 """Exception to report timeouts on acquiring locks.
81
82 """
83
84
108
109
111 """Class with lock acquire timeout strategy.
112
113 """
114 __slots__ = [
115 "_timeouts",
116 "_random_fn",
117 "_time_fn",
118 ]
119
120 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
121
def __init__(self, _time_fn=time.time, _random_fn=random.random):
  """Initializes this class.

  @param _time_fn: Time function for unittests
  @param _random_fn: Random number generator for unittests

  """
  object.__init__(self)

  # Iterator over the class-level per-attempt timeouts; once it is exhausted
  # no further timeout values are available from it
  self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
  self._time_fn = _time_fn
  self._random_fn = _random_fn
134
136 """Returns the timeout for the next attempt.
137
138 """
139 try:
140 timeout = self._timeouts.next()
141 except StopIteration:
142
143 timeout = None
144
145 if timeout is not None:
146
147
148 variation_range = timeout * 0.1
149 timeout += ((self._random_fn() * variation_range) -
150 (variation_range * 0.5))
151
152 return timeout
153
154
156 """Base class for OpCode execution callbacks.
157
158 """
160 """Called when we are about to execute the LU.
161
162 This function is called when we're about to start the lu's Exec() method,
163 that is, after we have acquired all locks.
164
165 """
166
168 """Sends feedback from the LU code to the end-user.
169
170 """
171
173 """Returns current priority or C{None}.
174
175 """
176 return None
177
179 """Submits jobs for processing.
180
181 See L{jqueue.JobQueue.SubmitManyJobs}.
182
183 """
184 raise NotImplementedError
185
186
188 """Computes the LU name for a given OpCode name.
189
190 """
191 assert opname.startswith(_OP_PREFIX), \
192 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
193
194 return _LU_PREFIX + opname[len(_OP_PREFIX):]
195
196
204
205
207 """Copies basic opcode parameters.
208
209 @type src: L{opcodes.OpCode}
210 @param src: Source opcode
211 @type defcomment: string
212 @param defcomment: Comment to specify if not already given
213 @type dst: L{opcodes.OpCode}
214 @param dst: Destination opcode
215
216 """
217 if hasattr(src, "debug_level"):
218 dst.debug_level = src.debug_level
219
220 if (getattr(dst, "priority", None) is None and
221 hasattr(src, "priority")):
222 dst.priority = src.priority
223
224 if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
225 dst.comment = defcomment
226
227 if hasattr(src, constants.OPCODE_REASON):
228 dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
229 dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
230
231
233 """Examines opcode result.
234
235 If necessary, additional processing on the result is done.
236
237 """
238 if isinstance(result, cmdlib.ResultWithJobs):
239
240 map(compat.partial(_SetBaseOpParams, op,
241 "Submitted by %s" % op.OP_ID),
242 itertools.chain(*result.jobs))
243
244
245 job_submission = submit_fn(result.jobs)
246
247
248 result = result.other
249
250 assert constants.JOB_IDS_KEY not in result, \
251 "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
252
253 result[constants.JOB_IDS_KEY] = job_submission
254
255 return result
256
257
259 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
260
261 """
262 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
263 " queries) can not submit jobs")
264
265
268 """Performs consistency checks on locks acquired by a logical unit.
269
270 @type lu: L{cmdlib.LogicalUnit}
271 @param lu: Logical unit instance
272
273 """
274 if not __debug__:
275 return
276
277 allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
278 have_nal = locking.NAL in allocset
279
280 for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
281
282 if level in lu.needed_locks:
283 share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
284 share_level = lu.share_locks[level]
285
286 if lu.__class__ in _mode_whitelist:
287 assert share_node_alloc != share_level, \
288 "LU is whitelisted to use different modes for node allocation lock"
289 else:
290 assert bool(share_node_alloc) == bool(share_level), \
291 ("Node allocation lock must be acquired using the same mode as nodes"
292 " and node resources")
293
294 if lu.__class__ in _nal_whitelist:
295 assert not have_nal, \
296 "LU is whitelisted for not acquiring the node allocation lock"
297 elif lu.needed_locks[level] == locking.ALL_SET:
298 assert have_nal, \
299 ("Node allocation lock must be used if an LU acquires all nodes"
300 " or node resources")
301
302
304 """If 'names' is a string, make it a single-element list.
305
306 @type names: list or string or NoneType
307 @param names: Lock names
308 @rtype: a list of strings
309 @return: if 'names' argument is an iterable, a list of it;
310 if it's a string, make it a one-element list;
311 if L{locking.ALL_SET}, L{locking.ALL_SET}
312
313 """
314 if names == locking.ALL_SET:
315 return names
316 elif isinstance(names, basestring):
317 return [names]
318 else:
319 return list(names)
320
321
323 """Object which runs OpCodes"""
324 DISPATCH_TABLE = _ComputeDispatchTable()
325
def __init__(self, context, ec_id, enable_locks=True):
  """Constructor for Processor

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier
  @type enable_locks: bool
  @param enable_locks: Whether this processor may acquire locks; if false,
      any attempt to acquire locks raises L{errors.ProgrammerError}

  """
  self.context = context
  self._ec_id = ec_id
  # Runtime callbacks; only set for the duration of an opcode's execution
  self._cbs = None
  self.cfg = context.GetConfig(ec_id)
  self.rpc = context.GetRpc(self.cfg)
  self.hmclass = hooksmaster.HooksMaster
  self._enable_locks = enable_locks
  # The wconfd module is kept on the instance; all wconfd calls made by this
  # class go through this attribute
  self.wconfd = wconfd
  self._wconfdcontext = context.GetWConfdContext(ec_id)
344
346 """Checks if locking is enabled.
347
348 @raise errors.ProgrammerError: In case locking is not enabled
349
350 """
351 if not self._enable_locks:
352 raise errors.ProgrammerError("Attempted to use disabled locks")
353
def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                  opportunistic_count=1):
  """Acquires locks via the Ganeti lock manager.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the locks should be acquired in shared mode
  @type opportunistic: bool
  @param opportunistic: Whether to acquire opportunistically
  @type timeout: None or float
  @param timeout: Timeout for acquiring the locks; with C{None} the locks
      are requested one by one and waited for without a time limit
  @type opportunistic_count: int
  @param opportunistic_count: Passed on to wconfd when acquiring
      opportunistically (logged as the minimum number of locks to obtain)
  @rtype: list
  @return: An empty list if there was nothing to acquire; the result of the
      opportunistic request when acquiring opportunistically with a timeout;
      otherwise the requested locks as C{"<level name>/<lock name>"} strings
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

  """
  # NOTE(review): the block nesting of this method was reconstructed from a
  # copy of the source that had lost its indentation -- please compare with
  # the canonical file before merging
  self._CheckLocksEnabled()

  if self._cbs:
    priority = self._cbs.CurrentPriority()
  else:
    priority = None

  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  if names == locking.ALL_SET:
    if opportunistic:
      # An opportunistic request needs concrete names, hence the set of all
      # locks of the level is expanded using the configuration
      expand_fns = {
        locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
        locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
        locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
        locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
        locking.LEVEL_NODE: self.cfg.GetNodeList,
        locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
        locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
        }
      names = expand_fns[level]()
    else:
      names = locking.LOCKSET_NAME

  names = _LockList(names)

  # Locks are always requested in sorted order
  names.sort()

  levelname = locking.LEVEL_NAMES[level]

  locks = ["%s/%s" % (levelname, lock) for lock in names]

  if not names:
    logging.debug("Acquiring no locks for (%s) at level %s",
                  self._wconfdcontext, levelname)
    return []

  if shared:
    request = [[lock, "shared"] for lock in locks]
  else:
    request = [[lock, "exclusive"] for lock in locks]

  self.cfg.OutDate()

  if timeout is None:
    # No time limit; this branch takes precedence over "opportunistic"
    logging.info("Definitely requesting %s for %s",
                 request, self._wconfdcontext)

    # Request the locks one at a time (in sorted order) and poll until each
    # single request is no longer pending before asking for the next one
    for r in request:
      logging.debug("Definite request %s for %s", r, self._wconfdcontext)
      self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                              [r])
      while True:
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        if not pending:
          break
        # Randomized delay of up to ten seconds between polls
        time.sleep(10.0 * random.random())

  elif opportunistic:
    logging.debug("For %ss trying to opportunistically acquire"
                  " at least %d of %s for %s.",
                  timeout, opportunistic_count, locks, self._wconfdcontext)
    # Retry every two seconds until a non-empty list of locks is returned or
    # the timeout has expired
    locks = utils.SimpleRetry(
      lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
      2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
    logging.debug("Managed to get the following locks: %s", locks)
    if locks == []:
      raise LockAcquireTimeout()
  else:
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)

    # Start with a cleared flag, so that a signal seen later on can be
    # attributed to this request
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
    sighupReceived[0] = False

    # Request all locks at once
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        # Only ask wconfd once a signal has been flagged; until then the
        # request is assumed to be still pending
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        # Ask once more before giving up
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
      # NOTE(review): the flag is reset unconditionally here, mirroring the
      # reset before the request; the damaged source does not show whether
      # this statement belongs to the warning branch above -- confirm
      sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      # Give up: free the locks of this level for this context before
      # reporting the timeout
      self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
      raise LockAcquireTimeout()

  return locks
485
487 """Logical Unit execution sequence.
488
489 """
490 write_count = self.cfg.write_count
491 lu.cfg.OutDate()
492 lu.CheckPrereq()
493
494 hm = self.BuildHooksManager(lu)
495 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
496 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
497 self.Log, None)
498
499 if getattr(lu.op, "dry_run", False):
500
501
502
503 self.LogInfo("dry-run mode requested, not actually executing"
504 " the operation")
505 return lu.dry_run_result
506
507 if self._cbs:
508 submit_mj_fn = self._cbs.SubmitManyJobs
509 else:
510 submit_mj_fn = _FailingSubmitManyJobs
511
512 lusExecuting[0] += 1
513 try:
514 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
515 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
516 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
517 self.Log, result)
518 finally:
519
520 lusExecuting[0] -= 1
521 if write_count != self.cfg.write_count:
522 hm.RunConfigUpdate()
523
524 return result
525
528
530 """Execute a Logical Unit, with the needed locks.
531
532 This is a recursive function that starts locking the given level, and
533 proceeds up, till there are no more locks to acquire. Then it executes the
534 given LU and its opcodes.
535
536 """
537 adding_locks = level in lu.add_locks
538 acquiring_locks = level in lu.needed_locks
539
540 if level not in locking.LEVELS:
541 _VerifyLocks(lu)
542
543 if self._cbs:
544 self._cbs.NotifyStart()
545
546 try:
547 result = self._ExecLU(lu)
548 except AssertionError, err:
549
550
551
552 (_, _, tb) = sys.exc_info()
553 err_info = traceback.format_tb(tb)
554 del tb
555 logging.exception("Detected AssertionError")
556 raise errors.OpExecError("Internal assertion error: please report"
557 " this as a bug.\nError message: '%s';"
558 " location:\n%s" % (str(err), err_info[-1]))
559 return result
560
561
562 opportunistic = lu.opportunistic_locks[level]
563
564 if adding_locks and opportunistic:
565
566
567
568 raise NotImplementedError("Can't opportunistically acquire locks when"
569 " adding new ones")
570
571 if adding_locks and acquiring_locks and \
572 lu.needed_locks[level] == locking.ALL_SET:
573
574
575 raise NotImplementedError("Can't request all locks of a certain level"
576 " and add new locks")
577
578 if adding_locks or acquiring_locks:
579 self._CheckLocksEnabled()
580
581 lu.DeclareLocks(level)
582 share = lu.share_locks[level]
583 opportunistic_count = lu.opportunistic_locks_count[level]
584
585 try:
586 if acquiring_locks:
587 needed_locks = _LockList(lu.needed_locks[level])
588 else:
589 needed_locks = []
590
591 if adding_locks:
592 needed_locks.extend(_LockList(lu.add_locks[level]))
593
594 self._AcquireLocks(level, needed_locks, share, opportunistic,
595 calc_timeout(),
596 opportunistic_count=opportunistic_count)
597 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
598
599 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
600 finally:
601 levelname = locking.LEVEL_NAMES[level]
602 logging.debug("Freeing locks at level %s for %s",
603 levelname, self._wconfdcontext)
604 self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
605 else:
606 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
607
608 return result
609
610
612 """Check the LU result against the contract in the opcode.
613
614 """
615 resultcheck_fn = op.OP_RESULT
616 if not (resultcheck_fn is None or resultcheck_fn(result)):
617 logging.error("Expected opcode result matching %s, got %s",
618 resultcheck_fn, result)
619 if not getattr(op, "dry_run", False):
620
621
622
623 raise errors.OpResultError("Opcode result does not match %s: %s" %
624 (resultcheck_fn, utils.Truncate(result, 80)))
625
627 """Execute an opcode.
628
629 @type op: an OpCode instance
630 @param op: the opcode to be executed
631 @type cbs: L{OpExecCbBase}
632 @param cbs: Runtime callbacks
633 @type timeout: float or None
634 @param timeout: Maximum time to acquire all locks, None for no timeout
635 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
636 amount of time
637
638 """
639 if not isinstance(op, opcodes.OpCode):
640 raise errors.ProgrammerError("Non-opcode instance passed"
641 " to ExecOpcode (%s)" % type(op))
642
643 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
644 if lu_class is None:
645 raise errors.OpCodeUnknown("Unknown opcode")
646
647 if timeout is None:
648 calc_timeout = lambda: None
649 else:
650 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
651
652 self._cbs = cbs
653 try:
654 if self._enable_locks:
655
656
657
658 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
659 not lu_class.REQ_BGL, False, calc_timeout())
660 elif lu_class.REQ_BGL:
661 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
662 " disabled" % op.OP_ID)
663
664 lu = lu_class(self, op, self.context, self.cfg, self.rpc,
665 self._wconfdcontext, self.wconfd)
666 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
667 lu.ExpandNames()
668 assert lu.needed_locks is not None, "needed_locks not set by LU"
669
670 try:
671 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
672 calc_timeout)
673 finally:
674 if self._ec_id:
675 self.cfg.DropECReservations(self._ec_id)
676 finally:
677 self.wconfd.Client().FreeLocksLevel(
678 self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
679 self._cbs = None
680
681 self._CheckLUResult(op, result)
682
683 return result
684
def Log(self, *args):
  """Forward call to feedback callback function.

  All positional arguments are passed on unchanged to the C{Feedback} method
  of the current callbacks object. Nothing happens if no callbacks are set.

  """
  if self._cbs:
    self._cbs.Feedback(*args)
691
def LogStep(self, current, total, message):
  """Log a change in LU execution progress.

  The step is written to the debug log and also forwarded to L{Log} as a
  single C{"STEP <current>/<total> <message>"} string.

  @type current: int
  @param current: Number of the current step
  @type total: int
  @param total: Total number of steps
  @type message: string
  @param message: Description of the step

  """
  logging.debug("Step %d/%d %s", current, total, message)
  self.Log("STEP %d/%d %s" % (current, total, message))
698
700 """Log a warning to the logs and the user.
701
702 The optional keyword argument is 'hint' and can be used to show a
703 hint to the user (presumably related to the warning). If the
704 message is empty, it will not be printed at all, allowing one to
705 show only a hint.
706
707 """
708 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
709 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
710 if args:
711 message = message % tuple(args)
712 if message:
713 logging.warning(message)
714 self.Log(" - WARNING: %s" % message)
715 if "hint" in kwargs:
716 self.Log(" Hint: %s" % kwargs["hint"])
717
718 - def LogInfo(self, message, *args):
726
728 """Returns the current execution context ID.
729
730 """
731 if not self._ec_id:
732 raise errors.ProgrammerError("Tried to use execution context id when"
733 " not set")
734 return self._ec_id
735