22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client
54
55 from ganeti.watcher import nodemaint
56 from ganeti.watcher import state
57
58
MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

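#: Seconds to wait between starting the per-node-group child processes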
CHILD_PROCESS_DELAY = 1.0

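#: How long (in seconds) to wait for the lock on the instance status file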
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


99 """Run the watcher hooks.
100
101 """
102 hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
103 constants.HOOKS_NAME_WATCHER)
104 if not os.path.isdir(hooks_dir):
105 return
106
107 try:
108 results = utils.RunParts(hooks_dir)
109 except Exception, err:
110 logging.exception("RunParts %s failed: %s", hooks_dir, err)
111 return
112
113 for (relname, status, runresult) in results:
114 if status == constants.RUNPARTS_SKIP:
115 logging.debug("Watcher hook %s: skipped", relname)
116 elif status == constants.RUNPARTS_ERR:
117 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
118 elif status == constants.RUNPARTS_RUN:
119 if runresult.failed:
120 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
121 relname, runresult.exit_code, runresult.output)
122 else:
123 logging.debug("Watcher hook %s: success (output: %s)", relname,
124 runresult.output)
125 else:
126 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
127 status)
128
131 """Abstraction for a Virtual Machine instance.
132
133 """
134 - def __init__(self, name, status, autostart, snodes):
135 self.name = name
136 self.status = status
137 self.autostart = autostart
138 self.snodes = snodes
139
146
153
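  # Minimal sketch of the per-instance operations used further down
  # (inst.Restart(cl) and inst.ActivateDisks(cl)); the exact opcode
  # parameters used here are assumptions.
  def Restart(self, cl):
    """Submits a job to (re)start this instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Submits a job to activate all disks of this instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)

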
156 """Data container representing cluster node.
157
158 """
159 - def __init__(self, name, bootid, offline, secondaries):
160 """Initializes this class.
161
162 """
163 self.name = name
164 self.bootid = bootid
165 self.offline = offline
166 self.secondaries = secondaries
167
170 """Make a pass over the list of instances, restarting downed ones.
171
172 """
173 notepad.MaintainInstanceList(instances.keys())
174
175 started = set()
176
177 for inst in instances.values():
178 if inst.status in BAD_STATES:
179 n = notepad.NumberOfRestartAttempts(inst.name)
180
181 if n > MAXTRIES:
182 logging.warning("Not restarting instance '%s', retries exhausted",
183 inst.name)
184 continue
185
186 if n == MAXTRIES:
187 notepad.RecordRestartAttempt(inst.name)
188 logging.error("Could not restart instance '%s' after %s attempts,"
189 " giving up", inst.name, MAXTRIES)
190 continue
191
192 try:
193 logging.info("Restarting instance '%s' (attempt #%s)",
194 inst.name, n + 1)
195 inst.Restart(cl)
196 except Exception:
197 logging.exception("Error while restarting instance '%s'", inst.name)
198 else:
199 started.add(inst.name)
200
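      # Record the attempt whether or not the restart succeeded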
      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
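      # The node did not return a boot ID, so it is either offline or down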
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
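      # The boot ID changed since the last run, so the node has rebooted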
      check_nodes.append(node)

  if check_nodes:
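    # Activate disks for all instances that have one of the rebooted nodes
    # as a secondary node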
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
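          # The instance was already restarted above, which should have
          # activated its disks as well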
251 logging.debug("Skipping disk activation for instance '%s' as"
252 " it was already started", inst.name)
253 continue
254
255 try:
256 logging.info("Activating disks for instance '%s'", inst.name)
257 inst.ActivateDisks(cl)
258 except Exception:
259 logging.exception("Error while activating disks for instance '%s'",
260 inst.name)
261
262
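    # Remember the new boot IDs of the nodes that were checked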
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


278 """Run a per-group "gnt-cluster verify-disks".
279
280 """
281 job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
282 ((_, offline_disk_instances, _), ) = \
283 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
284 cl.ArchiveJob(job_id)
285
286 if not offline_disk_instances:
287
288 logging.debug("Verify-disks reported no offline disks, nothing to do")
289 return
290
291 logging.debug("Will activate disks for instance(s) %s",
292 utils.CommaJoin(offline_disk_instances))
293
294
295
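  # Submit all disk activations as a single job and wait for it below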
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")


321 """Connects to RAPI port and does a simple test.
322
323 Connects to RAPI port of hostname and does a simple test. At this time, the
324 test is GetVersion.
325
326 @type hostname: string
327 @param hostname: hostname of the node to connect to.
328 @rtype: bool
329 @return: Whether RAPI is working properly
330
331 """
332 curl_config = rapi.client.GenericCurlConfig()
333 rapi_client = rapi.client.GanetiRapiClient(hostname,
334 curl_config_fn=curl_config)
335 try:
336 master_version = rapi_client.GetVersion()
337 except rapi.client.CertificateError, err:
338 logging.warning("RAPI certificate error: %s", err)
339 return False
340 except rapi.client.GanetiApiError, err:
341 logging.warning("RAPI error: %s", err)
342 return False
343 else:
344 logging.debug("Reported RAPI version %s", master_version)
345 return master_version == constants.RAPI_VERSION
346
349 """Parse the command line options.
350
351 @return: (options, args) as from OptionParser.parse_args()
352
353 """
354 parser = OptionParser(description="Ganeti cluster watcher",
355 usage="%prog [-d]",
356 version="%%prog (ganeti) %s" %
357 constants.RELEASE_VERSION)
358
359 parser.add_option(cli.DEBUG_OPT)
360 parser.add_option(cli.NODEGROUP_OPT)
361 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
362 help="Autoarchive jobs older than this age (default"
363 " 6 hours)")
364 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
365 action="store_true", help="Ignore cluster pause setting")
366 parser.add_option("--wait-children", dest="wait_children",
367 action="store_true", help="Wait for child processes")
368 parser.add_option("--no-wait-children", dest="wait_children",
369 action="store_false", help="Don't wait for child processes")
370
371 parser.set_defaults(wait_children=True)
372 options, args = parser.parse_args()
373 options.job_age = cli.ParseTimespec(options.job_age)
374
375 if args:
376 parser.error("No arguments expected")
377
378 return (options, args)
379
382 """Writes the per-group instance status file.
383
384 The entries are sorted.
385
386 @type filename: string
387 @param filename: Path to instance status file
388 @type data: list of tuple; (instance name as string, status as string)
389 @param data: Instance name and status
390
391 """
392 logging.debug("Updating instance status file '%s' with %s instances",
393 filename, len(data))
394
395 utils.WriteFile(filename,
396 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
397 sorted(data))))
398
401 """Writes an instance status file from L{Instance} objects.
402
403 @type filename: string
404 @param filename: Path to status file
405 @type instances: list of L{Instance}
406
407 """
408 _WriteInstanceStatus(filename, [(inst.name, inst.status)
409 for inst in instances])
410
413 """Helper to store file handle's C{fstat}.
414
415 """
417 """Initializes this class.
418
419 """
420 self.st = None
421
423 """Calls C{fstat} on file handle.
424
425 """
426 self.st = os.fstat(fh.fileno())
427
430 """Reads an instance status file.
431
432 @type filename: string
433 @param filename: Path to status file
434 @rtype: tuple; (None or number, list of lists containing instance name and
435 status)
436 @return: File's mtime and instance status contained in the file; mtime is
437 C{None} if file can't be read
438
439 """
440 logging.debug("Reading per-group instance status from '%s'", filename)
441
442 statcb = _StatCb()
443 try:
444 content = utils.ReadFile(filename, preread=statcb)
445 except EnvironmentError, err:
446 if err.errno == errno.ENOENT:
447 logging.error("Can't read '%s', does not exist (yet)", filename)
448 else:
449 logging.exception("Unable to read '%s', ignoring", filename)
450 return (None, None)
451 else:
452 return (statcb.st.st_mtime, [line.split(None, 1)
453 for line in content.splitlines()])
454
457 """Merges all per-group instance status files into a global one.
458
459 @type filename: string
460 @param filename: Path to global instance status file
461 @type pergroup_filename: string
462 @param pergroup_filename: Path to per-group status files, must contain "%s"
463 to be replaced with group UUID
464 @type groups: sequence
465 @param groups: UUIDs of known groups
466
467 """
468
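  # Lock the global status file while it is being updated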
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
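    # All per-group watcher processes lock and update this file; if the lock
    # cannot be acquired within the timeout, skip the update rather than
    # blocking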
476 logging.error("Can't acquire lock on instance status file '%s', not"
477 " updating: %s", filename, err)
478 return
479
480 logging.debug("Acquired exclusive lock on '%s'", filename)
481
482 data = {}
483
484
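  # Collect the status reported by every known group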
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

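  # For each instance, keep only the most recently written status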
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  _WriteInstanceStatus(filename, inststatus)


529 """Starts a new instance of the watcher for every node group.
530
531 """
532 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
533 for arg in sys.argv)
534
535 result = cl.QueryGroups([], ["name", "uuid"], False)
536
537 children = []
538
539 for (idx, (name, uuid)) in enumerate(result):
540 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
541
542 if idx > 0:
543
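      # Spread out the child processes instead of starting them all at once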
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
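      # Re-execute the watcher with the node group appended to the arguments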
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


571 """Archives old jobs.
572
573 """
574 (arch_count, left_count) = cl.AutoArchiveJobs(age)
575 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
576
633 """Retrieves instances and nodes per node group.
634
635 """
636 job = [
637
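    # Get all primary instances in the group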
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

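    # Get all nodes in the group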
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

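  # Each returned field must be a (status, value) pair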
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


691 """Returns a list of all node groups known by L{ssconf}.
692
693 """
694 groups = ssconf.SimpleStore().GetNodegroupList()
695
696 result = list(line.split(None, 1)[0] for line in groups
697 if line.strip())
698
699 if not compat.all(map(utils.UUID_RE.match, result)):
700 raise errors.GenericError("Ssconf contains invalid group UUID")
701
702 return result
703
706 """Main function for per-group watcher process.
707
708 """
709 group_uuid = opts.nodegroup.lower()
710
711 if not utils.UUID_RE.match(group_uuid):
712 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
713 " got '%s'" %
714 (cli.NODEGROUP_OPT_NAME, group_uuid))
715
716 logging.info("Watcher for node group '%s'", group_uuid)
717
718 known_groups = _LoadKnownGroups()
719
720
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path)
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)
  try:
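    # Connect to the master daemon and verify we are on the master node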
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
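    # Save the updated watcher state only if this run succeeded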
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

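  # Try to acquire the global watcher lock in shared mode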
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS