1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import sys
32 import logging
33 import random
34 import time
35 import itertools
36 import traceback
37
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import hooksmaster
42 from ganeti import cmdlib
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import compat
46
47
# An opcode class named "Op<Name>" is handled by the logical unit class
# named "LU<Name>"; these prefixes drive that name translation.
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

# LU classes exempt from acquiring the node allocation lock when they take
# all node / node-resource locks. For listed classes the lock verification
# asserts the node allocation lock is NOT held.
# NOTE(review): presumably the default for _VerifyLocks' _nal_whitelist;
# that binding is not visible here.
_NODE_ALLOC_WHITELIST = frozenset()

# LU classes allowed (and, per the verification assert, expected) to take
# the node allocation lock in a different shared/exclusive mode than their
# node and node-resource locks.
# NOTE(review): presumably the default for _VerifyLocks' _mode_whitelist.
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
    cmdlib.LUBackupExport,
    cmdlib.LUBackupRemove,
    cmdlib.LUOobCommand,
])
63
64
66 """Exception to report timeouts on acquiring locks.
67
68 """
69
70
94
95
97 """Class with lock acquire timeout strategy.
98
99 """
100 __slots__ = [
101 "_timeouts",
102 "_random_fn",
103 "_time_fn",
104 ]
105
106 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
107
def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Prepare a new lock acquire timeout strategy.

    Every instance walks its own iterator over the class-level sequence of
    per-attempt timeouts, so a fresh instance starts from the first attempt.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    # Injectable callables. NOTE(review): _time_fn is stored but not used by
    # any code visible in this listing; _random_fn adds jitter to timeouts.
    self._time_fn, self._random_fn = _time_fn, _random_fn
120
122 """Returns the timeout for the next attempt.
123
124 """
125 try:
126 timeout = self._timeouts.next()
127 except StopIteration:
128
129 timeout = None
130
131 if timeout is not None:
132
133
134 variation_range = timeout * 0.1
135 timeout += ((self._random_fn() * variation_range) -
136 (variation_range * 0.5))
137
138 return timeout
139
140
142 """Base class for OpCode execution callbacks.
143
144 """
146 """Called when we are about to execute the LU.
147
148 This function is called when we're about to start the lu's Exec() method,
149 that is, after we have acquired all locks.
150
151 """
152
154 """Sends feedback from the LU code to the end-user.
155
156 """
157
159 """Returns current priority or C{None}.
160
161 """
162 return None
163
165 """Submits jobs for processing.
166
167 See L{jqueue.JobQueue.SubmitManyJobs}.
168
169 """
170 raise NotImplementedError
171
172
174 """Computes the LU name for a given OpCode name.
175
176 """
177 assert opname.startswith(_OP_PREFIX), \
178 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
179
180 return _LU_PREFIX + opname[len(_OP_PREFIX):]
181
182
190
191
193 """Copies basic opcode parameters.
194
195 @type src: L{opcodes.OpCode}
196 @param src: Source opcode
197 @type defcomment: string
198 @param defcomment: Comment to specify if not already given
199 @type dst: L{opcodes.OpCode}
200 @param dst: Destination opcode
201
202 """
203 if hasattr(src, "debug_level"):
204 dst.debug_level = src.debug_level
205
206 if (getattr(dst, "priority", None) is None and
207 hasattr(src, "priority")):
208 dst.priority = src.priority
209
210 if not getattr(dst, opcodes.COMMENT_ATTR, None):
211 dst.comment = defcomment
212
213 if hasattr(src, constants.OPCODE_REASON):
214 dst.reason = getattr(dst, constants.OPCODE_REASON, [])
215 dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
216
217
219 """Examines opcode result.
220
221 If necessary, additional processing on the result is done.
222
223 """
224 if isinstance(result, cmdlib.ResultWithJobs):
225
226 map(compat.partial(_SetBaseOpParams, op,
227 "Submitted by %s" % op.OP_ID),
228 itertools.chain(*result.jobs))
229
230
231 job_submission = submit_fn(result.jobs)
232
233
234 result = result.other
235
236 assert constants.JOB_IDS_KEY not in result, \
237 "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
238
239 result[constants.JOB_IDS_KEY] = job_submission
240
241 return result
242
243
245 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
246
247 """
248 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
249 " queries) can not submit jobs")
250
251
254 """Performs consistency checks on locks acquired by a logical unit.
255
256 @type lu: L{cmdlib.LogicalUnit}
257 @param lu: Logical unit instance
258 @type glm: L{locking.GanetiLockManager}
259 @param glm: Lock manager
260
261 """
262 if not __debug__:
263 return
264
265 have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
266
267 for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
268
269 if level in lu.needed_locks:
270 share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
271 share_level = lu.share_locks[level]
272
273 if lu.__class__ in _mode_whitelist:
274 assert share_node_alloc != share_level, \
275 "LU is whitelisted to use different modes for node allocation lock"
276 else:
277 assert bool(share_node_alloc) == bool(share_level), \
278 ("Node allocation lock must be acquired using the same mode as nodes"
279 " and node resources")
280
281 if lu.__class__ in _nal_whitelist:
282 assert not have_nal, \
283 "LU is whitelisted for not acquiring the node allocation lock"
284 elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
285 assert have_nal, \
286 ("Node allocation lock must be used if an LU acquires all nodes"
287 " or node resources")
288
289
291 """Object which runs OpCodes"""
292 DISPATCH_TABLE = _ComputeDispatchTable()
293
def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor may use the lock manager;
        when C{False}, any attempt to acquire or add locks raises
        L{errors.ProgrammerError}, as does executing an opcode whose LU
        requires the BGL

    """
    self.context = context
    self._ec_id = ec_id
    # Opcode execution callbacks; only set while an opcode is executing.
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
309
311 """Checks if locking is enabled.
312
313 @raise errors.ProgrammerError: In case locking is not enabled
314
315 """
316 if not self._enable_locks:
317 raise errors.ProgrammerError("Attempted to use disabled locks")
318
def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquire locks at one level through the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @return: the non-C{None} value returned by the lock manager's C{acquire}
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Pass the current job priority through to the lock manager when
    # execution callbacks are registered; otherwise no priority is given.
    priority = self._cbs.CurrentPriority() if self._cbs else None

    outcome = self.context.glm.acquire(level, names, shared=shared,
                                       timeout=timeout, priority=priority,
                                       opportunistic=opportunistic)
    if outcome is not None:
        return outcome

    # A None result from the lock manager is treated as a timeout.
    raise LockAcquireTimeout()
351
353 """Logical Unit execution sequence.
354
355 """
356 write_count = self.context.cfg.write_count
357 lu.CheckPrereq()
358
359 hm = self.BuildHooksManager(lu)
360 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
361 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
362 self.Log, None)
363
364 if getattr(lu.op, "dry_run", False):
365
366
367
368 self.LogInfo("dry-run mode requested, not actually executing"
369 " the operation")
370 return lu.dry_run_result
371
372 if self._cbs:
373 submit_mj_fn = self._cbs.SubmitManyJobs
374 else:
375 submit_mj_fn = _FailingSubmitManyJobs
376
377 try:
378 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
379 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
380 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
381 self.Log, result)
382 finally:
383
384 if write_count != self.context.cfg.write_count:
385 hm.RunConfigUpdate()
386
387 return result
388
391
393 """Execute a Logical Unit, with the needed locks.
394
395 This is a recursive function that starts locking the given level, and
396 proceeds up, till there are no more locks to acquire. Then it executes the
397 given LU and its opcodes.
398
399 """
400 glm = self.context.glm
401 adding_locks = level in lu.add_locks
402 acquiring_locks = level in lu.needed_locks
403
404 if level not in locking.LEVELS:
405 _VerifyLocks(lu, glm)
406
407 if self._cbs:
408 self._cbs.NotifyStart()
409
410 try:
411 result = self._ExecLU(lu)
412 except AssertionError, err:
413
414
415
416 (_, _, tb) = sys.exc_info()
417 err_info = traceback.format_tb(tb)
418 del tb
419 logging.exception("Detected AssertionError")
420 raise errors.OpExecError("Internal assertion error: please report"
421 " this as a bug.\nError message: '%s';"
422 " location:\n%s" % (str(err), err_info[-1]))
423
424 elif adding_locks and acquiring_locks:
425
426
427 raise NotImplementedError("Can't declare locks to acquire when adding"
428 " others")
429
430 elif adding_locks or acquiring_locks:
431 self._CheckLocksEnabled()
432
433 lu.DeclareLocks(level)
434 share = lu.share_locks[level]
435 opportunistic = lu.opportunistic_locks[level]
436
437 try:
438 assert adding_locks ^ acquiring_locks, \
439 "Locks must be either added or acquired"
440
441 if acquiring_locks:
442
443 needed_locks = lu.needed_locks[level]
444
445 self._AcquireLocks(level, needed_locks, share, opportunistic,
446 calc_timeout())
447 else:
448
449 add_locks = lu.add_locks[level]
450 lu.remove_locks[level] = add_locks
451
452 try:
453 glm.add(level, add_locks, acquired=1, shared=share)
454 except errors.LockError:
455 logging.exception("Detected lock error in level %s for locks"
456 " %s, shared=%s", level, add_locks, share)
457 raise errors.OpPrereqError(
458 "Couldn't add locks (%s), most likely because of another"
459 " job who added them first" % add_locks,
460 errors.ECODE_NOTUNIQUE)
461
462 try:
463 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
464 finally:
465 if level in lu.remove_locks:
466 glm.remove(level, lu.remove_locks[level])
467 finally:
468 if glm.is_owned(level):
469 glm.release(level)
470
471 else:
472 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
473
474 return result
475
477 """Execute an opcode.
478
479 @type op: an OpCode instance
480 @param op: the opcode to be executed
481 @type cbs: L{OpExecCbBase}
482 @param cbs: Runtime callbacks
483 @type timeout: float or None
484 @param timeout: Maximum time to acquire all locks, None for no timeout
485 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
486 amount of time
487
488 """
489 if not isinstance(op, opcodes.OpCode):
490 raise errors.ProgrammerError("Non-opcode instance passed"
491 " to ExecOpcode (%s)" % type(op))
492
493 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
494 if lu_class is None:
495 raise errors.OpCodeUnknown("Unknown opcode")
496
497 if timeout is None:
498 calc_timeout = lambda: None
499 else:
500 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
501
502 self._cbs = cbs
503 try:
504 if self._enable_locks:
505
506
507
508 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
509 not lu_class.REQ_BGL, False, calc_timeout())
510 elif lu_class.REQ_BGL:
511 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
512 " disabled" % op.OP_ID)
513
514 try:
515 lu = lu_class(self, op, self.context, self.rpc)
516 lu.ExpandNames()
517 assert lu.needed_locks is not None, "needed_locks not set by LU"
518
519 try:
520 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
521 calc_timeout)
522 finally:
523 if self._ec_id:
524 self.context.cfg.DropECReservations(self._ec_id)
525 finally:
526
527 if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
528 assert self._enable_locks
529 self.context.glm.release(locking.LEVEL_CLUSTER)
530 finally:
531 self._cbs = None
532
533 resultcheck_fn = op.OP_RESULT
534 if not (resultcheck_fn is None or resultcheck_fn(result)):
535 logging.error("Expected opcode result matching %s, got %s",
536 resultcheck_fn, result)
537 if not getattr(op, "dry_run", False):
538
539
540
541 raise errors.OpResultError("Opcode result does not match %s: %s" %
542 (resultcheck_fn, utils.Truncate(result, 80)))
543
544 return result
545
def Log(self, *args):
    """Hand feedback arguments over to the registered callbacks.

    Without registered callbacks the call does nothing.

    """
    callbacks = self._cbs
    if not callbacks:
        return
    callbacks.Feedback(*args)
552
def LogStep(self, current, total, message):
    """Report LU progress as step C{current} out of C{total}.

    The step is written to the debug log and also forwarded, as a
    "STEP n/m ..." line, through C{self.Log}.

    """
    progress = (current, total, message)
    logging.debug("Step %d/%d %s", *progress)
    self.Log("STEP %d/%d %s" % progress)
559
561 """Log a warning to the logs and the user.
562
563 The optional keyword argument is 'hint' and can be used to show a
564 hint to the user (presumably related to the warning). If the
565 message is empty, it will not be printed at all, allowing one to
566 show only a hint.
567
568 """
569 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
570 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
571 if args:
572 message = message % tuple(args)
573 if message:
574 logging.warning(message)
575 self.Log(" - WARNING: %s" % message)
576 if "hint" in kwargs:
577 self.Log(" Hint: %s" % kwargs["hint"])
578
579 - def LogInfo(self, message, *args):
587
589 """Returns the current execution context ID.
590
591 """
592 if not self._ec_id:
593 raise errors.ProgrammerError("Tried to use execution context id when"
594 " not set")
595 return self._ec_id
596