1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41
42
class _LockAcquireTimeout(Exception):
  """Internal exception to report timeouts on acquiring locks.

  Raised inside this module when a lock acquisition returns C{None}; the
  opcode execution loop catches it and retries with the next attempt's
  timeout strategy.

  """
47
48
50 """Calculate timeouts for lock attempts.
51
52 """
53 result = [1.0]
54
55
56 while sum(result) < 150.0:
57 timeout = (result[-1] * 1.05) ** 1.25
58
59
60
61
62 if timeout > 10.0:
63 timeout = 10.0
64
65 elif timeout < 0.1:
66
67 timeout = 0.1
68
69 result.append(timeout)
70
71 return result
72
73
class _LockAttemptTimeoutStrategy(object):
  """Timeout strategy for a single attempt at acquiring locks.

  An instance describes one attempt; L{NextAttempt} returns a fresh instance
  for the attempt that follows it.

  """
  __slots__ = [
    "_attempt",
    "_random_fn",
    "_start_time",
    "_time_fn",
    "_running_timeout",
    ]

  # Timeout in seconds per attempt number; attempts beyond the end of this
  # list run with a timeout of None
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
    """Sets up the strategy for the given attempt.

    @type attempt: int
    @param attempt: Current attempt number
    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests
    @raise ValueError: if C{attempt} is negative

    """
    if attempt < 0:
      raise ValueError("Attempt must be zero or positive")

    object.__init__(self)

    try:
      attempt_timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
    except IndexError:
      # Ran out of precomputed timeouts; None presumably makes the acquire
      # block without limit (confirm against locking.RunningTimeout)
      attempt_timeout = None

    self._attempt = attempt
    self._time_fn = _time_fn
    self._random_fn = _random_fn
    self._running_timeout = locking.RunningTimeout(attempt_timeout, False,
                                                   _time_fn=_time_fn)

  def NextAttempt(self):
    """Creates the strategy object for the following attempt.

    The time and random functions of this instance are passed on.

    """
    following = self._attempt + 1
    return _LockAttemptTimeoutStrategy(attempt=following,
                                       _time_fn=self._time_fn,
                                       _random_fn=self._random_fn)

  def CalcRemainingTimeout(self):
    """Computes the timeout left for this attempt, with some jitter.

    @rtype: None or float
    @return: C{None} if the running timeout reports C{None}, otherwise the
        remaining time shifted by a random amount within -5%/+5%

    """
    remaining = self._running_timeout.Remaining()
    if remaining is None:
      return None

    # The jitter window is 10% of the remaining time, centred on zero
    spread = remaining * 0.1
    return remaining + ((self._random_fn() * spread) - (spread * 0.5))
137
138
class OpExecCbBase(object):
  """Interface for the callbacks used while an opcode is being executed.

  All methods are no-ops in this base class.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU gets executed.

    It is called just before the LU's Exec() method is started, i.e. once
    all locks have been acquired.

    """

  def Feedback(self, *args):
    """Hook receiving feedback from the LU code for the end-user.

    """

  def CheckCancel(self):
    """Hook used to check whether the job has been cancelled.

    """
160
161
"""Object which runs OpCodes"""
# Maps every supported opcode class to the logical unit (LU) class that
# implements it; the LU class is looked up by the class of the opcode
DISPATCH_TABLE = {
  # Cluster opcodes
  opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
  opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
  opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
  opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
  opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
  opcodes.OpRenameCluster: cmdlib.LURenameCluster,
  opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
  opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
  opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
  opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
  # Node opcodes
  opcodes.OpAddNode: cmdlib.LUAddNode,
  opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
  opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
  opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
  opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
  opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
  opcodes.OpRemoveNode: cmdlib.LURemoveNode,
  opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
  opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
  opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
  opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
  # Instance opcodes
  opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
  opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
  opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
  opcodes.OpRenameInstance: cmdlib.LURenameInstance,
  opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
  opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
  opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
  opcodes.OpRebootInstance: cmdlib.LURebootInstance,
  opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
  opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
  opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
  opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
  opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
  opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
  opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
  opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
  opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
  opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
  opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
  # OS opcodes
  opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
  # Export opcodes
  opcodes.OpQueryExports: cmdlib.LUQueryExports,
  opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
  opcodes.OpExportInstance: cmdlib.LUExportInstance,
  opcodes.OpRemoveExport: cmdlib.LURemoveExport,
  # Tag opcodes
  opcodes.OpGetTags: cmdlib.LUGetTags,
  opcodes.OpSearchTags: cmdlib.LUSearchTags,
  opcodes.OpAddTags: cmdlib.LUAddTags,
  opcodes.OpDelTags: cmdlib.LUDelTags,
  # Test opcodes
  opcodes.OpTestDelay: cmdlib.LUTestDelay,
  opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
  opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
  }
