1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56
57
# Class-name prefixes: an opcode name starting with the first one is turned
# into a logical unit name by swapping in the second one
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

# NOTE(review): presumably the LU classes that are allowed to work without
# the node allocation lock -- confirm against the lock verification code
_NODE_ALLOC_WHITELIST = frozenset()

# NOTE(review): presumably the LU classes that are allowed to acquire the
# node allocation lock in a different mode than the node locks -- confirm
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
73
74
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised without arguments by the lock acquisition helper when the lock
  manager reports that nothing could be acquired.

  """
79
80
104
105
"""Class with lock acquire timeout strategy.

"""
# Instances carry exactly the three attributes assigned by the constructor
__slots__ = [
  "_timeouts",
  "_random_fn",
  "_time_fn",
  ]

# Evaluated once while the class body runs; every instance creates its own
# iterator over this shared sequence
_TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
117
def __init__(self, _time_fn=time.time, _random_fn=random.random):
  """Sets up a fresh timeout strategy.

  @param _time_fn: Time function for unittests
  @param _random_fn: Random number generator for unittests

  """
  object.__init__(self)

  # Injectable helpers; production code relies on the defaults
  self._random_fn = _random_fn
  self._time_fn = _time_fn

  # Every instance walks the class-wide timeout sequence from its start
  self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
130
# NOTE(review): the "def" line is not part of the visible source; the method
# name is reconstructed -- confirm against the complete file
def NextAttempt(self):
  """Returns the timeout for the next attempt.

  The next base value is taken from the instance's timeout iterator and
  varied by +/- 5% (assuming the random function returns values between 0
  and 1, as C{random.random} does).

  @rtype: None or float
  @return: Randomized timeout, or None once the iterator is exhausted

  """
  # The builtin with a default replaces the Python 2-only ".next()" iterator
  # method together with its StopIteration handler
  timeout = next(self._timeouts, None)

  if timeout is not None:
    # Spread the value over a window of 10% centered on the base timeout
    variation_range = timeout * 0.1
    timeout += ((self._random_fn() * variation_range) -
                (variation_range * 0.5))

  return timeout
149
150
class OpExecCbBase(object):
  """Base class for OpCode execution callbacks.

  All methods except L{SubmitManyJobs} are no-ops here, so subclasses only
  need to override what they care about.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    The arguments are whatever the processor's logging helper was called
    with; this base implementation discards them.

    """

  def CurrentPriority(self):
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: Always, in this base implementation

    """
    raise NotImplementedError
181
182
# NOTE(review): the "def" line is not part of the visible source; the
# function name is reconstructed -- confirm against the complete file
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode name, must start with L{_OP_PREFIX}
  @rtype: string
  @return: The name with L{_OP_PREFIX} replaced by L{_LU_PREFIX}

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
191
192
200
201
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  The debug level is copied whenever the source has one, the priority only
  if the destination doesn't have one yet, and the comment is only set when
  the destination's comment is empty. Reason entries of the source are
  appended to those of the destination.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # An explicitly set priority on the destination wins over the source's
  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    # Keep what the destination already has and add the source's entries
    dst.reason = getattr(dst, constants.OPCODE_REASON, [])
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
226
227
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: for a
  L{cmdlib.ResultWithJobs} the contained jobs are submitted and the value
  returned by the submission is stored in the remaining result values.

  @type submit_fn: callable
  @param submit_fn: Function receiving the job definitions to submit
  @param op: Opcode whose basic parameters are copied to the submitted ones
  @param result: Value returned by the logical unit
  @return: The unmodified result, or the additional return values including
    the job submission result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of every job.
    # An explicit loop is used as "map" would build a throw-away list on
    # Python 2 and not call the function at all on Python 3.
    comment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, comment, job_op)

    # Hand the jobs over for processing
    job_submission = submit_fn(result.jobs)

    # Continue with the additional return values only
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
252
253
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as submission function when an opcode is executed without callbacks.

  @raise errors.ProgrammerError: Always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
