1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
30
31 import sys
32 import logging
33 import random
34 import time
35 import itertools
36 import traceback
37
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import hooksmaster
42 from ganeti import cmdlib
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import compat
46
47
#: Prefix of opcode class names
_OP_PREFIX = "Op"
#: Prefix of logical unit class names
_LU_PREFIX = "LU"

#: LU classes which are expected not to own the node allocation lock
#: (L{locking.NAL}) even though they declare node or node resource locks.
#: NOTE(review): presumably the default C{_nal_whitelist} of the lock
#: consistency checks in this module -- confirm.
_NODE_ALLOC_WHITELIST = frozenset()

#: LU classes which acquire the node allocation lock in a different mode
#: (shared/exclusive) than their node or node resource locks.
#: NOTE(review): presumably the default C{_mode_whitelist} of the lock
#: consistency checks in this module -- confirm.
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
63
64
66 """Exception to report timeouts on acquiring locks.
67
68 """
69
70
94
95
97 """Class with lock acquire timeout strategy.
98
99 """
100 __slots__ = [
101 "_timeouts",
102 "_random_fn",
103 "_time_fn",
104 ]
105
106 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
107
108 - def __init__(self, _time_fn=time.time, _random_fn=random.random):
109 """Initializes this class.
110
111 @param _time_fn: Time function for unittests
112 @param _random_fn: Random number generator for unittests
113
114 """
115 object.__init__(self)
116
117 self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
118 self._time_fn = _time_fn
119 self._random_fn = _random_fn
120
122 """Returns the timeout for the next attempt.
123
124 """
125 try:
126 timeout = self._timeouts.next()
127 except StopIteration:
128
129 timeout = None
130
131 if timeout is not None:
132
133
134 variation_range = timeout * 0.1
135 timeout += ((self._random_fn() * variation_range) -
136 (variation_range * 0.5))
137
138 return timeout
139
140
142 """Base class for OpCode execution callbacks.
143
144 """
146 """Called when we are about to execute the LU.
147
148 This function is called when we're about to start the lu's Exec() method,
149 that is, after we have acquired all locks.
150
151 """
152
154 """Sends feedback from the LU code to the end-user.
155
156 """
157
159 """Returns current priority or C{None}.
160
161 """
162 return None
163
165 """Submits jobs for processing.
166
167 See L{jqueue.JobQueue.SubmitManyJobs}.
168
169 """
170 raise NotImplementedError
171
172
174 """Computes the LU name for a given OpCode name.
175
176 """
177 assert opname.startswith(_OP_PREFIX), \
178 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
179
180 return _LU_PREFIX + opname[len(_OP_PREFIX):]
181
182
190
191
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Only inherit the priority if the destination does not set its own
  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  # Write through the same attribute name that is checked, rather than a
  # hard-coded "comment", so the check and the assignment cannot diverge
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    setattr(dst, opcodes.COMMENT_ATTR, defcomment)
212
213
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: when the LU
  returned a L{cmdlib.ResultWithJobs}, the contained jobs are submitted via
  C{submit_fn} and the submission result is stored under
  L{constants.JOB_IDS_KEY} in the returned value.

  @param submit_fn: Function used to submit the jobs; called with the list
    of jobs (see L{OpExecCbBase.SubmitManyJobs})
  @param op: Opcode which produced C{result}; its base parameters are
    copied to every submitted opcode
  @param result: Value returned by the LU's C{Exec} method
  @return: C{result} unchanged, or for L{cmdlib.ResultWithJobs} its
    C{other} value extended with the job submission result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (debug level, priority, comment) to every opcode
    # of every job. An explicit loop is used because map() is lazy on
    # Python 3 and would silently skip this side effect.
    comment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain.from_iterable(result.jobs):
      _SetBaseOpParams(op, comment, job_op)

    job_submission = submit_fn(result.jobs)

    # The LU's additional return values become the opcode result
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
238
239
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job submission function when opcodes are processed without
  callbacks, where submitting jobs is not supported.

  @raise errors.ProgrammerError: Always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
