"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""
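# The watcher is designed to be invoked periodically, e.g. from cron. A purely
# illustrative crontab entry (the installed path and schedule may differ
# between distributions) could look like:
#   */5 * * * * root /usr/sbin/ganeti-watcher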

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for the lock on the instance status file
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance."""
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance."""
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot ID
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # We already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

  # Keep changed boot IDs
  for node in check_nodes:
    notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # Submit all disk activations in a single job and wait for it; this puts
  # less pressure on the job queue
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION
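# Illustrative use only; the caller and hostname source below are assumptions,
# not part of this module:
#   if not IsRapiResponding(netutils.Hostname.GetSysName()):
#     logging.warning("RAPI does not seem to be responding on this node")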


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")

  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))
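# For illustration (hypothetical instance names), the resulting file contains
# one "<name> <status>" pair per line; the status strings are the values of
# the constants.INSTST_* constants, e.g.:
#   instance1.example.com ERROR_down
#   instance2.example.com running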


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock the global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file; none of them
    # should take longer than INSTANCE_STATUS_LOCK_TIMEOUT seconds, so give
    # up instead of blocking indefinitely
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # For each instance, select the status from the most recently updated
  # per-group file (based on its mtime)
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file
  _WriteInstanceStatus(filename, inststatus)


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Don't start all child processes at the same time
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # Spawn a separate watcher process for this node group
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open and lock the per-group watcher state file
  statefile = state.OpenStateFile(state_path)
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS