1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43
44
45 _OP_PREFIX = "Op"
46 _LU_PREFIX = "LU"
47
48
50 """Exception to report timeouts on acquiring locks.
51
52 """
53
54
78
79
81 """Class with lock acquire timeout strategy.
82
83 """
84 __slots__ = [
85 "_timeouts",
86 "_random_fn",
87 "_time_fn",
88 ]
89
90 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
91
def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Prepares the timeout iterator and stores the injectable helpers.

    The iterator walks over the class-level C{_TIMEOUT_PER_ATTEMPT}
    sequence; both callables are kept on the instance unchanged.

    @param _time_fn: Time function, replaceable in unittests
    @param _random_fn: Random number generator, replaceable in unittests

    """
    object.__init__(self)

    # The injected helpers are independent of the timeout table
    self._random_fn = _random_fn
    self._time_fn = _time_fn

    # One iterator per instance, so every strategy object starts from the
    # first configured timeout
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
104
106 """Returns the timeout for the next attempt.
107
108 """
109 try:
110 timeout = self._timeouts.next()
111 except StopIteration:
112
113 timeout = None
114
115 if timeout is not None:
116
117
118 variation_range = timeout * 0.1
119 timeout += ((self._random_fn() * variation_range) -
120 (variation_range * 0.5))
121
122 return timeout
123
124
126 """Base class for OpCode execution callbacks.
127
128 """
130 """Called when we are about to execute the LU.
131
132 This function is called when we're about to start the lu's Exec() method,
133 that is, after we have acquired all locks.
134
135 """
136
138 """Sends feedback from the LU code to the end-user.
139
140 """
141
143 """Check whether job has been cancelled.
144
145 """
146
148 """Submits jobs for processing.
149
150 See L{jqueue.JobQueue.SubmitManyJobs}.
151
152 """
153 raise NotImplementedError
154
155
157 """Computes the LU name for a given OpCode name.
158
159 """
160 assert opname.startswith(_OP_PREFIX), \
161 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
162
163 return _LU_PREFIX + opname[len(_OP_PREFIX):]
164
165
173
174
176 """Object which runs OpCodes"""
177 DISPATCH_TABLE = _ComputeDispatchTable()
178
180 """Constructor for Processor
181
182 @type context: GanetiContext
183 @param context: global Ganeti context
184 @type ec_id: string
185 @param ec_id: execution context identifier
186
187 """
188 self.context = context
189 self._ec_id = ec_id
190 self._cbs = None
191 self.rpc = rpc.RpcRunner(context.cfg)
192 self.hmclass = HooksMaster
193
def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Requests locks from the Ganeti lock manager.

    If runtime callbacks are registered, their C{CheckCancel} hook is
    invoked before the lock manager is asked for anything.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @param priority: Priority handed through to the lock manager
    @return: The (non-None) value returned by the lock manager's C{acquire}
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    callbacks = self._cbs
    if callbacks:
        callbacks.CheckCancel()

    lock_manager = self.context.glm
    granted = lock_manager.acquire(level, names, shared=shared,
                                   timeout=timeout, priority=priority)

    # A None result is reported to the caller as a timeout
    if granted is not None:
        return granted

    raise LockAcquireTimeout()
219
239
241 """Logical Unit execution sequence.
242
243 """
244 write_count = self.context.cfg.write_count
245 lu.CheckPrereq()
246 hm = HooksMaster(self.rpc.call_hooks_runner, lu)
247 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
248 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
249 self.Log, None)
250
251 if getattr(lu.op, "dry_run", False):
252
253
254
255 self.LogInfo("dry-run mode requested, not actually executing"
256 " the operation")
257 return lu.dry_run_result
258
259 try:
260 result = self._ProcessResult(lu.Exec(self.Log))
261 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
262 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
263 self.Log, result)
264 finally:
265
266 if write_count != self.context.cfg.write_count:
267 hm.RunConfigUpdate()
268
269 return result
270
272 """Execute a Logical Unit, with the needed locks.
273
274 This is a recursive function that starts locking the given level, and
275 proceeds up, till there are no more locks to acquire. Then it executes the
276 given LU and its opcodes.
277
278 """
279 adding_locks = level in lu.add_locks
280 acquiring_locks = level in lu.needed_locks
281 if level not in locking.LEVELS:
282 if self._cbs:
283 self._cbs.NotifyStart()
284
285 result = self._ExecLU(lu)
286
287 elif adding_locks and acquiring_locks:
288
289
290 raise NotImplementedError("Can't declare locks to acquire when adding"
291 " others")
292
293 elif adding_locks or acquiring_locks:
294 lu.DeclareLocks(level)
295 share = lu.share_locks[level]
296
297 try:
298 assert adding_locks ^ acquiring_locks, \
299 "Locks must be either added or acquired"
300
301 if acquiring_locks:
302
303 needed_locks = lu.needed_locks[level]
304
305 self._AcquireLocks(level, needed_locks, share,
306 calc_timeout(), priority)
307 else:
308
309 add_locks = lu.add_locks[level]
310 lu.remove_locks[level] = add_locks
311
312 try:
313 self.context.glm.add(level, add_locks, acquired=1, shared=share)
314 except errors.LockError:
315 raise errors.OpPrereqError(
316 "Couldn't add locks (%s), probably because of a race condition"
317 " with another job, who added them first" % add_locks,
318 errors.ECODE_FAULT)
319
320 try:
321 result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
322 finally:
323 if level in lu.remove_locks:
324 self.context.glm.remove(level, lu.remove_locks[level])
325 finally:
326 if self.context.glm.is_owned(level):
327 self.context.glm.release(level)
328
329 else:
330 result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
331
332 return result
333
def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Runs a single opcode through its logical unit.

    The opcode class is looked up in C{DISPATCH_TABLE}, the cluster-level
    lock is taken, the LU is instantiated and executed with its locks, and
    finally the result is validated against the opcode's C{OP_RESULT} check
    (if one is set).

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @return: the result produced by the LU execution
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time
    @raise errors.ProgrammerError: if C{op} is not an opcode instance
    @raise errors.OpCodeUnknown: if no LU is registered for the opcode class
    @raise errors.OpResultError: if the result fails the C{OP_RESULT} check

    """
    if not isinstance(op, opcodes.OpCode):
        raise errors.ProgrammerError("Non-opcode instance passed"
                                     " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__)
    if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is not None:
        # Remaining() is re-evaluated for every lock acquisition
        calc_timeout = utils.RunningTimeout(timeout, False).Remaining
    else:
        def calc_timeout():
            return None

    self._cbs = cbs
    try:
        # The cluster-level lock (locking.BGL) is requested in shared mode
        # unless the LU class sets REQ_BGL
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(),
                           priority)
        try:
            lu = lu_class(self, op, self.context, self.rpc)
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"

            try:
                result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                             calc_timeout, priority)
            finally:
                # Reservations are dropped whether or not the LU succeeded
                if self._ec_id:
                    self.context.cfg.DropECReservations(self._ec_id)
        finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
        # Callbacks are only valid for the duration of this opcode
        self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if resultcheck_fn is not None and not resultcheck_fn(result):
        logging.error("Expected opcode result matching %s, got %s",
                      resultcheck_fn, result)
        raise errors.OpResultError("Opcode result does not match %s" %
                                   resultcheck_fn)

    return result
394
def Log(self, *args):
    """Passes the given arguments on to the feedback callback, if any.

    Nothing happens when no callbacks are currently registered.

    """
    if not self._cbs:
        return

    self._cbs.Feedback(*args)
401
def LogStep(self, current, total, message):
    """Reports LU execution progress to the debug log and to the user.

    @param current: number of the step being started
    @param total: total number of steps
    @param message: description of the step

    """
    progress = (current, total, message)

    logging.debug("Step %d/%d %s", *progress)
    self.Log("STEP %d/%d %s" % progress)
408
410 """Log a warning to the logs and the user.
411
412 The optional keyword argument is 'hint' and can be used to show a
413 hint to the user (presumably related to the warning). If the
414 message is empty, it will not be printed at all, allowing one to
415 show only a hint.
416
417 """
418 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
419 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
420 if args:
421 message = message % tuple(args)
422 if message:
423 logging.warning(message)
424 self.Log(" - WARNING: %s" % message)
425 if "hint" in kwargs:
426 self.Log(" Hint: %s" % kwargs["hint"])
427
428 - def LogInfo(self, message, *args):
436
438 """Returns the current execution context ID.
439
440 """
441 if not self._ec_id:
442 raise errors.ProgrammerError("Tried to use execution context id when"
443 " not set")
444 return self._ec_id
445
446
448 """Hooks master.
449
450 This class distributes the run commands to the nodes based on the
451 specific LU class.
452
453 In order to remove the direct dependency on the rpc module, the
454 constructor needs a function which actually does the remote
455 call. This will usually be rpc.call_hooks_runner, but any function
456 which behaves the same works.
457
458 """
460 self.callfn = callfn
461 self.lu = lu
462 self.op = lu.op
463 self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
464
465 if self.lu.HPATH is None:
466 nodes = (None, None)
467 else:
468 nodes = map(frozenset, self.lu.BuildHooksNodes())
469
470 (self.pre_nodes, self.post_nodes) = nodes
471
473 """Compute the environment and the target nodes.
474
475 Based on the opcode and the current node list, this builds the
476 environment for the hooks and the target node list for the run.
477
478 """
479 if phase == constants.HOOKS_PHASE_PRE:
480 prefix = "GANETI_"
481 elif phase == constants.HOOKS_PHASE_POST:
482 prefix = "GANETI_POST_"
483 else:
484 raise AssertionError("Unknown phase '%s'" % phase)
485
486 env = {}
487
488 if self.lu.HPATH is not None:
489 lu_env = self.lu.BuildHooksEnv()
490 if lu_env:
491 assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
492 env.update(("%s%s" % (prefix, key), value)
493 for (key, value) in lu_env.items())
494
495 if phase == constants.HOOKS_PHASE_PRE:
496 assert compat.all((key.startswith("GANETI_") and
497 not key.startswith("GANETI_POST_"))
498 for key in env)
499
500 elif phase == constants.HOOKS_PHASE_POST:
501 assert compat.all(key.startswith("GANETI_POST_") for key in env)
502 assert isinstance(self.pre_env, dict)
503
504
505 assert not compat.any(key.startswith("GANETI_POST_")
506 for key in self.pre_env)
507 env.update(self.pre_env)
508 else:
509 raise AssertionError("Unknown phase '%s'" % phase)
510
511 return env
512
def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call: it builds
    the base hooks environment, merges the phase-specific variables into it
    and converts all keys and values to strings.

    @param node_list: nodes, passed through to C{self.callfn}
    @param hpath: hooks path, also exported as C{GANETI_HOOKS_PATH}
    @param phase: hooks phase, also exported as C{GANETI_HOOKS_PHASE}
    @param phase_env: phase-specific variables to merge in; skipped when
        empty or None
    @return: whatever C{self.callfn} returns

    """
    cfg = self.lu.cfg

    env = {
        "PATH": constants.HOOKS_PATH,
        "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
        "GANETI_OP_CODE": self.op.OP_ID,
        "GANETI_DATA_DIR": constants.DATA_DIR,
        "GANETI_HOOKS_PHASE": phase,
        "GANETI_HOOKS_PATH": hpath,
    }

    if self.lu.HTYPE:
        env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE

    if cfg is not None:
        env["GANETI_CLUSTER"] = cfg.GetClusterName()
        env["GANETI_MASTER"] = cfg.GetMasterNode()

    if phase_env:
        assert not (set(env) & set(phase_env)), "Environment variables conflict"
        env.update(phase_env)

    # Convert everything to strings. items() is used instead of the
    # Python-2-only iteritems() so the conversion works on both major
    # versions; the generator expression avoids an intermediate list.
    env = dict((str(key), str(val)) for (key, val) in env.items())

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.callfn(node_list, hpath, phase, env)
548
550 """Run all the scripts for a phase.
551
552 This is the main function of the HooksMaster.
553
554 @param phase: one of L{constants.HOOKS_PHASE_POST} or
555 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
556 @param nodes: overrides the predefined list of nodes for the given phase
557 @return: the processed results of the hooks multi-node rpc call
558 @raise errors.HooksFailure: on communication failure to the nodes
559 @raise errors.HooksAbort: on failure of one of the hooks
560
561 """
562 if phase == constants.HOOKS_PHASE_PRE:
563 if nodes is None:
564 nodes = self.pre_nodes
565 env = self.pre_env
566 elif phase == constants.HOOKS_PHASE_POST:
567 if nodes is None:
568 nodes = self.post_nodes
569 env = self._BuildEnv(phase)
570 else:
571 raise AssertionError("Unknown phase '%s'" % phase)
572
573 if not nodes:
574
575
576
577 return
578
579 results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
580 if not results:
581 msg = "Communication Failure"
582 if phase == constants.HOOKS_PHASE_PRE:
583 raise errors.HooksFailure(msg)
584 else:
585 self.lu.LogWarning(msg)
586 return results
587
588 errs = []
589 for node_name in results:
590 res = results[node_name]
591 if res.offline:
592 continue
593
594 msg = res.fail_msg
595 if msg:
596 self.lu.LogWarning("Communication failure to node %s: %s",
597 node_name, msg)
598 continue
599
600 for script, hkr, output in res.payload:
601 if hkr == constants.HKR_FAIL:
602 if phase == constants.HOOKS_PHASE_PRE:
603 errs.append((node_name, script, output))
604 else:
605 if not output:
606 output = "(no output)"
607 self.lu.LogWarning("On %s script %s failed, output: %s" %
608 (node_name, script, output))
609
610 if errs and phase == constants.HOOKS_PHASE_PRE:
611 raise errors.HooksAbort(errs)
612
613 return results
614
626