246
247
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  All checks are assertions and are skipped entirely when Python runs with
  optimizations enabled (C{__debug__} is false).

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes which must use a different mode for the
    node allocation lock than for node/node resource locks
  @param _nal_whitelist: LU classes which must not own the node allocation
    lock

  """
  if not __debug__:
    return

  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # Only levels for which the LU declared locks are checked
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      # Compare the modes as booleans in both branches so that equivalent
      # values (e.g. 1 and True) are treated as the same mode
      if lu.__class__ in _mode_whitelist:
        assert bool(share_node_alloc) != bool(share_level), \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
284
285
287 """Object which runs OpCodes"""
288 DISPATCH_TABLE = _ComputeDispatchTable()
289
290 - def __init__(self, context, ec_id, enable_locks=True):
291 """Constructor for Processor
292
293 @type context: GanetiContext
294 @param context: global Ganeti context
295 @type ec_id: string
296 @param ec_id: execution context identifier
297
298 """
299 self.context = context
300 self._ec_id = ec_id
301 self._cbs = None
302 self.rpc = context.rpc
303 self.hmclass = hooksmaster.HooksMaster
304 self._enable_locks = enable_locks
305
307 """Checks if locking is enabled.
308
309 @raise errors.ProgrammerError: In case locking is not enabled
310
311 """
312 if not self._enable_locks:
313 raise errors.ProgrammerError("Attempted to use disabled locks")
314
315 - def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
316 """Acquires locks via the Ganeti lock manager.
317
318 @type level: int
319 @param level: Lock level
320 @type names: list or string
321 @param names: Lock names
322 @type shared: bool
323 @param shared: Whether the locks should be acquired in shared mode
324 @type opportunistic: bool
325 @param opportunistic: Whether to acquire opportunistically
326 @type timeout: None or float
327 @param timeout: Timeout for acquiring the locks
328 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
329 amount of time
330
331 """
332 self._CheckLocksEnabled()
333
334 if self._cbs:
335 priority = self._cbs.CurrentPriority()
336 else:
337 priority = None
338
339 acquired = self.context.glm.acquire(level, names, shared=shared,
340 timeout=timeout, priority=priority,
341 opportunistic=opportunistic)
342
343 if acquired is None:
344 raise LockAcquireTimeout()
345
346 return acquired
347
349 """Logical Unit execution sequence.
350
351 """
352 write_count = self.context.cfg.write_count
353 lu.CheckPrereq()
354
355 hm = self.BuildHooksManager(lu)
356 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
357 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
358 self.Log, None)
359
360 if getattr(lu.op, "dry_run", False):
361
362
363
364 self.LogInfo("dry-run mode requested, not actually executing"
365 " the operation")
366 return lu.dry_run_result
367
368 if self._cbs:
369 submit_mj_fn = self._cbs.SubmitManyJobs
370 else:
371 submit_mj_fn = _FailingSubmitManyJobs
372
373 try:
374 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
375 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
376 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
377 self.Log, result)
378 finally:
379
380 if write_count != self.context.cfg.write_count:
381 hm.RunConfigUpdate()
382
383 return result
384
387
389 """Execute a Logical Unit, with the needed locks.
390
391 This is a recursive function that starts locking the given level, and
392 proceeds up, till there are no more locks to acquire. Then it executes the
393 given LU and its opcodes.
394
395 """
396 glm = self.context.glm
397 adding_locks = level in lu.add_locks
398 acquiring_locks = level in lu.needed_locks
399
400 if level not in locking.LEVELS:
401 _VerifyLocks(lu, glm)
402
403 if self._cbs:
404 self._cbs.NotifyStart()
405
406 try:
407 result = self._ExecLU(lu)
408 except AssertionError, err:
409
410
411
412 (_, _, tb) = sys.exc_info()
413 err_info = traceback.format_tb(tb)
414 del tb
415 logging.exception("Detected AssertionError")
416 raise errors.OpExecError("Internal assertion error: please report"
417 " this as a bug.\nError message: '%s';"
418 " location:\n%s" % (str(err), err_info[-1]))
419
420 elif adding_locks and acquiring_locks:
421
422
423 raise NotImplementedError("Can't declare locks to acquire when adding"
424 " others")
425
426 elif adding_locks or acquiring_locks:
427 self._CheckLocksEnabled()
428
429 lu.DeclareLocks(level)
430 share = lu.share_locks[level]
431 opportunistic = lu.opportunistic_locks[level]
432
433 try:
434 assert adding_locks ^ acquiring_locks, \
435 "Locks must be either added or acquired"
436
437 if acquiring_locks:
438
439 needed_locks = lu.needed_locks[level]
440
441 self._AcquireLocks(level, needed_locks, share, opportunistic,
442 calc_timeout())
443 else:
444
445 add_locks = lu.add_locks[level]
446 lu.remove_locks[level] = add_locks
447
448 try:
449 glm.add(level, add_locks, acquired=1, shared=share)
450 except errors.LockError:
451 logging.exception("Detected lock error in level %s for locks"
452 " %s, shared=%s", level, add_locks, share)
453 raise errors.OpPrereqError(
454 "Couldn't add locks (%s), most likely because of another"
455 " job who added them first" % add_locks,
456 errors.ECODE_NOTUNIQUE)
457
458 try:
459 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
460 finally:
461 if level in lu.remove_locks:
462 glm.remove(level, lu.remove_locks[level])
463 finally:
464 if glm.is_owned(level):
465 glm.release(level)
466
467 else:
468 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
469
470 return result
471
473 """Execute an opcode.
474
475 @type op: an OpCode instance
476 @param op: the opcode to be executed
477 @type cbs: L{OpExecCbBase}
478 @param cbs: Runtime callbacks
479 @type timeout: float or None
480 @param timeout: Maximum time to acquire all locks, None for no timeout
481 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
482 amount of time
483
484 """
485 if not isinstance(op, opcodes.OpCode):
486 raise errors.ProgrammerError("Non-opcode instance passed"
487 " to ExecOpcode (%s)" % type(op))
488
489 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
490 if lu_class is None:
491 raise errors.OpCodeUnknown("Unknown opcode")
492
493 if timeout is None:
494 calc_timeout = lambda: None
495 else:
496 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
497
498 self._cbs = cbs
499 try:
500 if self._enable_locks:
501
502
503
504 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
505 not lu_class.REQ_BGL, False, calc_timeout())
506 elif lu_class.REQ_BGL:
507 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
508 " disabled" % op.OP_ID)
509
510 try:
511 lu = lu_class(self, op, self.context, self.rpc)
512 lu.ExpandNames()
513 assert lu.needed_locks is not None, "needed_locks not set by LU"
514
515 try:
516 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
517 calc_timeout)
518 finally:
519 if self._ec_id:
520 self.context.cfg.DropECReservations(self._ec_id)
521 finally:
522
523 if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
524 assert self._enable_locks
525 self.context.glm.release(locking.LEVEL_CLUSTER)
526 finally:
527 self._cbs = None
528
529 resultcheck_fn = op.OP_RESULT
530 if not (resultcheck_fn is None or resultcheck_fn(result)):
531 logging.error("Expected opcode result matching %s, got %s",
532 resultcheck_fn, result)
533 if not getattr(op, "dry_run", False):
534
535
536
537 raise errors.OpResultError("Opcode result does not match %s: %s" %
538 (resultcheck_fn, utils.Truncate(result, 80)))
539
540 return result
541
542 - def Log(self, *args):
543 """Forward call to feedback callback function.
544
545 """
546 if self._cbs:
547 self._cbs.Feedback(*args)
548
549 - def LogStep(self, current, total, message):
550 """Log a change in LU execution progress.
551
552 """
553 logging.debug("Step %d/%d %s", current, total, message)
554 self.Log("STEP %d/%d %s" % (current, total, message))
555
557 """Log a warning to the logs and the user.
558
559 The optional keyword argument is 'hint' and can be used to show a
560 hint to the user (presumably related to the warning). If the
561 message is empty, it will not be printed at all, allowing one to
562 show only a hint.
563
564 """
565 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
566 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
567 if args:
568 message = message % tuple(args)
569 if message:
570 logging.warning(message)
571 self.Log(" - WARNING: %s" % message)
572 if "hint" in kwargs:
573 self.Log(" Hint: %s" % kwargs["hint"])
574
575 - def LogInfo(self, message, *args):
583
585 """Returns the current execution context ID.
586
587 """
588 if not self._ec_id:
589 raise errors.ProgrammerError("Tried to use execution context id when"
590 " not set")
591 return self._ec_id
592