22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52
53 import ganeti.rapi.client
54 from ganeti.rapi.client import UsesRapiClient
55
56 from ganeti.watcher import nodemaint
57 from ganeti.watcher import state
58
59
MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"


CHILD_PROCESS_DELAY = 1.0


INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
79 """Exception raised when this host is not the master."""
80
87
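

def ShouldPause():
  """Check whether we should pause (reconstructed helper).

  """
  # NOTE: sketch of the elided original; it is assumed to consult the
  # watcher pause file maintained by "gnt-cluster watcher pause", and the
  # exact original implementation may differ.
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))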


def RunWatcherHooks():
101 """Run the watcher hooks.
102
103 """
104 hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
105 constants.HOOKS_NAME_WATCHER)
106 if not os.path.isdir(hooks_dir):
107 return
108
109 try:
110 results = utils.RunParts(hooks_dir)
111 except Exception, err:
112 logging.exception("RunParts %s failed: %s", hooks_dir, err)
113 return
114
115 for (relname, status, runresult) in results:
116 if status == constants.RUNPARTS_SKIP:
117 logging.debug("Watcher hook %s: skipped", relname)
118 elif status == constants.RUNPARTS_ERR:
119 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
120 elif status == constants.RUNPARTS_RUN:
121 if runresult.failed:
122 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
123 relname, runresult.exit_code, runresult.output)
124 else:
125 logging.debug("Watcher hook %s: success (output: %s)", relname,
126 runresult.output)
127 else:
128 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
129 status)
130
133 """Abstraction for a Virtual Machine instance.
134
135 """
136 - def __init__(self, name, status, autostart, snodes):
137 self.name = name
138 self.status = status
139 self.autostart = autostart
140 self.snodes = snodes
141
148
155
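
  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    # NOTE: reconstruction of the elided method; it is assumed to submit a
    # plain startup opcode for this instance via the given luxi client.
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    # NOTE: reconstruction of the elided method, mirroring the opcode used
    # in _VerifyDisks below.
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)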
158 """Data container representing cluster node.
159
160 """
161 - def __init__(self, name, bootid, offline, secondaries):
162 """Initializes this class.
163
164 """
165 self.name = name
166 self.bootid = bootid
167 self.offline = offline
168 self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
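      # The node did not return a boot ID at all; skip it (only log for
      # nodes that are not already marked offline)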
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
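      # The boot ID differs from the one recorded on the previous run, so
      # the node has rebooted in the meantime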
      check_nodes.append(node)

  if check_nodes:
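    # Activate disks for all instances that have any of the rebooted nodes
    # as a secondary node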
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
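          # The instance was just started above; starting an instance also
          # activates its disks, so nothing more to do here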
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

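    # Remember the new boot IDs so the same reboot is not handled again on
    # the next run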
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
280 """Run a per-group "gnt-cluster verify-disks".
281
282 """
283 job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
284 ((_, offline_disk_instances, _), ) = \
285 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
286 cl.ArchiveJob(job_id)
287
288 if not offline_disk_instances:
289
290 logging.debug("Verify-disks reported no offline disks, nothing to do")
291 return
292
293 logging.debug("Will activate disks for instance(s) %s",
294 utils.CommaJoin(offline_disk_instances))
295
296
297
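  # Build a single job activating the disks of all affected instances that
  # can actually be helped from this node group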
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")
323 """Connects to RAPI port and does a simple test.
324
325 Connects to RAPI port of hostname and does a simple test. At this time, the
326 test is GetVersion.
327
328 @type hostname: string
329 @param hostname: hostname of the node to connect to.
330 @rtype: bool
331 @return: Whether RAPI is working properly
332
333 """
334 curl_config = rapi.client.GenericCurlConfig()
335 rapi_client = rapi.client.GanetiRapiClient(hostname,
336 curl_config_fn=curl_config)
337 try:
338 master_version = rapi_client.GetVersion()
339 except rapi.client.CertificateError, err:
340 logging.warning("RAPI certificate error: %s", err)
341 return False
342 except rapi.client.GanetiApiError, err:
343 logging.warning("RAPI error: %s", err)
344 return False
345 else:
346 logging.debug("Reported RAPI version %s", master_version)
347 return master_version == constants.RAPI_VERSION
348
351 """Parse the command line options.
352
353 @return: (options, args) as from OptionParser.parse_args()
354
355 """
356 parser = OptionParser(description="Ganeti cluster watcher",
357 usage="%prog [-d]",
358 version="%%prog (ganeti) %s" %
359 constants.RELEASE_VERSION)
360
361 parser.add_option(cli.DEBUG_OPT)
362 parser.add_option(cli.NODEGROUP_OPT)
363 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
364 help="Autoarchive jobs older than this age (default"
365 " 6 hours)")
366 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
367 action="store_true", help="Ignore cluster pause setting")
368 parser.add_option("--wait-children", dest="wait_children",
369 action="store_true", help="Wait for child processes")
370 parser.add_option("--no-wait-children", dest="wait_children",
371 action="store_false",
372 help="Don't wait for child processes")
373
374 parser.set_defaults(wait_children=True)
375 options, args = parser.parse_args()
376 options.job_age = cli.ParseTimespec(options.job_age)
377
378 if args:
379 parser.error("No arguments expected")
380
381 return (options, args)
382
385 """Writes the per-group instance status file.
386
387 The entries are sorted.
388
389 @type filename: string
390 @param filename: Path to instance status file
391 @type data: list of tuple; (instance name as string, status as string)
392 @param data: Instance name and status
393
394 """
395 logging.debug("Updating instance status file '%s' with %s instances",
396 filename, len(data))
397
398 utils.WriteFile(filename,
399 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
400 sorted(data))))
401
404 """Writes an instance status file from L{Instance} objects.
405
406 @type filename: string
407 @param filename: Path to status file
408 @type instances: list of L{Instance}
409
410 """
411 _WriteInstanceStatus(filename, [(inst.name, inst.status)
412 for inst in instances])
413
416 """Reads an instance status file.
417
418 @type filename: string
419 @param filename: Path to status file
420 @rtype: tuple; (None or number, list of lists containing instance name and
421 status)
422 @return: File's mtime and instance status contained in the file; mtime is
423 C{None} if file can't be read
424
425 """
426 logging.debug("Reading per-group instance status from '%s'", filename)
427
428 statcb = utils.FileStatHelper()
429 try:
430 content = utils.ReadFile(filename, preread=statcb)
431 except EnvironmentError, err:
432 if err.errno == errno.ENOENT:
433 logging.error("Can't read '%s', does not exist (yet)", filename)
434 else:
435 logging.exception("Unable to read '%s', ignoring", filename)
436 return (None, None)
437 else:
438 return (statcb.st.st_mtime, [line.split(None, 1)
439 for line in content.splitlines()])
440
443 """Merges all per-group instance status files into a global one.
444
445 @type filename: string
446 @param filename: Path to global instance status file
447 @type pergroup_filename: string
448 @param pergroup_filename: Path to per-group status files, must contain "%s"
449 to be replaced with group UUID
450 @type groups: sequence
451 @param groups: UUIDs of known groups
452
453 """
454
455 lock = utils.FileLock.Open(filename)
456 try:
457 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
458 except errors.LockError, err:
459
460
461
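    # All per-group watchers take this lock for a short time only; failing
    # to get it within the timeout most likely means another process is
    # stuck, so skip this update instead of blocking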
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

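  # Collect (mtime, status) pairs for every instance from each per-group
  # status file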
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

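  # An instance may be reported by more than one group; keep the status
  # from the most recently written file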
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  _WriteInstanceStatus(filename, inststatus)
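

def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  A sketch of the elided helper, reconstructed under the assumption that it
  wraps L{cli.GetClient} and optionally tries to restart the master daemon;
  the original implementation may differ in details.

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # cli.GetClient raises this when run on a non-master node
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()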
515 """Starts a new instance of the watcher for every node group.
516
517 """
518 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
519 for arg in sys.argv)
520
521 result = cl.QueryGroups([], ["name", "uuid"], False)
522
523 children = []
524
525 for (idx, (name, uuid)) in enumerate(result):
526 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
527
528 if idx > 0:
529
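      # Avoid hammering the system by starting all children at once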
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)
557 """Archives old jobs.
558
559 """
560 (arch_count, left_count) = cl.AutoArchiveJobs(age)
561 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
562
571
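

def _CheckMaster(cl):
  """Ensures current host is master node.

  A sketch of the elided helper: it is assumed to compare the cluster's
  configured master with this host's name and raise L{NotMasterError}
  otherwise.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for the global watcher.

  A reduced sketch of the elided function: the structure (node maintenance,
  master check, RAPI check, job archival and spawning per-group children)
  follows the calls visible elsewhere in this module, but the original may
  differ in details.

  """
  # Run node maintenance in all cases, even on the master, so that old
  # masters can be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():
    nodemaint.NodeMaintenance().Exec()

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # We are on the master node
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI is not responding, log the problem before continuing
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.error("RAPI is not responding")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS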


def _GetGroupData(cl, uuid):
619 """Retrieves instances and nodes per node group.
620
621 """
622 job = [
623
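    # Get all instances whose primary node is in this node group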
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),
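    # Get all nodes in this node group, together with their boot IDs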
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))
677 """Returns a list of all node groups known by L{ssconf}.
678
679 """
680 groups = ssconf.SimpleStore().GetNodegroupList()
681
682 result = list(line.split(None, 1)[0] for line in groups
683 if line.strip())
684
685 if not compat.all(map(utils.UUID_RE.match, result)):
686 raise errors.GenericError("Ssconf contains invalid group UUID")
687
688 return result
689
692 """Main function for per-group watcher process.
693
694 """
695 group_uuid = opts.nodegroup.lower()
696
697 if not utils.UUID_RE.match(group_uuid):
698 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
699 " got '%s'" %
700 (cli.NODEGROUP_OPT_NAME, group_uuid))
701
702 logging.info("Watcher for node group '%s'", group_uuid)
703
704 known_groups = _LoadKnownGroups()
705
706
707 if group_uuid not in known_groups:
708 raise errors.GenericError("Node group '%s' is not known by ssconf" %
709 group_uuid)
710
711
712
713 state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
714 inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
715
716 logging.debug("Using state file %s", state_path)
717
718
719 statefile = state.OpenStateFile(state_path)
720 if not statefile:
721 return constants.EXIT_FAILURE
722
723 notepad = state.WatcherState(statefile)
724 try:
725
726 client = GetLuxiClient(False)
727
728 _CheckMaster(client)
729
730 (nodes, instances) = _GetGroupData(client, group_uuid)
731
732
733 _UpdateInstanceStatus(inst_status_path, instances.values())
734
735 _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
736 constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
737 known_groups)
738
739 started = _CheckInstances(client, notepad, instances)
740 _CheckDisks(client, notepad, nodes, instances, started)
741 _VerifyDisks(client, group_uuid, nodes, instances)
742 except Exception, err:
743 logging.info("Not updating status file due to failure: %s", err)
744 raise
745 else:
746
747 notepad.Save(state_path)
748
749 return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS
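
  # Try to acquire the global watcher lock in shared mode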
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS