1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
59 sighupReceived = [False]
60 lusExecuting = [0]
61
62 _OP_PREFIX = "Op"
63 _LU_PREFIX = "LU"
64
65
67 """Exception to report timeouts on acquiring locks.
68
69 """
70
71
95
96
98 """Class with lock acquire timeout strategy.
99
100 """
101 __slots__ = [
102 "_timeouts",
103 "_random_fn",
104 "_time_fn",
105 ]
106
107 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
108
def __init__(self, _time_fn=time.time, _random_fn=random.random):
  """Prepares a fresh walk over the per-attempt timeouts.

  Both arguments exist so that unittests can inject deterministic
  replacements for the clock and the random number source.

  @param _time_fn: Time function for unittests
  @param _random_fn: Random number generator for unittests

  """
  object.__init__(self)

  # Injectable helpers; simply stored on the instance
  self._random_fn = _random_fn
  self._time_fn = _time_fn

  # Every instance gets its own iterator over the precomputed sequence
  self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
121
123 """Returns the timeout for the next attempt.
124
125 """
126 try:
127 timeout = self._timeouts.next()
128 except StopIteration:
129
130 timeout = None
131
132 if timeout is not None:
133
134
135 variation_range = timeout * 0.1
136 timeout += ((self._random_fn() * variation_range) -
137 (variation_range * 0.5))
138
139 return timeout
140
141
143 """Base class for OpCode execution callbacks.
144
145 """
147 """Called when we are about to execute the LU.
148
149 This function is called when we're about to start the lu's Exec() method,
150 that is, after we have acquired all locks.
151
152 """
153
155 """Called when we are about to reset an LU to retry again.
156
157 This function is called after PrepareRetry successfully completed.
158
159 """
160
161
163 """Sends feedback from the LU code to the end-user.
164
165 """
166
168 """Returns current priority or C{None}.
169
170 """
171 return None
172
174 """Submits jobs for processing.
175
176 See L{jqueue.JobQueue.SubmitManyJobs}.
177
178 """
179 raise NotImplementedError
180
181
183 """Computes the LU name for a given OpCode name.
184
185 """
186 assert opname.startswith(_OP_PREFIX), \
187 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
188
189 return _LU_PREFIX + opname[len(_OP_PREFIX):]
190
191
199
200
202 """Copies basic opcode parameters.
203
204 @type src: L{opcodes.OpCode}
205 @param src: Source opcode
206 @type defcomment: string
207 @param defcomment: Comment to specify if not already given
208 @type dst: L{opcodes.OpCode}
209 @param dst: Destination opcode
210
211 """
212 if hasattr(src, "debug_level"):
213 dst.debug_level = src.debug_level
214
215 if (getattr(dst, "priority", None) is None and
216 hasattr(src, "priority")):
217 dst.priority = src.priority
218
219 if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
220 dst.comment = defcomment
221
222 if hasattr(src, constants.OPCODE_REASON):
223 dst.reason = list(getattr(dst, constants.OPCODE_REASON, []))
224 dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
225
226
228 """Examines opcode result.
229
230 If necessary, additional processing on the result is done.
231
232 """
233 if isinstance(result, cmdlib.ResultWithJobs):
234
235 map(compat.partial(_SetBaseOpParams, op,
236 "Submitted by %s" % op.OP_ID),
237 itertools.chain(*result.jobs))
238
239
240 job_submission = submit_fn(result.jobs)
241
242
243 result = result.other
244
245 assert constants.JOB_IDS_KEY not in result, \
246 "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
247
248 result[constants.JOB_IDS_KEY] = job_submission
249
250 return result
251
252
254 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
255
256 """
257 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
258 " queries) can not submit jobs")
259
260
262 """If 'names' is a string, make it a single-element list.
263
264 @type names: list or string or NoneType
265 @param names: Lock names
266 @rtype: a list of strings
267 @return: if 'names' argument is an iterable, a list of it;
268 if it's a string, make it a one-element list;
269 if L{locking.ALL_SET}, L{locking.ALL_SET}
270
271 """
272 if names == locking.ALL_SET:
273 return names
274 elif isinstance(names, basestring):
275 return [names]
276 else:
277 return list(names)
278
279
281 """Object which runs OpCodes"""
282 DISPATCH_TABLE = _ComputeDispatchTable()
283
def __init__(self, context, ec_id, enable_locks=True):
  """Sets up a processor bound to one execution context.

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier
  @type enable_locks: bool
  @param enable_locks: whether this processor is allowed to use locks;
    the value is stored and consulted before any lock operation

  """
  # Plain state; callbacks are only set while an opcode is being executed
  self._cbs = None
  self._ec_id = ec_id
  self._enable_locks = enable_locks
  self.context = context
  self.hmclass = hooksmaster.HooksMaster
  self.wconfd = wconfd

  # Objects handed out by the context; GetRpc is given the configuration
  # object, hence the configuration is fetched first
  config = context.GetConfig(ec_id)
  self.cfg = config
  self.rpc = context.GetRpc(config)
  self._wconfdcontext = context.GetWConfdContext(ec_id)
302
304 """Checks if locking is enabled.
305
306 @raise errors.ProgrammerError: In case locking is not enabled
307
308 """
309 if not self._enable_locks:
310 raise errors.ProgrammerError("Attempted to use disabled locks")
311
313 """Request locks from WConfD and wait for them to be granted.
314
315 @type request: list
316 @param request: the lock request to be sent to WConfD
317 @type timeout: float
318 @param timeout: the time to wait for the request to be granted
319 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
320 amount of time; in this case, locks still might be acquired or a request
321 pending.
322
323 """
324 logging.debug("Trying %ss to request %s for %s",
325 timeout, request, self._wconfdcontext)
326 if self._cbs:
327 priority = self._cbs.CurrentPriority()
328 else:
329 priority = None
330
331 if priority is None:
332 priority = constants.OP_PRIO_DEFAULT
333
334
335 if sighupReceived[0]:
336 logging.warning("Ignoring unexpected SIGHUP")
337 sighupReceived[0] = False
338
339
340 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
341 request)
342 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
343
344 if pending:
345 def _HasPending():
346 if sighupReceived[0]:
347 return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
348 else:
349 return True
350
351 pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)
352
353 signal = sighupReceived[0]
354
355 if pending:
356 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
357
358 if pending and signal:
359 logging.warning("Ignoring unexpected SIGHUP")
360 sighupReceived[0] = False
361
362 logging.debug("Finished trying. Pending: %s", pending)
363 if pending:
364 raise LockAcquireTimeout()
365
def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                  opportunistic_count=1, request_only=False):
  """Builds the lock request for one level and, normally, acquires it.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the locks should be acquired in shared mode
  @type opportunistic: bool
  @param opportunistic: Whether to acquire opportunistically
  @type timeout: None or float
  @param timeout: Timeout for acquiring the locks; with C{None} the locks
    are requested one at a time and waited for without a time limit
  @type opportunistic_count: int
  @param opportunistic_count: number of locks that should at least be
    obtained when acquiring opportunistically
  @type request_only: bool
  @param request_only: do not acquire the locks, just return the request
  @rtype: list
  @return: an empty list if there are no names to lock; the request (a list
    of C{[lock, mode]} pairs) if C{request_only} is set; the value returned
    by the opportunistic call if locks were acquired opportunistically;
    otherwise the list of level-qualified lock names
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
    amount of time; in this case, locks still might be acquired or a request
    pending.

  """
  self._CheckLocksEnabled()

  # Priority comes from the callbacks, if any, else the default is used
  callbacks = self._cbs
  priority = None
  if callbacks:
    priority = callbacks.CurrentPriority()
  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  if names == locking.ALL_SET:
    if not opportunistic:
      names = locking.LOCKSET_NAME
    else:
      # Opportunistic requests need the individual names of the level
      expanders = {
        locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
        locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
        locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
        locking.LEVEL_NODE: self.cfg.GetNodeList,
        locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
        locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
        }
      names = expanders[level]()

  names = _LockList(names)
  names.sort()

  levelname = locking.LEVEL_NAMES[level]

  if not names:
    logging.debug("Acquiring no locks for (%s) at level %s",
                  self._wconfdcontext, levelname)
    return []

  # Lock names are qualified with the name of their level
  locks = ["%s/%s" % (levelname, name) for name in names]

  if shared:
    mode = "shared"
  else:
    mode = "exclusive"
  request = [[lock, mode] for lock in locks]

  if request_only:
    logging.debug("Lock request for level %s is %s", level, request)
    return request

  self.cfg.OutDate()

  if timeout is None:
    # No time limit: this takes precedence over opportunistic acquisition
    logging.info("Definitely requesting %s for %s",
                 request, self._wconfdcontext)

    # Request the locks one by one, polling until each one is granted
    for single in request:
      logging.debug("Definite request %s for %s",
                    single, self._wconfdcontext)
      self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                              [single])
      while self.wconfd.Client().HasPendingRequest(self._wconfdcontext):
        time.sleep(10.0 * random.random())

    return locks

  if not opportunistic:
    self._RequestAndWait(request, timeout)
    return locks

  logging.debug("For %ss trying to opportunistically acquire"
                " at least %d of %s for %s.",
                timeout, opportunistic_count, locks, self._wconfdcontext)

  def _GotLocks(result):
    # Retry for as long as the call hands back an empty list
    return result != []

  union_fn = self.wconfd.Client().GuardedOpportunisticLockUnion
  union_args = [opportunistic_count, self._wconfdcontext, request]
  locks = utils.SimpleRetry(_GotLocks, union_fn, 2.0, timeout,
                            args=union_args)
  logging.debug("Managed to get the following locks: %s", locks)
  if locks == []:
    raise LockAcquireTimeout()

  return locks
468
470 """Logical Unit execution sequence.
471
472 """
473 write_count = self.cfg.write_count
474 lu.cfg.OutDate()
475 lu.CheckPrereq()
476
477 hm = self.BuildHooksManager(lu)
478 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
479 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
480 self.Log, None)
481
482 if getattr(lu.op, "dry_run", False):
483
484
485
486 self.LogInfo("dry-run mode requested, not actually executing"
487 " the operation")
488 return lu.dry_run_result
489
490 if self._cbs:
491 submit_mj_fn = self._cbs.SubmitManyJobs
492 else:
493 submit_mj_fn = _FailingSubmitManyJobs
494
495 lusExecuting[0] += 1
496 try:
497 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
498 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
499 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
500 self.Log, result)
501 finally:
502
503 lusExecuting[0] -= 1
504 if write_count != self.cfg.write_count:
505 hm.RunConfigUpdate()
506
507 return result
508
511
513 """Execute a Logical Unit, with the needed locks.
514
515 This is a recursive function that starts locking the given level, and
516 proceeds up, till there are no more locks to acquire. Then it executes the
517 given LU and its opcodes.
518
519 """
520 pending = pending or []
521 logging.debug("Looking at locks of level %s, still need to obtain %s",
522 level, pending)
523 adding_locks = level in lu.add_locks
524 acquiring_locks = level in lu.needed_locks
525
526 if level not in locking.LEVELS:
527 if pending:
528 self._RequestAndWait(pending, calc_timeout())
529 lu.cfg.OutDate()
530 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
531 pending = []
532
533 logging.debug("Finished acquiring locks")
534
535 if self._cbs:
536 self._cbs.NotifyStart()
537
538 try:
539 result = self._ExecLU(lu)
540 except errors.OpPrereqError, err:
541 if len(err.args) < 2 or err.args[1] != errors.ECODE_TEMP_NORES:
542 raise
543
544 logging.debug("Temporarily out of resources; will retry internally")
545 try:
546 lu.PrepareRetry(self.Log)
547 if self._cbs:
548 self._cbs.NotifyRetry()
549 except errors.OpRetryNotSupportedError:
550 logging.debug("LU does not know how to retry.")
551 raise err
552 raise LockAcquireTimeout()
553 except AssertionError, err:
554
555
556
557 (_, _, tb) = sys.exc_info()
558 err_info = traceback.format_tb(tb)
559 del tb
560 logging.exception("Detected AssertionError")
561 raise errors.OpExecError("Internal assertion error: please report"
562 " this as a bug.\nError message: '%s';"
563 " location:\n%s" % (str(err), err_info[-1]))
564 return result
565
566
567 opportunistic = lu.opportunistic_locks[level]
568
569 dont_collate = lu.dont_collate_locks[level]
570
571 if dont_collate and pending:
572 self._RequestAndWait(pending, calc_timeout())
573 lu.cfg.OutDate()
574 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
575 pending = []
576
577 if adding_locks and opportunistic:
578
579
580
581 raise NotImplementedError("Can't opportunistically acquire locks when"
582 " adding new ones")
583
584 if adding_locks and acquiring_locks and \
585 lu.needed_locks[level] == locking.ALL_SET:
586
587
588 raise NotImplementedError("Can't request all locks of a certain level"
589 " and add new locks")
590
591 if adding_locks or acquiring_locks:
592 self._CheckLocksEnabled()
593
594 lu.DeclareLocks(level)
595 share = lu.share_locks[level]
596 opportunistic_count = lu.opportunistic_locks_count[level]
597
598 try:
599 if acquiring_locks:
600 needed_locks = _LockList(lu.needed_locks[level])
601 else:
602 needed_locks = []
603
604 if adding_locks:
605 needed_locks.extend(_LockList(lu.add_locks[level]))
606
607 timeout = calc_timeout()
608 if timeout is not None and not opportunistic:
609 pending = pending + self._AcquireLocks(level, needed_locks, share,
610 opportunistic, timeout,
611 request_only=True)
612 else:
613 if pending:
614 self._RequestAndWait(pending, calc_timeout())
615 lu.cfg.OutDate()
616 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
617 pending = []
618 self._AcquireLocks(level, needed_locks, share, opportunistic,
619 timeout,
620 opportunistic_count=opportunistic_count)
621 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
622
623 result = self._LockAndExecLU(lu, level + 1, calc_timeout,
624 pending=pending)
625 finally:
626 levelname = locking.LEVEL_NAMES[level]
627 logging.debug("Freeing locks at level %s for %s",
628 levelname, self._wconfdcontext)
629 self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
630 else:
631 result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)
632
633 return result
634
635
637 """Check the LU result against the contract in the opcode.
638
639 """
640 resultcheck_fn = op.OP_RESULT
641 if not (resultcheck_fn is None or resultcheck_fn(result)):
642 logging.error("Expected opcode result matching %s, got %s",
643 resultcheck_fn, result)
644 if not getattr(op, "dry_run", False):
645
646
647
648 raise errors.OpResultError("Opcode result does not match %s: %s" %
649 (resultcheck_fn, utils.Truncate(result, 80)))
650
652 """Execute an opcode.
653
654 @type op: an OpCode instance
655 @param op: the opcode to be executed
656 @type cbs: L{OpExecCbBase}
657 @param cbs: Runtime callbacks
658 @type timeout: float or None
659 @param timeout: Maximum time to acquire all locks, None for no timeout
660 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
661 amount of time
662
663 """
664 if not isinstance(op, opcodes.OpCode):
665 raise errors.ProgrammerError("Non-opcode instance passed"
666 " to ExecOpcode (%s)" % type(op))
667
668 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
669 if lu_class is None:
670 raise errors.OpCodeUnknown("Unknown opcode")
671
672 if timeout is None:
673 calc_timeout = lambda: None
674 else:
675 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
676
677 self._cbs = cbs
678 try:
679 if self._enable_locks:
680
681
682
683 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
684 not lu_class.REQ_BGL, False, calc_timeout())
685 elif lu_class.REQ_BGL:
686 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
687 " disabled" % op.OP_ID)
688
689 lu = lu_class(self, op, self.context, self.cfg, self.rpc,
690 self._wconfdcontext, self.wconfd)
691 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
692 lu.ExpandNames()
693 assert lu.needed_locks is not None, "needed_locks not set by LU"
694
695 try:
696 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
697 calc_timeout)
698 finally:
699 if self._ec_id:
700 self.cfg.DropECReservations(self._ec_id)
701 finally:
702 self.wconfd.Client().FreeLocksLevel(
703 self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
704 self._cbs = None
705
706 self._CheckLUResult(op, result)
707
708 return result
709
def Log(self, *args):
  """Hands the given arguments to the feedback callback, if there is one.

  Without callbacks set the call is silently ignored.

  """
  callbacks = self._cbs
  if not callbacks:
    return

  callbacks.Feedback(*args)
716
def LogStep(self, current, total, message):
  """Reports progress of the LU execution.

  The step is written to the debug log and also forwarded to the user
  through L{Log}.

  """
  progress = (current, total, message)
  logging.debug("Step %d/%d %s", *progress)
  self.Log("STEP %d/%d %s" % progress)
723
725 """Log a warning to the logs and the user.
726
727 The optional keyword argument is 'hint' and can be used to show a
728 hint to the user (presumably related to the warning). If the
729 message is empty, it will not be printed at all, allowing one to
730 show only a hint.
731
732 """
733 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
734 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
735 if args:
736 message = message % tuple(args)
737 if message:
738 logging.warning(message)
739 self.Log(" - WARNING: %s" % message)
740 if "hint" in kwargs:
741 self.Log(" Hint: %s" % kwargs["hint"])
742
743 - def LogInfo(self, message, *args):
751
753 """Returns the current execution context ID.
754
755 """
756 if not self._ec_id:
757 raise errors.ProgrammerError("Tried to use execution context id when"
758 " not set")
759 return self._ec_id
760