1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42
43
# Prefix that every opcode name is expected to start with
44 _OP_PREFIX = "Op"
# Prefix put in place of _OP_PREFIX when deriving a logical unit name
45 _LU_PREFIX = "LU"
46
47
49 """Exception to report timeouts on acquiring locks.
50
51 """
52
53
77
78
80 """Class with lock acquire timeout strategy.
81
82 """
83 __slots__ = [
84 "_timeouts",
85 "_random_fn",
86 "_time_fn",
87 ]
88
89 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
90
def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Prepares the timeout iterator and stores the injectable helpers.

    @param _time_fn: Time function, replaceable for unittests
    @param _random_fn: Random number generator, replaceable for unittests

    """
    object.__init__(self)

    # The injected callables are independent of the iterator created below
    self._random_fn = _random_fn
    self._time_fn = _time_fn

    # Iterator over the class-level per-attempt timeouts
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
103
105 """Returns the timeout for the next attempt.
106
107 """
108 try:
109 timeout = self._timeouts.next()
110 except StopIteration:
111
112 timeout = None
113
114 if timeout is not None:
115
116
117 variation_range = timeout * 0.1
118 timeout += ((self._random_fn() * variation_range) -
119 (variation_range * 0.5))
120
121 return timeout
122
123
125 """Base class for OpCode execution callbacks.
126
127 """
129 """Called when we are about to execute the LU.
130
131 This function is called when we're about to start the lu's Exec() method,
132 that is, after we have acquired all locks.
133
134 """
135
137 """Sends feedback from the LU code to the end-user.
138
139 """
140
142 """Check whether job has been cancelled.
143
144 """
145
146
148 """Computes the LU name for a given OpCode name.
149
150 """
151 assert opname.startswith(_OP_PREFIX), \
152 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
153
154 return _LU_PREFIX + opname[len(_OP_PREFIX):]
155
156
164
165
167 """Object which runs OpCodes"""
168 DISPATCH_TABLE = _ComputeDispatchTable()
169
171 """Constructor for Processor
172
173 @type context: GanetiContext
174 @param context: global Ganeti context
175 @type ec_id: string
176 @param ec_id: execution context identifier
177
178 """
179 self.context = context
180 self._ec_id = ec_id
181 self._cbs = None
182 self.rpc = rpc.RpcRunner(context.cfg)
183 self.hmclass = HooksMaster
184
185 - def _AcquireLocks(self, level, names, shared, timeout, priority):
186 """Acquires locks via the Ganeti lock manager.
187
188 @type level: int
189 @param level: Lock level
190 @type names: list or string
191 @param names: Lock names
192 @type shared: bool
193 @param shared: Whether the locks should be acquired in shared mode
194 @type timeout: None or float
195 @param timeout: Timeout for acquiring the locks
196 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
197 amount of time
198
199 """
200 if self._cbs:
201 self._cbs.CheckCancel()
202
203 acquired = self.context.glm.acquire(level, names, shared=shared,
204 timeout=timeout, priority=priority)
205
206 if acquired is None:
207 raise LockAcquireTimeout()
208
209 return acquired
210
212 """Logical Unit execution sequence.
213
214 """
215 write_count = self.context.cfg.write_count
216 lu.CheckPrereq()
217 hm = HooksMaster(self.rpc.call_hooks_runner, lu)
218 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
219 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
220 self.Log, None)
221
222 if getattr(lu.op, "dry_run", False):
223
224
225
226 self.LogInfo("dry-run mode requested, not actually executing"
227 " the operation")
228 return lu.dry_run_result
229
230 try:
231 result = lu.Exec(self.Log)
232 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
233 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
234 self.Log, result)
235 finally:
236
237 if write_count != self.context.cfg.write_count:
238 hm.RunConfigUpdate()
239
240 return result
241
243 """Execute a Logical Unit, with the needed locks.
244
245 This is a recursive function that starts locking the given level, and
246 proceeds up, till there are no more locks to acquire. Then it executes the
247 given LU and its opcodes.
248
249 """
250 adding_locks = level in lu.add_locks
251 acquiring_locks = level in lu.needed_locks
252 if level not in locking.LEVELS:
253 if self._cbs:
254 self._cbs.NotifyStart()
255
256 result = self._ExecLU(lu)
257
258 elif adding_locks and acquiring_locks:
259
260
261 raise NotImplementedError("Can't declare locks to acquire when adding"
262 " others")
263
264 elif adding_locks or acquiring_locks:
265 lu.DeclareLocks(level)
266 share = lu.share_locks[level]
267
268 try:
269 assert adding_locks ^ acquiring_locks, \
270 "Locks must be either added or acquired"
271
272 if acquiring_locks:
273
274 needed_locks = lu.needed_locks[level]
275
276 acquired = self._AcquireLocks(level, needed_locks, share,
277 calc_timeout(), priority)
278 else:
279
280 add_locks = lu.add_locks[level]
281 lu.remove_locks[level] = add_locks
282
283 try:
284 self.context.glm.add(level, add_locks, acquired=1, shared=share)
285 except errors.LockError:
286 raise errors.OpPrereqError(
287 "Couldn't add locks (%s), probably because of a race condition"
288 " with another job, who added them first" % add_locks,
289 errors.ECODE_FAULT)
290
291 acquired = add_locks
292
293 try:
294 lu.acquired_locks[level] = acquired
295
296 result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
297 finally:
298 if level in lu.remove_locks:
299 self.context.glm.remove(level, lu.remove_locks[level])
300 finally:
301 if self.context.glm.is_owned(level):
302 self.context.glm.release(level)
303
304 else:
305 result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
306
307 return result
308
def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Runs one opcode through the logical unit registered for its class.

    The callbacks object is installed on the processor for the duration of
    the call and is always cleared again afterwards.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @return: the value returned by the locked execution of the logical unit
    @raise errors.ProgrammerError: if C{op} is not an OpCode instance
    @raise errors.OpCodeUnknown: if the dispatch table has no entry for the
        opcode's class
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
        raise errors.ProgrammerError("Non-opcode instance passed"
                                     " to ExecOpcode")

    unit_cls = self.DISPATCH_TABLE.get(op.__class__)
    if unit_cls is None:
        raise errors.OpCodeUnknown("Unknown opcode")

    # With no overall timeout every lock request is issued with a timeout
    # of None; otherwise each request asks the running timeout for its
    # current remaining value
    if timeout is not None:
        remaining = utils.RunningTimeout(timeout, False).Remaining
    else:
        remaining = lambda: None

    self._cbs = cbs
    try:
        # The cluster-level lock is requested in shared mode unless the
        # logical unit class sets REQ_BGL
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not unit_cls.REQ_BGL, remaining(), priority)
        try:
            unit = unit_cls(self, op, self.context, self.rpc)
            unit.ExpandNames()
            assert unit.needed_locks is not None, "needed_locks not set by LU"

            try:
                return self._LockAndExecLU(unit, locking.LEVEL_INSTANCE,
                                           remaining, priority)
            finally:
                # Reservations are only dropped when an execution context
                # id was configured on this processor
                ec_id = self._ec_id
                if ec_id:
                    self.context.cfg.DropECReservations(ec_id)
        finally:
            # Runs only once the cluster-level lock has been acquired
            self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
        self._cbs = None
360
def Log(self, *args):
    """Relays the given arguments to the feedback callback.

    Nothing happens when no callbacks object is currently set.

    """
    callbacks = self._cbs
    if not callbacks:
        return
    callbacks.Feedback(*args)
367
def LogStep(self, current, total, message):
    """Reports progress of the LU execution.

    The step is written to the debug log and also sent through L{Log}.

    """
    progress = (current, total, message)
    logging.debug("Step %d/%d %s", *progress)
    self.Log("STEP %d/%d %s" % progress)
374
376 """Log a warning to the logs and the user.
377
378 The optional keyword argument is 'hint' and can be used to show a
379 hint to the user (presumably related to the warning). If the
380 message is empty, it will not be printed at all, allowing one to
381 show only a hint.
382
383 """
384 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
385 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
386 if args:
387 message = message % tuple(args)
388 if message:
389 logging.warning(message)
390 self.Log(" - WARNING: %s" % message)
391 if "hint" in kwargs:
392 self.Log(" Hint: %s" % kwargs["hint"])
393
394 - def LogInfo(self, message, *args):
402
404 """Returns the current execution context ID.
405
406 """
407 if not self._ec_id:
408 raise errors.ProgrammerError("Tried to use execution context id when"
409 " not set")
410 return self._ec_id
411
412
414 """Hooks master.
415
416 This class distributes the run commands to the nodes based on the
417 specific LU class.
418
419 In order to remove the direct dependency on the rpc module, the
420 constructor needs a function which actually does the remote
421 call. This will usually be rpc.call_hooks_runner, but any function
422 which behaves the same works.
423
424 """
432
434 """Compute the environment and the target nodes.
435
436 Based on the opcode and the current node list, this builds the
437 environment for the hooks and the target node list for the run.
438
439 """
440 env = {
441 "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
442 "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
443 "GANETI_OP_CODE": self.op.OP_ID,
444 "GANETI_OBJECT_TYPE": self.lu.HTYPE,
445 "GANETI_DATA_DIR": constants.DATA_DIR,
446 }
447
448 if self.lu.HPATH is not None:
449 lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
450 if lu_env:
451 for key in lu_env:
452 env["GANETI_" + key] = lu_env[key]
453 else:
454 lu_nodes_pre = lu_nodes_post = []
455
456 return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
457
459 """Simple wrapper over self.callfn.
460
461 This method fixes the environment before doing the rpc call.
462
463 """
464 env = self.env.copy()
465 env["GANETI_HOOKS_PHASE"] = phase
466 env["GANETI_HOOKS_PATH"] = hpath
467 if self.lu.cfg is not None:
468 env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
469 env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
470
471 env = dict([(str(key), str(val)) for key, val in env.iteritems()])
472
473 return self.callfn(node_list, hpath, phase, env)
474
476 """Run all the scripts for a phase.
477
478 This is the main function of the HooksMaster.
479
480 @param phase: one of L{constants.HOOKS_PHASE_POST} or
481 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
482 @param nodes: overrides the predefined list of nodes for the given phase
483 @return: the processed results of the hooks multi-node rpc call
484 @raise errors.HooksFailure: on communication failure to the nodes
485 @raise errors.HooksAbort: on failure of one of the hooks
486
487 """
488 if not self.node_list[phase] and not nodes:
489
490
491
492 return
493 hpath = self.lu.HPATH
494 if nodes is not None:
495 results = self._RunWrapper(nodes, hpath, phase)
496 else:
497 results = self._RunWrapper(self.node_list[phase], hpath, phase)
498 errs = []
499 if not results:
500 msg = "Communication Failure"
501 if phase == constants.HOOKS_PHASE_PRE:
502 raise errors.HooksFailure(msg)
503 else:
504 self.lu.LogWarning(msg)
505 return results
506 for node_name in results:
507 res = results[node_name]
508 if res.offline:
509 continue
510 msg = res.fail_msg
511 if msg:
512 self.lu.LogWarning("Communication failure to node %s: %s",
513 node_name, msg)
514 continue
515 for script, hkr, output in res.payload:
516 if hkr == constants.HKR_FAIL:
517 if phase == constants.HOOKS_PHASE_PRE:
518 errs.append((node_name, script, output))
519 else:
520 if not output:
521 output = "(no output)"
522 self.lu.LogWarning("On %s script %s failed, output: %s" %
523 (node_name, script, output))
524 if errs and phase == constants.HOOKS_PHASE_PRE:
525 raise errors.HooksAbort(errs)
526 return results
527
539