1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41
42
class LockAcquireTimeout(Exception):
    """Exception to report timeouts on acquiring locks.

    Raised by the processor when the lock manager could not grant the
    requested locks within the given timeout.

    """
47
48
def _CalculateLockAttemptTimeouts():
    """Calculate timeouts for lock attempts.

    Starting at 1 second, each timeout is derived from the previous one as
    C{(previous * 1.05) ** 1.25} and clamped to the range [0.1, 10.0]
    seconds. Timeouts are appended until their total reaches at least 150
    seconds.

    @rtype: list of float
    @return: the per-attempt timeouts, in the order they are to be used

    """
    result = [1.0]

    # Keep adding attempts until together they cover at least 150 seconds.
    while sum(result) < 150.0:
        timeout = (result[-1] * 1.05) ** 1.25

        # Clamp every attempt into [0.1, 10.0] seconds.
        if timeout > 10.0:
            timeout = 10.0
        elif timeout < 0.1:
            timeout = 0.1

        result.append(timeout)

    return result
72
73
class LockAttemptTimeoutStrategy(object):
    """Class with lock acquire timeout strategy.

    Each instance walks through the precomputed per-attempt timeouts from
    L{_CalculateLockAttemptTimeouts}, one per call to L{NextAttempt}, adding
    a small random variation to each. Once they are used up it returns
    C{None}.

    """
    __slots__ = [
        "_timeouts",
        "_random_fn",
        "_time_fn",
    ]

    # Computed once when the class is defined and shared by all instances;
    # every instance iterates over it independently (see __init__).
    _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

    def __init__(self, _time_fn=time.time, _random_fn=random.random):
        """Initializes this class.

        @param _time_fn: Time function for unittests (stored, but not used
            by this class's methods)
        @param _random_fn: Random number generator for unittests

        """
        object.__init__(self)

        self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
        self._time_fn = _time_fn
        self._random_fn = _random_fn

    def NextAttempt(self):
        """Returns the timeout for the next attempt.

        @rtype: float or None
        @return: the next precomputed timeout with a random variation
            applied, or C{None} (no timeout) once all precomputed attempts
            have been handed out

        """
        try:
            timeout = self._timeouts.next()
        except StopIteration:
            # All attempts used up.
            timeout = None

        if timeout is not None:
            # Shift the timeout by a random amount within a window of 10% of
            # its value, centred on it; with the default random.random this
            # is between -5% and +5%.
            variation_range = timeout * 0.1
            timeout += ((self._random_fn() * variation_range) -
                        (variation_range * 0.5))

        return timeout
117
118
class OpExecCbBase:
    """Base class for OpCode execution callbacks.

    Every callback defined here does nothing and returns C{None}.

    """
    def NotifyStart(self):
        """Called when we are about to execute the LU.

        This function is called when we're about to start the LU's Exec()
        method, that is, after we have acquired all locks.

        """

    def Feedback(self, *args):
        """Sends feedback from the LU code to the end-user.

        """

    def CheckCancel(self):
        """Check whether job has been cancelled.

        """
140
141
class Processor(object):
    """Object which runs OpCodes"""
    # Maps each opcode class to the logical unit class that executes it.
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
        opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
        # Node
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
        opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
        opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
        opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
        opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
        opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
        # Instance
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
        opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # OS
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Exports
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Test
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
        opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
    }
205
def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    # Execution callbacks; only set while ExecOpCode is running.
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster
220
def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    If execution callbacks are registered, their C{CheckCancel} is invoked
    first, so a cancelled job can stop before waiting for locks.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type priority: number or None
    @param priority: Priority for acquiring the locks, passed unchanged to
        the lock manager
    @return: the (non-C{None}) value returned by the lock manager's
        C{acquire}
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if self._cbs:
        self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    # A None result from the lock manager is treated as a timeout.
    if acquired is None:
        raise LockAcquireTimeout()

    return acquired
246
def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the LU's prerequisite check and pre-phase hooks. Unless the opcode
    requests a dry run, it then executes the LU and runs the post-phase
    hooks. If the configuration's write counter changed during execution,
    C{RunConfigUpdate} is called on the hooks master, even when execution
    raised.

    @param lu: the logical unit to execute
    @return: C{lu.dry_run_result} for dry runs, otherwise the LU's Exec()
        result as returned by its post-phase C{HooksCallBack}

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
        # Prerequisites and pre-phase hooks have run; stop before Exec().
        self.LogInfo("dry-run mode requested, not actually executing"
                     " the operation")
        return lu.dry_run_result

    try:
        result = lu.Exec(self.Log)
        h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
        result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                  self.Log, result)
    finally:
        # Report configuration changes even if Exec() or the hooks failed.
        if write_count != self.context.cfg.write_count:
            hm.RunConfigUpdate()

    return result
277
def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that locks the given level and then
    recurses into the next one (C{level + 1}) until the level is no longer
    in C{locking.LEVELS}; at that point it executes the given LU.

    At each level the LU may either acquire existing locks (declared in
    C{lu.needed_locks}) or create new ones (declared in C{lu.add_locks}),
    but not both. Newly added locks are recorded in C{lu.remove_locks}.
    When the deeper levels return or raise, any locks listed in
    C{lu.remove_locks} for this level are removed. The level is then
    released if it is still owned.

    @param lu: the logical unit to execute
    @type level: int
    @param level: the lock level handled by this call
    @param calc_timeout: callable returning the timeout for the next lock
        acquisition (C{None} for no timeout)
    @param priority: priority for acquiring the locks
    @return: the result of executing the LU
    @raise NotImplementedError: if the LU both adds and acquires locks at
        the same level
    @raise errors.OpPrereqError: if the new locks could not be added

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
        # Every lock level has been handled; run the LU itself.
        if self._cbs:
            self._cbs.NotifyStart()

        result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
        # Adding and acquiring locks within the same level is not supported.
        raise NotImplementedError("Can't declare locks to acquire when adding"
                                  " others")

    elif adding_locks or acquiring_locks:
        lu.DeclareLocks(level)
        share = lu.share_locks[level]

        try:
            assert adding_locks ^ acquiring_locks, \
                "Locks must be either added or acquired"

            if acquiring_locks:
                # Acquire existing locks.
                needed_locks = lu.needed_locks[level]

                acquired = self._AcquireLocks(level, needed_locks, share,
                                              calc_timeout(), priority)
            else:
                # Add new locks and record them so they are removed again
                # once this level is done.
                add_locks = lu.add_locks[level]
                lu.remove_locks[level] = add_locks

                try:
                    self.context.glm.add(level, add_locks, acquired=1,
                                         shared=share)
                except errors.LockError:
                    raise errors.OpPrereqError(
                        "Couldn't add locks (%s), probably because of a race condition"
                        " with another job, who added them first" % add_locks,
                        errors.ECODE_FAULT)

                acquired = add_locks

            try:
                lu.acquired_locks[level] = acquired

                result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                             priority)
            finally:
                if level in lu.remove_locks:
                    self.context.glm.remove(level, lu.remove_locks[level])
        finally:
            if self.context.glm.is_owned(level):
                self.context.glm.release(level)

    else:
        # Nothing to lock at this level; continue with the next one.
        result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result
344
def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Run one opcode through the logical unit registered for its class.

    The cluster-level lock is taken first, exclusively if the LU class sets
    C{REQ_BGL} and shared otherwise. The LU is then built and expanded, and
    the remaining lock levels are locked before the LU executes. Execution
    context reservations are dropped and the cluster lock is released
    afterwards, whether or not execution succeeded.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks, registered for the duration of the call
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
        raise errors.ProgrammerError("Non-opcode instance passed"
                                     " to ExecOpcode")

    lu_class = self.DISPATCH_TABLE.get(op.__class__)
    if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is not None:
        # One running timeout shared by every lock acquisition of this opcode.
        calc_timeout = locking.RunningTimeout(timeout, False).Remaining
    else:
        def calc_timeout():
            return None

    self._cbs = cbs
    try:
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(), priority)
        try:
            lu = lu_class(self, op, self.context, self.rpc)
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"

            try:
                return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                           calc_timeout, priority)
            finally:
                if self._ec_id:
                    self.context.cfg.DropECReservations(self._ec_id)
        finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
        self._cbs = None
396
def Log(self, *args):
    """Hand feedback to the registered execution callbacks.

    Does nothing when no callbacks are registered.

    """
    callbacks = self._cbs
    if not callbacks:
        return
    callbacks.Feedback(*args)
403
def LogStep(self, current, total, message):
    """Report LU progress as step C{current} out of C{total}.

    The step goes to the debug log and, prefixed with C{STEP}, to the user
    via L{Log}.

    """
    step_args = (current, total, message)
    logging.debug("Step %d/%d %s", *step_args)
    self.Log("STEP %d/%d %s" % step_args)
