1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
59 sighupReceived = [False]
60 lusExecuting = [0]
61
62 _OP_PREFIX = "Op"
63 _LU_PREFIX = "LU"
64
65
67 """Exception to report timeouts on acquiring locks.
68
69 """
70
71
95
96
98 """Class with lock acquire timeout strategy.
99
100 """
101 __slots__ = [
102 "_timeouts",
103 "_random_fn",
104 "_time_fn",
105 ]
106
107 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
108
def __init__(self, _time_fn=time.time, _random_fn=random.random):
  """Prepares a fresh walk over the per-attempt timeouts.

  @param _time_fn: Time function for unittests
  @param _random_fn: Random number generator for unittests

  """
  object.__init__(self)

  # The injectable helpers are independent of everything else set up here
  self._random_fn = _random_fn
  self._time_fn = _time_fn

  # Each instance consumes the class-level timeout table through its own
  # iterator, so instances do not share their position in the table
  timeout_table = self._TIMEOUT_PER_ATTEMPT
  self._timeouts = iter(timeout_table)
121
123 """Returns the timeout for the next attempt.
124
125 """
126 try:
127 timeout = self._timeouts.next()
128 except StopIteration:
129
130 timeout = None
131
132 if timeout is not None:
133
134
135 variation_range = timeout * 0.1
136 timeout += ((self._random_fn() * variation_range) -
137 (variation_range * 0.5))
138
139 return timeout
140
141
143 """Base class for OpCode execution callbacks.
144
145 """
147 """Called when we are about to execute the LU.
148
149 This function is called when we're about to start the lu's Exec() method,
150 that is, after we have acquired all locks.
151
152 """
153
155 """Called when we are about to reset an LU to retry again.
156
157 This function is called after PrepareRetry successfully completed.
158
159 """
160
162 """Sends feedback from the LU code to the end-user.
163
164 """
165
167 """Returns current priority or C{None}.
168
169 """
170 return None
171
173 """Submits jobs for processing.
174
175 See L{jqueue.JobQueue.SubmitManyJobs}.
176
177 """
178 raise NotImplementedError
179
180
182 """Computes the LU name for a given OpCode name.
183
184 """
185 assert opname.startswith(_OP_PREFIX), \
186 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
187
188 return _LU_PREFIX + opname[len(_OP_PREFIX):]
189
190
198
199
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  The debug level is taken over whenever the source has one, the priority
  only if the destination has none yet, and the comment falls back to
  C{defcomment} if the destination has no comment. The destination's reason
  trail is extended by the entries of the source.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # A priority already present on the destination wins over the source's
  dst_priority = getattr(dst, "priority", None)
  if dst_priority is None and hasattr(src, "priority"):
    dst.priority = src.priority

  # The default comment is only a fallback for a missing or empty comment
  current_comment = getattr(dst, opcodes_base.COMMENT_ATTR, None)
  if not current_comment:
    dst.comment = defcomment

  # Reason trail: the destination's own entries (copied into a new list)
  # come first, followed by the entries of the source
  reason_attr = constants.OPCODE_REASON
  if hasattr(src, reason_attr):
    dst.reason = list(getattr(dst, reason_attr, []))
    dst.reason.extend(getattr(src, reason_attr, []))