225
def __init__(self, context, ec_id):
  """Initializes the processor.

  @type context: GanetiContext
  @param context: global Ganeti context
  @type ec_id: string
  @param ec_id: execution context identifier

  """
  # No callbacks are registered until an opcode is being executed
  self._cbs = None
  self._ec_id = ec_id
  self.context = context
  self.hmclass = HooksMaster
  self.rpc = rpc.RpcRunner(context.cfg)
240
242 """Acquires locks via the Ganeti lock manager.
243
244 @type level: int
245 @param level: Lock level
246 @type names: list or string
247 @param names: Lock names
248 @type shared: bool
249 @param shared: Whether the locks should be acquired in shared mode
250 @type timeout: None or float
251 @param timeout: Timeout for acquiring the locks
252
253 """
254 if self._cbs:
255 self._cbs.CheckCancel()
256
257 acquired = self.context.glm.acquire(level, names, shared=shared,
258 timeout=timeout)
259
260 return acquired
261
def _ExecLU(self, lu):
  """Runs one logical unit: prerequisites, pre hooks, Exec, post hooks.

  In dry-run mode the LU's C{dry_run_result} is returned once the pre hooks
  have run, and C{Exec} is not called. If the configuration write counter
  changed while executing, the config update hook is run as well, even when
  execution failed.

  @param lu: the logical unit to run
  @return: the LU result as returned by the post-phase hooks callback

  """
  writes_before = self.context.cfg.write_count
  lu.CheckPrereq()

  hooks = HooksMaster(self.rpc.call_hooks_runner, lu)
  pre_results = hooks.RunPhase(constants.HOOKS_PHASE_PRE)
  lu.HooksCallBack(constants.HOOKS_PHASE_PRE, pre_results,
                   self.Log, None)

  if getattr(lu.op, "dry_run", False):
    # Prerequisites and pre hooks have run; stop before changing anything
    self.LogInfo("dry-run mode requested, not actually executing"
                 " the operation")
    return lu.dry_run_result

  try:
    exec_result = lu.Exec(self.Log)
    post_results = hooks.RunPhase(constants.HOOKS_PHASE_POST)
    return lu.HooksCallBack(constants.HOOKS_PHASE_POST, post_results,
                            self.Log, exec_result)
  finally:
    # Runs on success and on failure alike
    if writes_before != self.context.cfg.write_count:
      hooks.RunConfigUpdate()
292
def _LockAndExecLU(self, lu, level, calc_timeout):
  """Execute a Logical Unit, with the needed locks.

  This is a recursive function that starts locking the given level, and
  proceeds up, till there are no more locks to acquire. Then it executes the
  given LU and its opcodes.

  @param lu: the logical unit to execute
  @param level: the lock level handled by this invocation
  @param calc_timeout: callable without arguments returning the timeout to
      use for the next lock acquisition
  @return: the result of executing the LU
  @raise _LockAcquireTimeout: if acquiring the locks of a level returned
      C{None}
  @raise errors.OpPrereqError: if the locks to be added could not be added
  @raise NotImplementedError: if locks are to be both added and acquired at
      the same level

  """
  adding_locks = level in lu.add_locks
  acquiring_locks = level in lu.needed_locks
  if level not in locking.LEVELS:
    # Past the last lock level: nothing more to lock, run the LU itself
    if self._cbs:
      self._cbs.NotifyStart()

    result = self._ExecLU(lu)

  elif adding_locks and acquiring_locks:
    # Adding and acquiring locks at the same level is not supported
    raise NotImplementedError("Can't declare locks to acquire when adding"
                              " others")

  elif adding_locks or acquiring_locks:
    lu.DeclareLocks(level)
    share = lu.share_locks[level]

    try:
      assert adding_locks ^ acquiring_locks, \
        "Locks must be either added or acquired"

      if acquiring_locks:
        # Acquire the locks the LU declared for this level
        needed_locks = lu.needed_locks[level]

        acquired = self._AcquireLocks(level, needed_locks, share,
                                      calc_timeout())

        if acquired is None:
          # None from the acquire is treated as a timeout
          raise _LockAcquireTimeout()

      else:
        # Add new locks; they are recorded in remove_locks so that the
        # cleanup below removes them again
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks

        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks,
            errors.ECODE_FAULT)

        acquired = add_locks

      try:
        lu.acquired_locks[level] = acquired

        # Continue with the next lock level
        result = self._LockAndExecLU(lu, level + 1, calc_timeout)
      finally:
        if level in lu.remove_locks:
          self.context.glm.remove(level, lu.remove_locks[level])
    finally:
      # Release whatever is still owned at this level, on success or failure
      if self.context.glm.is_owned(level):
        self.context.glm.release(level)

  else:
    # Nothing to lock at this level, go straight to the next one
    result = self._LockAndExecLU(lu, level + 1, calc_timeout)

  return result
363
def ExecOpCode(self, op, cbs):
  """Execute an opcode.

  The logical unit class is looked up in L{DISPATCH_TABLE} and run with the
  cluster-level lock held. When a lock acquisition times out, all locks are
  released and the whole operation is retried with the next attempt's
  timeout strategy.

  @type op: an OpCode instance
  @param op: the opcode to be executed
  @type cbs: L{OpExecCbBase}
  @param cbs: Runtime callbacks
  @return: the result of the logical unit
  @raise errors.ProgrammerError: if C{op} is not an opcode instance
  @raise errors.OpCodeUnknown: if no logical unit is registered for C{op}

  """
  if not isinstance(op, opcodes.OpCode):
    raise errors.ProgrammerError("Non-opcode instance passed"
                                 " to ExecOpcode")

  # The callbacks are only registered while this opcode is being executed
  self._cbs = cbs
  try:
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    timeout_strategy = _LockAttemptTimeoutStrategy()

    while True:
      try:
        acquire_timeout = timeout_strategy.CalcRemainingTimeout()

        # Take the cluster-level lock (BGL); it is requested in shared mode
        # unless the LU class sets REQ_BGL
        if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                              not lu_class.REQ_BGL, acquire_timeout) is None:
          raise _LockAcquireTimeout()

        try:
          lu = lu_class(self, op, self.context, self.rpc)
          lu.ExpandNames()
          assert lu.needed_locks is not None, "needed_locks not set by LU"

          try:
            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                       timeout_strategy.CalcRemainingTimeout)
          finally:
            # Drop the reservations made under this execution context id
            if self._ec_id:
              self.context.cfg.DropECReservations(self._ec_id)

        finally:
          self.context.glm.release(locking.LEVEL_CLUSTER)

      except _LockAcquireTimeout:
        # A lock timed out; fall through and retry with the next strategy
        pass

      timeout_strategy = timeout_strategy.NextAttempt()

  finally:
    self._cbs = None
419
def Log(self, *args):
  """Passes the arguments on to the feedback callback, if one is set.

  """
  callbacks = self._cbs
  if callbacks:
    callbacks.Feedback(*args)
426
def LogStep(self, current, total, message):
  """Reports progress of the LU execution to the debug log and the user.

  @param current: number of the current step
  @param total: total number of steps
  @param message: description of the step

  """
  progress = (current, total, message)
  logging.debug("Step %d/%d %s", *progress)
  self.Log("STEP %d/%d %s" % progress)
433
def LogWarning(self, message, *args, **kwargs):
  """Sends a warning to the log and to the user.

  The only keyword argument accepted is 'hint', which is shown to the user
  as a hint (presumably related to the warning). An empty message is not
  printed at all, which allows showing only a hint.

  @param message: the warning text; formatted with C{args} if any are given
  @param args: values for the format specifiers in C{message}

  """
  assert set(kwargs).issubset(["hint"]), \
         "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

  text = message
  if args:
    text = text % tuple(args)

  if text:
    logging.warning(text)
    self.Log(" - WARNING: %s" % text)

  try:
    hint = kwargs["hint"]
  except KeyError:
    pass
  else:
    self.Log(" Hint: %s" % hint)
452
453 - def LogInfo(self, message, *args):
461
def GetECId(self):
  """Returns the execution context id of this processor.

  @rtype: string
  @return: the execution context identifier passed to the constructor
  @raise errors.ProgrammerError: if no execution context id is set

  """
  if not self._ec_id:
    # The exception used to be created but never raised, so callers silently
    # got back the unset id
    raise errors.ProgrammerError("Tried to use execution context id when"
                                 " not set")
  return self._ec_id
466
467
469 """Hooks master.
470
471 This class distributes the run commands to the nodes based on the
472 specific LU class.
473
474 In order to remove the direct dependency on the rpc module, the
475 constructor needs a function which actually does the remote
476 call. This will usually be rpc.call_hooks_runner, but any function
477 which behaves the same works.
478
479 """
487
def _BuildEnv(self):
  """Builds the hooks environment and the pre/post node sets.

  The base environment is always created. If the LU has a hooks path, its
  own variables are added with a C{GANETI_} prefix and it supplies the node
  lists; otherwise both node sets are empty.

  @rtype: tuple
  @return: (environment dict, frozenset of pre-phase nodes, frozenset of
      post-phase nodes)

  """
  lu = self.lu
  env = {
    "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
    "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
    "GANETI_OP_CODE": self.op.OP_ID,
    "GANETI_OBJECT_TYPE": lu.HTYPE,
    "GANETI_DATA_DIR": constants.DATA_DIR,
    }

  if lu.HPATH is None:
    # No hooks path on this LU, hence no nodes to run on
    return env, frozenset([]), frozenset([])

  lu_env, pre_nodes, post_nodes = lu.BuildHooksEnv()
  if lu_env:
    env.update(("GANETI_" + key, lu_env[key]) for key in lu_env)

  return env, frozenset(pre_nodes), frozenset(post_nodes)
512
514 """Simple wrapper over self.callfn.
515
516 This method fixes the environment before doing the rpc call.
517
518 """
519 env = self.env.copy()
520 env["GANETI_HOOKS_PHASE"] = phase
521 env["GANETI_HOOKS_PATH"] = hpath
522 if self.lu.cfg is not None:
523 env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
524 env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
525
526 env = dict([(str(key), str(val)) for key, val in env.iteritems()])
527
528 return self.callfn(node_list, hpath, phase, env)
529
def RunPhase(self, phase, nodes=None):
  """Runs the hook scripts of one phase and evaluates their results.

  @param phase: one of L{constants.HOOKS_PHASE_POST} or
      L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
  @param nodes: overrides the predefined list of nodes for the given phase
  @return: the results of the hooks multi-node rpc call, or C{None} if
      there are no nodes to run on
  @raise errors.HooksFailure: on communication failure to the nodes (pre
      phase; in the post phase only a warning is logged)
  @raise errors.HooksAbort: on failure of one of the pre-phase hooks

  """
  phase_nodes = self.node_list[phase]
  if not (phase_nodes or nodes):
    # Neither predefined nor explicitly given nodes: nothing to run
    return

  if nodes is None:
    targets = phase_nodes
  else:
    targets = nodes

  results = self._RunWrapper(targets, self.lu.HPATH, phase)
  is_pre = (phase == constants.HOOKS_PHASE_PRE)

  if not results:
    msg = "Communication Failure"
    if is_pre:
      raise errors.HooksFailure(msg)
    self.lu.LogWarning(msg)
    return results

  failures = []
  for node_name in results:
    res = results[node_name]
    if res.offline:
      continue

    msg = res.fail_msg
    if msg:
      self.lu.LogWarning("Communication failure to node %s: %s",
                         node_name, msg)
      continue

    for script, hkr, output in res.payload:
      if hkr != constants.HKR_FAIL:
        continue
      if is_pre:
        # Failed pre hooks are collected and abort the operation below
        failures.append((node_name, script, output))
      else:
        if not output:
          output = "(no output)"
        self.lu.LogWarning("On %s script %s failed, output: %s" %
                           (node_name, script, output))

  if failures and is_pre:
    raise errors.HooksAbort(failures)
  return results
582
594