22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52 from ganeti import pathutils
53
54 import ganeti.rapi.client
55 from ganeti.rapi.client import UsesRapiClient
56
57 from ganeti.watcher import nodemaint
58 from ganeti.watcher import state
59


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: Seconds to wait for the lock on the cluster-wide instance status file
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
80 """Exception raised when this host is not the master."""
81
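

# Main() below relies on ShouldPause(), whose definition did not survive in
# this copy of the module. Minimal sketch, assuming the conventional watcher
# pause file (pathutils.WATCHER_PAUSEFILE); the original body may differ.
def ShouldPause():
  """Should the watcher run any tasks?

  """
  return not os.path.exists(pathutils.WATCHER_PAUSEFILE)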
105 """Run the watcher hooks.
106
107 """
108 hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
109 constants.HOOKS_NAME_WATCHER)
110 if not os.path.isdir(hooks_dir):
111 return
112
113 try:
114 results = utils.RunParts(hooks_dir)
115 except Exception, err:
116 logging.exception("RunParts %s failed: %s", hooks_dir, err)
117 return
118
119 for (relname, status, runresult) in results:
120 if status == constants.RUNPARTS_SKIP:
121 logging.debug("Watcher hook %s: skipped", relname)
122 elif status == constants.RUNPARTS_ERR:
123 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
124 elif status == constants.RUNPARTS_RUN:
125 if runresult.failed:
126 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
127 relname, runresult.exit_code, runresult.output)
128 else:
129 logging.debug("Watcher hook %s: success (output: %s)", relname,
130 runresult.output)
131 else:
132 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
133 status)
134
137 """Abstraction for a Virtual Machine instance.
138
139 """
140 - def __init__(self, name, status, disks_active, snodes):
141 self.name = name
142 self.status = status
143 self.disks_active = disks_active
144 self.snodes = snodes
145
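
  # _CheckInstances() and _CheckDisks() below call Restart() and
  # ActivateDisks(), whose bodies did not survive in this copy of the class.
  # Minimal sketches, assuming the usual opcode submission path via
  # cli.SubmitOpCode(); the exact original implementations may differ.
  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)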
162 """Data container representing cluster node.
163
164 """
165 - def __init__(self, name, bootid, offline, secondaries):
166 """Initializes this class.
167
168 """
169 self.name = name
170 self.bootid = bootid
171 self.offline = offline
172 self.secondaries = secondaries
173
176 """Make a pass over the list of instances, restarting downed ones.
177
178 """
179 notepad.MaintainInstanceList(instances.keys())
180
181 started = set()
182
183 for inst in instances.values():
184 if inst.status in BAD_STATES:
185 n = notepad.NumberOfRestartAttempts(inst.name)
186
187 if n > MAXTRIES:
188 logging.warning("Not restarting instance '%s', retries exhausted",
189 inst.name)
190 continue
191
192 if n == MAXTRIES:
193 notepad.RecordRestartAttempt(inst.name)
194 logging.error("Could not restart instance '%s' after %s attempts,"
195 " giving up", inst.name, MAXTRIES)
196 continue
197
198 try:
199 logging.info("Restarting instance '%s' (attempt #%s)",
200 inst.name, n + 1)
201 inst.Restart(cl)
202 except Exception:
203 logging.exception("Error while restarting instance '%s'", inst.name)
204 else:
205 started.add(inst.name)
206
207 notepad.RecordRestartAttempt(inst.name)
208
209 else:
210 if notepad.NumberOfRestartAttempts(inst.name):
211 notepad.RemoveInstance(inst.name)
212 if inst.status not in HELPLESS_STATES:
213 logging.info("Restart of instance '%s' succeeded", inst.name)
214
215 return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot ID
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances that have any of the checked nodes as
    # a secondary node
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # The instance was just (re)started, which should have activated
          # its disks already
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep track of all nodes and their boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)
274 """Checks if given instances has any secondary in offline status.
275
276 @param instance: The instance object
277 @return: True if any of the secondary is offline, False otherwise
278
279 """
280 return compat.any(nodes[node_name].offline for node_name in instance.snodes)
281
284 """Run a per-group "gnt-cluster verify-disks".
285
286 """
287 job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
288 ((_, offline_disk_instances, _), ) = \
289 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
290 cl.ArchiveJob(job_id)
291
292 if not offline_disk_instances:
293
294 logging.debug("Verify-disks reported no offline disks, nothing to do")
295 return
296
297 logging.debug("Will activate disks for instance(s) %s",
298 utils.CommaJoin(offline_disk_instances))
299
300
301
302 job = []
303 for name in offline_disk_instances:
304 try:
305 inst = instances[name]
306 except KeyError:
307 logging.info("Can't find instance '%s', maybe it was ignored", name)
308 continue
309
310 if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
311 logging.info("Skipping instance '%s' because it is in a helpless state"
312 " or has offline secondaries", name)
313 continue
314
315 job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
316
317 if job:
318 job_id = cli.SendJob(job, cl=cl)
319
320 try:
321 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
322 except Exception:
323 logging.exception("Error while activating disks")
324
327 """Connects to RAPI port and does a simple test.
328
329 Connects to RAPI port of hostname and does a simple test. At this time, the
330 test is GetVersion.
331
332 If RAPI responds with error code "401 Unauthorized", the test is successful,
333 because the aim of this function is to assess whether RAPI is responding, not
334 if it is accessible.
335
336 @type hostname: string
337 @param hostname: hostname of the node to connect to.
338 @rtype: bool
339 @return: Whether RAPI is working properly
340
341 """
342 curl_config = rapi.client.GenericCurlConfig()
343 rapi_client = rapi.client.GanetiRapiClient(hostname,
344 curl_config_fn=curl_config)
345 try:
346 master_version = rapi_client.GetVersion()
347 except rapi.client.CertificateError, err:
348 logging.warning("RAPI certificate error: %s", err)
349 return False
350 except rapi.client.GanetiApiError, err:
351 if err.code == 401:
352
353 return True
354 else:
355 logging.warning("RAPI error: %s", err)
356 return False
357 else:
358 logging.debug("Reported RAPI version %s", master_version)
359 return master_version == constants.RAPI_VERSION
360
363 """Parse the command line options.
364
365 @return: (options, args) as from OptionParser.parse_args()
366
367 """
368 parser = OptionParser(description="Ganeti cluster watcher",
369 usage="%prog [-d]",
370 version="%%prog (ganeti) %s" %
371 constants.RELEASE_VERSION)
372
373 parser.add_option(cli.DEBUG_OPT)
374 parser.add_option(cli.NODEGROUP_OPT)
375 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
376 help="Autoarchive jobs older than this age (default"
377 " 6 hours)")
378 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
379 action="store_true", help="Ignore cluster pause setting")
380 parser.add_option("--wait-children", dest="wait_children",
381 action="store_true", help="Wait for child processes")
382 parser.add_option("--no-wait-children", dest="wait_children",
383 action="store_false",
384 help="Don't wait for child processes")
385
386 parser.set_defaults(wait_children=True)
387 options, args = parser.parse_args()
388 options.job_age = cli.ParseTimespec(options.job_age)
389
390 if args:
391 parser.error("No arguments expected")
392
393 return (options, args)
394
397 """Writes the per-group instance status file.
398
399 The entries are sorted.
400
401 @type filename: string
402 @param filename: Path to instance status file
403 @type data: list of tuple; (instance name as string, status as string)
404 @param data: Instance name and status
405
406 """
407 logging.debug("Updating instance status file '%s' with %s instances",
408 filename, len(data))
409
410 utils.WriteFile(filename,
411 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
412 sorted(data))))
413
416 """Writes an instance status file from L{Instance} objects.
417
418 @type filename: string
419 @param filename: Path to status file
420 @type instances: list of L{Instance}
421
422 """
423 _WriteInstanceStatus(filename, [(inst.name, inst.status)
424 for inst in instances])
425
428 """Reads an instance status file.
429
430 @type filename: string
431 @param filename: Path to status file
432 @rtype: tuple; (None or number, list of lists containing instance name and
433 status)
434 @return: File's mtime and instance status contained in the file; mtime is
435 C{None} if file can't be read
436
437 """
438 logging.debug("Reading per-group instance status from '%s'", filename)
439
440 statcb = utils.FileStatHelper()
441 try:
442 content = utils.ReadFile(filename, preread=statcb)
443 except EnvironmentError, err:
444 if err.errno == errno.ENOENT:
445 logging.error("Can't read '%s', does not exist (yet)", filename)
446 else:
447 logging.exception("Unable to read '%s', ignoring", filename)
448 return (None, None)
449 else:
450 return (statcb.st.st_mtime, [line.split(None, 1)
451 for line in content.splitlines()])
452
455 """Merges all per-group instance status files into a global one.
456
457 @type filename: string
458 @param filename: Path to global instance status file
459 @type pergroup_filename: string
460 @param pergroup_filename: Path to per-group status files, must contain "%s"
461 to be replaced with group UUID
462 @type groups: sequence
463 @param groups: UUIDs of known groups
464
465 """
466
467 lock = utils.FileLock.Open(filename)
468 try:
469 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
470 except errors.LockError, err:
471
472
473
474 logging.error("Can't acquire lock on instance status file '%s', not"
475 " updating: %s", filename, err)
476 return
477
478 logging.debug("Acquired exclusive lock on '%s'", filename)
479
480 data = {}
481
482
483 for group_uuid in groups:
484 (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
485
486 if mtime is not None:
487 for (instance_name, status) in instdata:
488 data.setdefault(instance_name, []).append((mtime, status))
489
490
491 inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
492 for (instance_name, status) in data.items()]
493
494
495
496 _WriteInstanceStatus(filename, inststatus)
497
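

# _GroupWatcher() below calls GetLuxiClient(), whose definition did not
# survive in this copy of the module. Minimal sketch, assuming cli.GetClient()
# as the connection helper and that it raises OpPrereqError on non-master
# nodes; a fuller implementation could use try_restart to attempt restarting
# the master daemon, which this sketch does not do.
def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon;
    ignored in this sketch

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # Assumed: this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)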
527 """Starts a new instance of the watcher for every node group.
528
529 """
530 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
531 for arg in sys.argv)
532
533 result = cl.QueryGroups([], ["name", "uuid"], False)
534
535 children = []
536
537 for (idx, (name, uuid)) in enumerate(result):
538 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
539
540 if idx > 0:
541
542 time.sleep(CHILD_PROCESS_DELAY)
543
544 logging.debug("Spawning child for group '%s' (%s), arguments %s",
545 name, uuid, args)
546
547 try:
548
549 pid = os.spawnv(os.P_NOWAIT, args[0], args)
550 except Exception:
551 logging.exception("Failed to start child for group '%s' (%s)",
552 name, uuid)
553 else:
554 logging.debug("Started with PID %s", pid)
555 children.append(pid)
556
557 if wait:
558 for pid in children:
559 logging.debug("Waiting for child PID %s", pid)
560 try:
561 result = utils.RetryOnSignal(os.waitpid, pid, 0)
562 except EnvironmentError, err:
563 result = str(err)
564
565 logging.debug("Child PID %s exited with status %s", pid, result)
566
569 """Archives old jobs.
570
571 """
572 (arch_count, left_count) = cl.AutoArchiveJobs(age)
573 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
574
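

# _GroupWatcher() below calls _CheckMaster(), whose definition did not
# survive in this copy of the module. Minimal sketch, assuming the master
# node name can be read with the luxi client's QueryConfigValues() and
# compared with netutils.Hostname.GetSysName().
def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  masterinfo = cl.QueryConfigValues(["master_node"])
  if masterinfo[0] != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")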
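

# Main() below dispatches to _GlobalWatcher() when no node group is given,
# but its definition did not survive in this copy of the module. Simplified
# sketch of the cluster-wide pass: run node maintenance and watcher hooks,
# connect to the master daemon, archive old jobs and spawn one child per node
# group. A fuller implementation would typically also ensure the node daemons
# are running and check RAPI via IsRapiResponding(); that is omitted here.
def _GlobalWatcher(opts):
  """Main function for global watcher.

  """
  # Run node maintenance in all cases, even on the master, so that old
  # masters can be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():
    nodemaint.NodeMaintenance().Exec()

  _RunWatcherHooks()

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # We are on the master now
  _ArchiveJobs(client, opts.job_age)

  # Spawned processes take care of the individual node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS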
631 """Retrieves instances and nodes per node group.
632
633 """
634 job = [
635
636 opcodes.OpQuery(what=constants.QR_INSTANCE,
637 fields=["name", "status", "disks_active", "snodes",
638 "pnode.group.uuid", "snodes.group.uuid"],
639 qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
640 use_locking=True),
641
642
643 opcodes.OpQuery(what=constants.QR_NODE,
644 fields=["name", "bootid", "offline"],
645 qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
646 use_locking=True),
647 ]
648
649 job_id = cl.SubmitJob(job)
650 results = map(objects.QueryResponse.FromDict,
651 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
652 cl.ArchiveJob(job_id)
653
654 results_data = map(operator.attrgetter("data"), results)
655
656
657 assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
658
659
660 (raw_instances, raw_nodes) = [[map(compat.snd, values)
661 for values in res]
662 for res in results_data]
663
664 secondaries = {}
665 instances = []
666
667
668 for (name, status, disks_active, snodes, pnode_group_uuid,
669 snodes_group_uuid) in raw_instances:
670 if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
671 logging.error("Ignoring split instance '%s', primary group %s, secondary"
672 " groups %s", name, pnode_group_uuid,
673 utils.CommaJoin(snodes_group_uuid))
674 else:
675 instances.append(Instance(name, status, disks_active, snodes))
676
677 for node in snodes:
678 secondaries.setdefault(node, set()).add(name)
679
680
681 nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
682 for (name, bootid, offline) in raw_nodes]
683
684 return (dict((node.name, node) for node in nodes),
685 dict((inst.name, inst) for inst in instances))
686
689 """Returns a list of all node groups known by L{ssconf}.
690
691 """
692 groups = ssconf.SimpleStore().GetNodegroupList()
693
694 result = list(line.split(None, 1)[0] for line in groups
695 if line.strip())
696
697 if not compat.all(map(utils.UUID_RE.match, result)):
698 raise errors.GenericError("Ssconf contains invalid group UUID")
699
700 return result
701
704 """Main function for per-group watcher process.
705
706 """
707 group_uuid = opts.nodegroup.lower()
708
709 if not utils.UUID_RE.match(group_uuid):
710 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
711 " got '%s'" %
712 (cli.NODEGROUP_OPT_NAME, group_uuid))
713
714 logging.info("Watcher for node group '%s'", group_uuid)
715
716 known_groups = _LoadKnownGroups()
717
718
719 if group_uuid not in known_groups:
720 raise errors.GenericError("Node group '%s' is not known by ssconf" %
721 group_uuid)
722
723
724
725 state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
726 inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
727
728 logging.debug("Using state file %s", state_path)
729
730
731 statefile = state.OpenStateFile(state_path)
732 if not statefile:
733 return constants.EXIT_FAILURE
734
735 notepad = state.WatcherState(statefile)
736 try:
737
738 client = GetLuxiClient(False)
739
740 _CheckMaster(client)
741
742 (nodes, instances) = _GetGroupData(client, group_uuid)
743
744
745 _UpdateInstanceStatus(inst_status_path, instances.values())
746
747 _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
748 pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
749 known_groups)
750
751 started = _CheckInstances(client, notepad, instances)
752 _CheckDisks(client, notepad, nodes, instances, started)
753 _VerifyDisks(client, group_uuid, nodes, instances)
754 except Exception, err:
755 logging.info("Not updating status file due to failure: %s", err)
756 raise
757 else:
758
759 notepad.Save(state_path)
760
761 return constants.EXIT_SUCCESS
762
765 """Main function.
766
767 """
768 (options, _) = ParseOptions()
769
770 utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
771 debug=options.debug, stderr_logging=options.debug)
772
773 if ShouldPause() and not options.ignore_pause:
774 logging.debug("Pause has been set, exiting")
775 return constants.EXIT_SUCCESS
776
777
778 lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
779 try:
780 lock.Shared(blocking=False)
781 except (EnvironmentError, errors.LockError), err:
782 logging.error("Can't acquire lock on %s: %s",
783 pathutils.WATCHER_LOCK_FILE, err)
784 return constants.EXIT_SUCCESS
785
786 if options.nodegroup is None:
787 fn = _GlobalWatcher
788 else:
789
790 fn = _GroupWatcher
791
792 try:
793 return fn(options)
794 except (SystemExit, KeyboardInterrupt):
795 raise
796 except NotMasterError:
797 logging.debug("Not master, exiting")
798 return constants.EXIT_NOTMASTER
799 except errors.ResolverError, err:
800 logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
801 return constants.EXIT_NODESETUP_ERROR
802 except errors.JobQueueFull:
803 logging.error("Job queue is full, can't query cluster state")
804 except errors.JobQueueDrainError:
805 logging.error("Job queue is drained, can't maintain cluster state")
806 except Exception, err:
807 logging.exception(str(err))
808 return constants.EXIT_FAILURE
809
810 return constants.EXIT_SUCCESS
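

# Illustrative usage only: in a real Ganeti installation this module is
# normally invoked through the ganeti-watcher entry point rather than run
# directly, so treat this guard as a sketch.
if __name__ == "__main__":
  sys.exit(Main())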