260
261
# NOTE(review): the "def" lines are not part of the visible source; the
# whitelist defaults are inferred from the module-level constants -- confirm
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  All checks are assertions; nothing is verified when Python runs with
  assertions disabled.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes which must use a different mode for the
    node allocation lock than for the checked level
  @param _nal_whitelist: LU classes which must not hold the node allocation
    lock

  """
  if not __debug__:
    return

  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # The modes are taken from the LU's declarations, not from the locks
    # actually held
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
            "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
            ("Node allocation lock must be acquired using the same mode as nodes"
             " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
            "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        assert have_nal, \
            ("Node allocation lock must be used if an LU acquires all nodes"
             " or node resources")
298
299
"""Object which runs OpCodes"""
# Built once while the class body runs; opcode execution looks up the opcode's
# class in this table to find the logical unit class to instantiate
DISPATCH_TABLE = _ComputeDispatchTable()
303
def __init__(self, context, ec_id, enable_locks=True):
  """Constructor for Processor

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier
  @type enable_locks: bool
  @param enable_locks: Whether this processor may use locks

  """
  # Values handed in by the caller
  self.context = context
  self.rpc = context.rpc
  self._ec_id = ec_id
  self._enable_locks = enable_locks

  # Callbacks are only set while an opcode is being executed
  self._cbs = None

  # Class used for building hooks managers
  self.hmclass = hooksmaster.HooksMaster
319
def _CheckLocksEnabled(self):
  """Checks if locking is enabled.

  @raise errors.ProgrammerError: In case locking is not enabled

  """
  if self._enable_locks:
    return

  raise errors.ProgrammerError("Attempted to use disabled locks")
328
def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
  """Acquires locks via the Ganeti lock manager.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the locks should be acquired in shared mode
  @type opportunistic: bool
  @param opportunistic: Whether to acquire opportunistically
  @type timeout: None or float
  @param timeout: Timeout for acquiring the locks
  @return: Whatever the lock manager returned for the acquisition
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

  """
  self._CheckLocksEnabled()

  # Without callbacks there is nobody to ask for a priority
  priority = None
  if self._cbs:
    priority = self._cbs.CurrentPriority()

  lock_manager = self.context.glm
  acquired = lock_manager.acquire(level, names, shared=shared,
                                  timeout=timeout, priority=priority,
                                  opportunistic=opportunistic)

  # The lock manager signals failure by returning None
  if acquired is not None:
    return acquired

  raise LockAcquireTimeout()
361
def _ExecLU(self, lu):
  """Logical Unit execution sequence.

  Checks the prerequisites, runs the pre-phase hooks, executes the LU,
  post-processes its result and runs the post-phase hooks.

  @param lu: Logical unit instance to execute
  @return: The LU's dry-run result in dry-run mode, otherwise the processed
    execution result as returned by the LU's post-phase hooks callback

  """
  # Remembered to detect configuration changes made during execution
  write_count = self.context.cfg.write_count
  lu.CheckPrereq()

  hm = self.BuildHooksManager(lu)
  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
  lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                   self.Log, None)

  if getattr(lu.op, "dry_run", False):
    # In dry-run mode prerequisites and pre-phase hooks have been run, but
    # the LU itself is not executed
    self.LogInfo("dry-run mode requested, not actually executing"
                 " the operation")
    return lu.dry_run_result

  if self._cbs:
    submit_mj_fn = self._cbs.SubmitManyJobs
  else:
    submit_mj_fn = _FailingSubmitManyJobs

  try:
    result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
    h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
    result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                              self.Log, result)
  finally:
    # Also done when the execution failed, as the configuration may have
    # been written before the error occurred
    if write_count != self.context.cfg.write_count:
      hm.RunConfigUpdate()

  return result
398
401
def _LockAndExecLU(self, lu, level, calc_timeout):
  """Execute a Logical Unit, with the needed locks.

  This is a recursive function that starts locking the given level, and
  proceeds up, till there are no more locks to acquire. Then it executes the
  given LU and its opcodes.

  @param lu: Logical unit instance
  @type level: int
  @param level: Lock level to handle in this invocation
  @type calc_timeout: callable
  @param calc_timeout: Function returning the timeout for the next lock
    acquisition
  @return: Result of the LU's execution
  @raise errors.OpExecError: If the LU's execution fails with an assertion
  @raise errors.OpPrereqError: If locks to be added can't be added

  """
  glm = self.context.glm
  adding_locks = level in lu.add_locks
  acquiring_locks = level in lu.needed_locks

  if level not in locking.LEVELS:
    # All levels have been handled, the LU can be executed
    _VerifyLocks(lu, glm)

    if self._cbs:
      self._cbs.NotifyStart()

    try:
      result = self._ExecLU(lu)
    except AssertionError:
      # The exception object is taken from sys.exc_info(), which is needed
      # for the traceback anyway; this avoids the Python 2-only
      # "except Class, name" syntax. Only the innermost frame is reported
      # to the user, the full traceback goes to the log.
      (_, err, tb) = sys.exc_info()
      err_info = traceback.format_tb(tb)
      del tb
      logging.exception("Detected AssertionError")
      raise errors.OpExecError("Internal assertion error: please report"
                               " this as a bug.\nError message: '%s';"
                               " location:\n%s" % (str(err), err_info[-1]))

  elif adding_locks and acquiring_locks:
    # Adding and acquiring locks at the same level is not supported
    raise NotImplementedError("Can't declare locks to acquire when adding"
                              " others")

  elif adding_locks or acquiring_locks:
    self._CheckLocksEnabled()

    lu.DeclareLocks(level)
    share = lu.share_locks[level]
    opportunistic = lu.opportunistic_locks[level]

    try:
      assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

      if acquiring_locks:
        # Acquiring locks
        needed_locks = lu.needed_locks[level]

        self._AcquireLocks(level, needed_locks, share, opportunistic,
                           calc_timeout())
      else:
        # Adding locks; they are registered for removal before being added
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks

        try:
          glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          logging.exception("Detected lock error in level %s for locks"
                            " %s, shared=%s", level, add_locks, share)
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), most likely because of another"
            " job who added them first" % add_locks,
            errors.ECODE_NOTUNIQUE)

      try:
        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
      finally:
        if level in lu.remove_locks:
          glm.remove(level, lu.remove_locks[level])
    finally:
      # Release whatever is still owned at this level, also on errors
      if glm.is_owned(level):
        glm.release(level)

  else:
    # Nothing to do at this level, continue with the next one
    result = self._LockAndExecLU(lu, level + 1, calc_timeout)

  return result
485
486
def _CheckLUResult(self, op, result):
  """Check the LU result against the contract in the opcode.

  A mismatch is always logged, but only reported as an error if the opcode
  was not executed in dry-run mode.

  @param op: Opcode providing the result check function (C{OP_RESULT})
  @param result: Result returned by the logical unit
  @raise errors.OpResultError: If the result doesn't pass the check and the
    opcode is not in dry-run mode

  """
  resultcheck_fn = op.OP_RESULT

  # No check defined or check passed
  if resultcheck_fn is None or resultcheck_fn(result):
    return

  logging.error("Expected opcode result matching %s, got %s",
                resultcheck_fn, result)

  if not getattr(op, "dry_run", False):
    # The result of a dry-run is not held to the opcode's contract
    raise errors.OpResultError("Opcode result does not match %s: %s" %
                               (resultcheck_fn, utils.Truncate(result, 80)))
501
# NOTE(review): the "def" line is not part of the visible source; the method
# name and the default for "timeout" are reconstructed -- confirm
def ExecOpCode(self, op, cbs, timeout=None):
  """Execute an opcode.

  @type op: an OpCode instance
  @param op: the opcode to be executed
  @type cbs: L{OpExecCbBase}
  @param cbs: Runtime callbacks
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks, None for no timeout
  @return: Result of the logical unit, after checking it against the
    opcode's result contract
  @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time
  @raise errors.ProgrammerError: If C{op} is not an opcode, or if the
    logical unit requires the BGL while locks are disabled
  @raise errors.OpCodeUnknown: If no logical unit is registered for the
    opcode's class

  """
  if not isinstance(op, opcodes.OpCode):
    raise errors.ProgrammerError("Non-opcode instance passed"
                                 " to ExecOpcode (%s)" % type(op))

  lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
  if lu_class is None:
    raise errors.OpCodeUnknown("Unknown opcode")

  if timeout is None:
    calc_timeout = lambda: None
  else:
    # The timeout covers all lock acquisitions together, hence each of them
    # only gets what is left of it
    calc_timeout = utils.RunningTimeout(timeout, False).Remaining

  self._cbs = cbs
  try:
    if self._enable_locks:
      # The cluster-level lock is acquired in shared mode unless the LU
      # declares to require it
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, False, calc_timeout())
    elif lu_class.REQ_BGL:
      raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                   " disabled" % op.OP_ID)

    try:
      lu = lu_class(self, op, self.context, self.rpc)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.context.cfg.DropECReservations(self._ec_id)
    finally:
      # Release the cluster-level lock if it is held
      if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
        assert self._enable_locks
        self.context.glm.release(locking.LEVEL_CLUSTER)
  finally:
    # Callbacks are only valid while the opcode is being executed
    self._cbs = None

  self._CheckLUResult(op, result)

  return result
562
def Log(self, *args):
  """Forward call to feedback callback function.

  Nothing happens if no callbacks are set.

  """
  callbacks = self._cbs
  if not callbacks:
    return

  callbacks.Feedback(*args)
569
def LogStep(self, current, total, message):
  """Log a change in LU execution progress.

  The step is written to the debug log and forwarded to L{Log}.

  @param current: Number of the current step
  @param total: Total number of steps
  @param message: Description of the step

  """
  progress = (current, total, message)

  logging.debug("Step %d/%d %s", *progress)
  self.Log("STEP %d/%d %s" % progress)
576
def LogWarning(self, message, *args, **kwargs):
  """Log a warning to the logs and the user.

  The optional keyword argument is 'hint' and can be used to show a
  hint to the user (presumably related to the warning). If the
  message is empty, it will not be printed at all, allowing one to
  show only a hint.

  @type message: string
  @param message: Warning text; used as a format string if positional
    arguments are given
  @param args: Values to be interpolated into the message

  """
  assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

  # Without arguments the message is used as is, i.e. percent signs in it
  # need no escaping
  if args:
    message = message % tuple(args)

  if message:
    logging.warning(message)
    self.Log(" - WARNING: %s" % message)

  if "hint" in kwargs:
    self.Log(" Hint: %s" % kwargs["hint"])
595
596 - def LogInfo(self, message, *args):
604
# NOTE(review): the "def" line is not part of the visible source; the method
# name is reconstructed -- confirm against the complete file
def GetECId(self):
  """Returns the current execution context ID.

  @return: The execution context identifier passed to the constructor
  @raise errors.ProgrammerError: If no (or an empty) identifier was set

  """
  if not self._ec_id:
    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
  return self._ec_id
613