410
def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    If positional C{args} are given, C{message} is %-formatted with them.
    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    # Only a single 'hint' keyword is accepted.
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
        "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
        message = message % tuple(args)
    if message:
        logging.warning(message)
        self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
        self.Log(" Hint: %s" % kwargs["hint"])
429
def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    If positional C{args} are given, C{message} is %-formatted with them.

    """
    # NOTE(review): this listing contains only the "def" line of LogInfo;
    # the body is reconstructed by analogy with LogWarning. Confirm the
    # user-facing prefix against the upstream source.
    if args:
        message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)
438
def GetECId(self):
    """Returns the execution context identifier.

    @return: the execution context id given to the constructor
    @raise errors.ProgrammerError: if no execution context id was set

    """
    if not self._ec_id:
        # Previously the exception was only constructed, never raised, so
        # callers silently received an unset id.
        raise errors.ProgrammerError("Tried to use execution context id"
                                     " when not set")
    return self._ec_id
443
444
# NOTE(review): the "class HooksMaster" line and its constructor are not
# present in this listing. The methods of this class read self.callfn,
# self.lu, self.op, self.env and self.node_list (indexed by hooks phase),
# and Processor creates instances as HooksMaster(rpc.call_hooks_runner, lu);
# restore the header and a constructor setting those attributes.
"""Hooks master.

This class runs the hook scripts for a logical unit on the nodes that
the LU selects for each hooks phase.

In order to remove the direct dependency on the rpc module, the
constructor needs a function which actually does the remote
call. This will usually be rpc.call_hooks_runner, but any function
which behaves the same works; it is invoked as
C{callfn(node_list, hpath, phase, env)}.

"""
464
def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    @rtype: tuple
    @return: C{(env, pre_nodes, post_nodes)}. C{env} holds the base
        variables plus the LU's own variables with a C{GANETI_} prefix.
        Both node sets are frozensets and are empty when the LU has no
        hooks path (C{HPATH} is None).

    """
    env = {
        "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
        "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
        "GANETI_OP_CODE": self.op.OP_ID,
        "GANETI_OBJECT_TYPE": self.lu.HTYPE,
        "GANETI_DATA_DIR": constants.DATA_DIR,
    }

    if self.lu.HPATH is not None:
        lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
        # The LU may return no environment at all.
        if lu_env:
            for key in lu_env:
                env["GANETI_" + key] = lu_env[key]
    else:
        # LU without hooks: nothing to run anywhere.
        lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
489
def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call. It adds
    the hooks phase and path and, when the LU has a configuration, the
    cluster name and master node. It converts all keys and values to
    strings. C{self.env} itself is left unmodified.

    @param node_list: the nodes to run the hooks on
    @param hpath: the hooks path of the LU
    @param phase: the hooks phase being run
    @return: whatever C{self.callfn} returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
        env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
        env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # items() yields the same pairs as iteritems() but also works on
    # Python 3 dicts.
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)
506
def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call, or
        C{None} when there are no nodes to run the hooks on
    @raise errors.HooksFailure: on communication failure to the nodes (pre
        phase only; in the post phase it is only logged as a warning)
    @raise errors.HooksAbort: on failure of one of the hooks (pre phase
        only; failures in the post phase are only logged)

    """
    if nodes is None:
        nodes = self.node_list[phase]
    if not nodes:
        # Nothing to run on. This also covers an explicitly empty override,
        # which previously still issued an rpc call with no target nodes.
        return None

    hpath = self.lu.HPATH
    results = self._RunWrapper(nodes, hpath, phase)

    if not results:
        msg = "Communication Failure"
        if phase == constants.HOOKS_PHASE_PRE:
            raise errors.HooksFailure(msg)
        self.lu.LogWarning(msg)
        return results

    errs = []
    for node_name in results:
        res = results[node_name]
        if res.offline:
            continue

        msg = res.fail_msg
        if msg:
            self.lu.LogWarning("Communication failure to node %s: %s",
                               node_name, msg)
            continue

        for script, hkr, output in res.payload:
            if hkr != constants.HKR_FAIL:
                continue
            if phase == constants.HOOKS_PHASE_PRE:
                # Collected and reported together below.
                errs.append((node_name, script, output))
            else:
                if not output:
                    output = "(no output)"
                self.lu.LogWarning("On %s script %s failed, output: %s" %
                                   (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksAbort(errs)

    return results
559
571