224
225
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: for a
  L{cmdlib.ResultWithJobs} the basic parameters of C{op} are copied to all
  opcodes of the contained jobs, the jobs are submitted and the outcome of
  the submission is stored in the other return values.

  @param submit_fn: callable used to submit the jobs; it is called with
      C{result.jobs}
  @param op: the opcode which produced the result
  @param result: the value returned by the logical unit
  @return: C{result} itself unless it is a L{cmdlib.ResultWithJobs}; in
      that case its C{other} values, with the job submission result added
      under L{constants.JOB_IDS_KEY}

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of every job.
    # This is an explicit loop as the calls are made for their side effects
    # only; it neither builds a list of unused return values nor depends on
    # map() being evaluated eagerly.
    default_comment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, default_comment, job_op)

    # Submit the jobs
    job_submission = submit_fn(result.jobs)

    # Continue with the additional return values only
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
250
251
253 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
254
255 """
256 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
257 " queries) can not submit jobs")
258
259
def _LockList(names):
  """Normalizes lock names into a list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: L{locking.ALL_SET} if that is what was passed; a one-element
      list if 'names' is a string; otherwise a new list built from the
      'names' iterable

  """
  # The "all locks" marker is handed back untouched
  if names == locking.ALL_SET:
    return names

  # A single name must be wrapped, as list() would split it into characters
  if isinstance(names, basestring):
    return [names]

  return list(names)
277
278
280 """Object which runs OpCodes"""
281 DISPATCH_TABLE = _ComputeDispatchTable()
282
def __init__(self, context, ec_id, enable_locks=True):
  """Constructor for Processor

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier
  @type enable_locks: bool
  @param enable_locks: whether this processor may use locks

  """
  self.context = context
  self._ec_id = ec_id
  self._enable_locks = enable_locks

  # Callbacks are only set while an opcode is being executed
  self._cbs = None

  # Everything below is obtained from the context; the RPC runner is built
  # on top of the configuration of this execution context
  config = context.GetConfig(ec_id)
  self.cfg = config
  self.rpc = context.GetRpc(config)
  self._wconfdcontext = context.GetWConfdContext(ec_id)

  # Kept as attributes rather than being referenced directly at the places
  # where they are used
  self.hmclass = hooksmaster.HooksMaster
  self.wconfd = wconfd
301
def _CheckLocksEnabled(self):
  """Verifies that this processor is allowed to use locks.

  @raise errors.ProgrammerError: In case locking is not enabled

  """
  if self._enable_locks:
    return

  raise errors.ProgrammerError("Attempted to use disabled locks")
310
312 """Request locks from WConfD and wait for them to be granted.
313
314 @type request: list
315 @param request: the lock request to be sent to WConfD
316 @type timeout: float
317 @param timeout: the time to wait for the request to be granted
318 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
319 amount of time; in this case, locks still might be acquired or a request
320 pending.
321
322 """
323 logging.debug("Trying %ss to request %s for %s",
324 timeout, request, self._wconfdcontext)
325 if self._cbs:
326 priority = self._cbs.CurrentPriority()
327 else:
328 priority = None
329
330 if priority is None:
331 priority = constants.OP_PRIO_DEFAULT
332
333
334 if sighupReceived[0]:
335 logging.warning("Ignoring unexpected SIGHUP")
336 sighupReceived[0] = False
337
338
339 self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
340 request)
341 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
342
343 if pending:
344 def _HasPending():
345 if sighupReceived[0]:
346 return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
347 else:
348 return True
349
350 pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)
351
352 signal = sighupReceived[0]
353
354 if pending:
355 pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
356
357 if pending and signal:
358 logging.warning("Ignoring unexpected SIGHUP")
359 sighupReceived[0] = False
360
361 logging.debug("Finished trying. Pending: %s", pending)
362 if pending:
363 raise LockAcquireTimeout()
364
def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                  opportunistic_count=1, request_only=False):
  """Acquires locks via the Ganeti lock manager.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the locks should be acquired in shared mode
  @type opportunistic: bool
  @param opportunistic: Whether to acquire opportunistically
  @type timeout: None or float
  @param timeout: Timeout for acquiring the locks; with C{None} the locks
      are requested one at a time and each request is polled until it is no
      longer pending
  @type opportunistic_count: int
  @param opportunistic_count: number of locks to get at least when
      acquiring opportunistically
  @type request_only: bool
  @param request_only: do not acquire the locks, just return the request
  @rtype: list
  @return: an empty list if there are no names; the request (a list of
      C{[lock, mode]} pairs) if C{request_only} is set; for opportunistic
      acquisitions with a timeout the locks reported by WConfD; otherwise
      the names of the locks, prefixed with the name of their level
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time; in this case, locks still might be acquired or a request
      pending.

  """
  self._CheckLocksEnabled()

  # Priority as reported by the callbacks, if there are any
  callbacks = self._cbs
  if callbacks:
    priority = callbacks.CurrentPriority()
  else:
    priority = None

  if priority is None:
    priority = constants.OP_PRIO_DEFAULT

  if names == locking.ALL_SET:
    if not opportunistic:
      names = locking.LOCKSET_NAME
    else:
      # An opportunistic acquire needs the individual names of the level
      expand_fns = {
        locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
        locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
        locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
        locking.LEVEL_NODE: self.cfg.GetNodeList,
        locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
        locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
        }
      names = expand_fns[level]()

  names = _LockList(names)

  # Locks of one level are always requested in sorted order
  names.sort()

  levelname = locking.LEVEL_NAMES[level]
  locks = ["%s/%s" % (levelname, name) for name in names]

  if not names:
    logging.debug("Acquiring no locks for (%s) at level %s",
                  self._wconfdcontext, levelname)
    return []

  if shared:
    mode = "shared"
  else:
    mode = "exclusive"
  request = [[lock, mode] for lock in locks]

  if request_only:
    logging.debug("Lock request for level %s is %s", level, request)
    return request

  self.cfg.OutDate()

  if timeout is None:
    # Without a timeout, whether the acquire was meant to be opportunistic
    # is not looked at anymore
    logging.info("Definitely requesting %s for %s",
                 request, self._wconfdcontext)

    # One lock at a time, in lock order; the next one is only requested
    # once the previous request is not pending anymore
    for single in request:
      logging.debug("Definite request %s for %s", single, self._wconfdcontext)
      self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                              [single])
      while self.wconfd.Client().HasPendingRequest(self._wconfdcontext):
        # Randomized delay of up to ten seconds between two polls
        time.sleep(10.0 * random.random())

  elif opportunistic:
    logging.debug("For %ss trying to opportunistically acquire"
                  " at least %d of %s for %s.",
                  timeout, opportunistic_count, locks, self._wconfdcontext)
    # Retried every two seconds until something was granted or the timeout
    # is over
    locks = utils.SimpleRetry(
      lambda granted: granted != [],
      self.wconfd.Client().GuardedOpportunisticLockUnion,
      2.0, timeout,
      args=[opportunistic_count, self._wconfdcontext, request])
    logging.debug("Managed to get the following locks: %s", locks)
    if locks == []:
      raise LockAcquireTimeout()

  else:
    self._RequestAndWait(request, timeout)

  return locks
467
469 """Logical Unit execution sequence.
470
471 """
472 write_count = self.cfg.write_count
473 lu.cfg.OutDate()
474 lu.CheckPrereq()
475
476 hm = self.BuildHooksManager(lu)
477 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
478 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
479 self.Log, None)
480
481 if getattr(lu.op, "dry_run", False):
482
483
484
485 self.LogInfo("dry-run mode requested, not actually executing"
486 " the operation")
487 return lu.dry_run_result
488
489 if self._cbs:
490 submit_mj_fn = self._cbs.SubmitManyJobs
491 else:
492 submit_mj_fn = _FailingSubmitManyJobs
493
494 lusExecuting[0] += 1
495 try:
496 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
497 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
498 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
499 self.Log, result)
500 finally:
501
502 lusExecuting[0] -= 1
503 if write_count != self.cfg.write_count:
504 hm.RunConfigUpdate()
505
506 return result
507
510
512 """Execute a Logical Unit, with the needed locks.
513
514 This is a recursive function that starts locking the given level, and
515 proceeds up, till there are no more locks to acquire. Then it executes the
516 given LU and its opcodes.
517
518 """
519 pending = pending or []
520 logging.debug("Looking at locks of level %s, still need to obtain %s",
521 level, pending)
522 adding_locks = level in lu.add_locks
523 acquiring_locks = level in lu.needed_locks
524
525 if level not in locking.LEVELS:
526 if pending:
527 self._RequestAndWait(pending, calc_timeout())
528 lu.cfg.OutDate()
529 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
530 pending = []
531
532 logging.debug("Finished acquiring locks")
533
534 if self._cbs:
535 self._cbs.NotifyStart()
536
537 try:
538 result = self._ExecLU(lu)
539 except errors.OpPrereqError, err:
540 (_, ecode) = err.args
541 if ecode != errors.ECODE_TEMP_NORES:
542 raise
543 logging.debug("Temporarily out of resources; will retry internally")
544 try:
545 lu.PrepareRetry(self.Log)
546 if self._cbs:
547 self._cbs.NotifyRetry()
548 except errors.OpRetryNotSupportedError:
549 logging.debug("LU does not know how to retry.")
550 raise err
551 raise LockAcquireTimeout()
552 except AssertionError, err:
553
554
555
556 (_, _, tb) = sys.exc_info()
557 err_info = traceback.format_tb(tb)
558 del tb
559 logging.exception("Detected AssertionError")
560 raise errors.OpExecError("Internal assertion error: please report"
561 " this as a bug.\nError message: '%s';"
562 " location:\n%s" % (str(err), err_info[-1]))
563 return result
564
565
566 opportunistic = lu.opportunistic_locks[level]
567
568 dont_collate = lu.dont_collate_locks[level]
569
570 if dont_collate and pending:
571 self._RequestAndWait(pending, calc_timeout())
572 lu.cfg.OutDate()
573 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
574 pending = []
575
576 if adding_locks and opportunistic:
577
578
579
580 raise NotImplementedError("Can't opportunistically acquire locks when"
581 " adding new ones")
582
583 if adding_locks and acquiring_locks and \
584 lu.needed_locks[level] == locking.ALL_SET:
585
586
587 raise NotImplementedError("Can't request all locks of a certain level"
588 " and add new locks")
589
590 if adding_locks or acquiring_locks:
591 self._CheckLocksEnabled()
592
593 lu.DeclareLocks(level)
594 share = lu.share_locks[level]
595 opportunistic_count = lu.opportunistic_locks_count[level]
596
597 try:
598 if acquiring_locks:
599 needed_locks = _LockList(lu.needed_locks[level])
600 else:
601 needed_locks = []
602
603 if adding_locks:
604 needed_locks.extend(_LockList(lu.add_locks[level]))
605
606 timeout = calc_timeout()
607 if timeout is not None and not opportunistic:
608 pending = pending + self._AcquireLocks(level, needed_locks, share,
609 opportunistic, timeout,
610 request_only=True)
611 else:
612 if pending:
613 self._RequestAndWait(pending, calc_timeout())
614 lu.cfg.OutDate()
615 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
616 pending = []
617 self._AcquireLocks(level, needed_locks, share, opportunistic,
618 timeout,
619 opportunistic_count=opportunistic_count)
620 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
621
622 result = self._LockAndExecLU(lu, level + 1, calc_timeout,
623 pending=pending)
624 finally:
625 levelname = locking.LEVEL_NAMES[level]
626 logging.debug("Freeing locks at level %s for %s",
627 levelname, self._wconfdcontext)
628 self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
629 else:
630 result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)
631
632 return result
633
634
def _CheckLUResult(self, op, result):
  """Check the LU result against the contract in the opcode.

  A result which does not satisfy the check function of the opcode is
  always logged as an error; the exception is only raised if the opcode
  was not run in dry-run mode.

  @param op: the opcode; its C{OP_RESULT} is either C{None} (nothing is
      checked) or a callable taking the result
  @param result: the result of the logical unit
  @raise errors.OpResultError: In case the result doesn't match and the
      opcode is not in dry-run mode

  """
  resultcheck_fn = op.OP_RESULT
  if resultcheck_fn is None or resultcheck_fn(result):
    return

  logging.error("Expected opcode result matching %s, got %s",
                resultcheck_fn, result)

  # In dry-run mode a mismatch is only logged
  if getattr(op, "dry_run", False):
    return

  raise errors.OpResultError("Opcode result does not match %s: %s" %
                             (resultcheck_fn, utils.Truncate(result, 80)))
649
651 """Execute an opcode.
652
653 @type op: an OpCode instance
654 @param op: the opcode to be executed
655 @type cbs: L{OpExecCbBase}
656 @param cbs: Runtime callbacks
657 @type timeout: float or None
658 @param timeout: Maximum time to acquire all locks, None for no timeout
659 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
660 amount of time
661
662 """
663 if not isinstance(op, opcodes.OpCode):
664 raise errors.ProgrammerError("Non-opcode instance passed"
665 " to ExecOpcode (%s)" % type(op))
666
667 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
668 if lu_class is None:
669 raise errors.OpCodeUnknown("Unknown opcode")
670
671 if timeout is None:
672 calc_timeout = lambda: None
673 else:
674 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
675
676 self._cbs = cbs
677 try:
678 if self._enable_locks:
679
680
681
682 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
683 not lu_class.REQ_BGL, False, calc_timeout())
684 elif lu_class.REQ_BGL:
685 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
686 " disabled" % op.OP_ID)
687
688 lu = lu_class(self, op, self.context, self.cfg, self.rpc,
689 self._wconfdcontext, self.wconfd)
690 lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
691 lu.ExpandNames()
692 assert lu.needed_locks is not None, "needed_locks not set by LU"
693
694 try:
695 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
696 calc_timeout)
697 finally:
698 if self._ec_id:
699 self.cfg.DropECReservations(self._ec_id)
700 finally:
701 self.wconfd.Client().FreeLocksLevel(
702 self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
703 self._cbs = None
704
705 self._CheckLUResult(op, result)
706
707 return result
708
def Log(self, *args):
  """Passes the arguments on to the feedback callback.

  Without callbacks the arguments are dropped.

  """
  callbacks = self._cbs
  if not callbacks:
    return

  callbacks.Feedback(*args)
715
def LogStep(self, current, total, message):
  """Log a change in LU execution progress.

  The step is written to the debug log and also passed to L{Log}.

  @param current: number of the current step
  @param total: total number of steps
  @param message: description of the step

  """
  progress = (current, total, message)
  logging.debug("Step %d/%d %s", *progress)
  self.Log("STEP %d/%d %s" % progress)
722
def LogWarning(self, message, *args, **kwargs):
  """Log a warning to the logs and the user.

  The only keyword argument accepted is 'hint'; it can be used to show a
  hint to the user (presumably related to the warning). An empty message
  is not printed at all, which allows showing only a hint.

  @param message: the warning; if positional arguments are given, they
      are applied to it using the C{%} operator

  """
  assert not kwargs or list(kwargs) == ["hint"], \
    "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

  text = message
  if args:
    text = text % args

  # The warning goes both to the log and, via Log, to the user
  if text:
    logging.warning(text)
    self.Log(" - WARNING: %s" % text)

  # The hint is shown even if there was no message
  if "hint" in kwargs:
    self.Log(" Hint: %s" % kwargs["hint"])
741
742 - def LogInfo(self, message, *args):
750
752 """Returns the current execution context ID.
753
754 """
755 if not self._ec_id:
756 raise errors.ProgrammerError("Tried to use execution context id when"
757 " not set")
758 return self._ec_id
759