31 """Tool to restart erroneously downed virtual machines.
32
33 This program and set of classes implement a watchdog to restart
34 virtual machines in a Ganeti cluster that have crashed or been killed
35 by a node reboot. Run from cron or similar.
36
37 """
38
39 import os
40 import os.path
41 import sys
42 import time
43 import logging
44 import operator
45 import errno
46 from optparse import OptionParser
47
48 from ganeti import utils
49 from ganeti import constants
50 from ganeti import compat
51 from ganeti import errors
52 from ganeti import opcodes
53 from ganeti import cli
54 import ganeti.rpc.node as rpc
55 import ganeti.rpc.errors as rpcerr
56 from ganeti import rapi
57 from ganeti import netutils
58 from ganeti import qlang
59 from ganeti import objects
60 from ganeti import ssconf
61 from ganeti import ht
62 from ganeti import pathutils
63
64 import ganeti.rapi.client
65 from ganeti.rapi.client import UsesRapiClient
66
67 from ganeti.watcher import nodemaint
68 from ganeti.watcher import state
69
70
MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

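# Delay (in seconds) between starting the per-node-group child processes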
CHILD_PROCESS_DELAY = 1.0

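# How long (in seconds) to wait for the lock on the global instance status file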
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


90 """Exception raised when this host is not the master."""
91
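# Several small helpers (pause handling and node-daemon startup) are elided
# from this listing.  A minimal sketch of ShouldPause(), assuming the pause
# flag is the watcher pause file maintained by "gnt-cluster watcher pause":
def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))

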
117 """Run the watcher hooks.
118
119 """
120 hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
121 constants.HOOKS_NAME_WATCHER)
122 if not os.path.isdir(hooks_dir):
123 return
124
125 try:
126 results = utils.RunParts(hooks_dir)
127 except Exception, err:
128 logging.exception("RunParts %s failed: %s", hooks_dir, err)
129 return
130
131 for (relname, status, runresult) in results:
132 if status == constants.RUNPARTS_SKIP:
133 logging.debug("Watcher hook %s: skipped", relname)
134 elif status == constants.RUNPARTS_ERR:
135 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
136 elif status == constants.RUNPARTS_RUN:
137 if runresult.failed:
138 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
139 relname, runresult.exit_code, runresult.output)
140 else:
141 logging.debug("Watcher hook %s: success (output: %s)", relname,
142 runresult.output)
143 else:
144 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
145 status)
146
149 """Abstraction for a Virtual Machine instance.
150
151 """
152 - def __init__(self, name, status, config_state, config_state_source,
153 disks_active, snodes):
154 self.name = name
155 self.status = status
156 self.config_state = config_state
157 self.config_state_source = config_state_source
158 self.disks_active = disks_active
159 self.snodes = snodes
160
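  # The Restart() and ActivateDisks() methods are elided from this listing.
  # A minimal sketch of both, assuming they simply submit the corresponding
  # opcode for this instance (the force=False choice is an assumption):
  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
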
176 """Determines whether the instance needs cleanup.
177
178 Determines whether the instance needs cleanup after having been
179 shutdown by the user.
180
181 @rtype: bool
182 @return: True if the instance needs cleanup, False otherwise.
183
184 """
185 return self.status == constants.INSTST_USERDOWN and \
186 self.config_state != constants.ADMINST_DOWN
187
188
class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


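# _CleanupInstance() is elided from this listing.  A minimal sketch, assuming
# "cleanup" means submitting an OpInstanceShutdown job so the configuration
# catches up with an instance the user already shut down; the exact opcode
# parameters and retry bookkeeping of the real implementation are not shown:
def _CleanupInstance(cl, notepad, inst, locks):
  # notepad is kept for signature compatibility with the caller
  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  logging.info("Instance '%s' was shut down by the user, cleaning up",
               inst.name)
  try:
    op = opcodes.OpInstanceShutdown(instance_name=inst.name)
    cli.SubmitOpCode(op, cl=cl)
  except Exception:
    logging.exception("Error while cleaning up instance '%s'", inst.name)

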
231 """Make a pass over the list of instances, restarting downed ones.
232
233 """
234 notepad.MaintainInstanceList(instances.keys())
235
236 started = set()
237
238 for inst in instances.values():
239 if inst.NeedsCleanup():
240 _CleanupInstance(cl, notepad, inst, locks)
241 elif inst.status in BAD_STATES:
242 n = notepad.NumberOfRestartAttempts(inst.name)
243
244 if n > MAXTRIES:
245 logging.warning("Not restarting instance '%s', retries exhausted",
246 inst.name)
247 continue
248
249 if n == MAXTRIES:
250 notepad.RecordRestartAttempt(inst.name)
251 logging.error("Could not restart instance '%s' after %s attempts,"
252 " giving up", inst.name, MAXTRIES)
253 continue
254
255 try:
256 logging.info("Restarting instance '%s' (attempt #%s)",
257 inst.name, n + 1)
258 inst.Restart(cl)
259 except Exception:
260 logging.exception("Error while restarting instance '%s'", inst.name)
261 else:
262 started.add(inst.name)
263
264 notepad.RecordRestartAttempt(inst.name)
265
266 else:
267 if notepad.NumberOfRestartAttempts(inst.name):
268 notepad.RemoveInstance(inst.name)
269 if inst.status not in HELPLESS_STATES:
270 logging.info("Restart of instance '%s' succeeded", inst.name)
271
272 return started
273
274
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      check_nodes.append(node)

  if check_nodes:
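    # One or more nodes have rebooted: make sure the disks of every instance
    # that uses such a node as secondary are activated again.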
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
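          # The instance was just (re)started above, which already activates
          # its disks; no need to do it again here.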
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


331 """Checks if given instances has any secondary in offline status.
332
333 @param instance: The instance object
334 @return: True if any of the secondary is offline, False otherwise
335
336 """
337 return compat.any(nodes[node_name].offline for node_name in instance.snodes)
338
341 """Run a per-group "gnt-cluster verify-disks".
342
343 """
344 job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
345 group_name=uuid, priority=constants.OP_PRIO_LOW)])
346 ((_, offline_disk_instances, _), ) = \
347 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
348 cl.ArchiveJob(job_id)
349
350 if not offline_disk_instances:
351
352 logging.debug("Verify-disks reported no offline disks, nothing to do")
353 return
354
355 logging.debug("Will activate disks for instance(s) %s",
356 utils.CommaJoin(offline_disk_instances))
357
358
359
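  # Collect one OpInstanceActivateDisks per affected instance and submit them
  # as a single job, skipping instances that cannot be helped from here.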
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")


385 """Connects to RAPI port and does a simple test.
386
387 Connects to RAPI port of hostname and does a simple test. At this time, the
388 test is GetVersion.
389
390 If RAPI responds with error code "401 Unauthorized", the test is successful,
391 because the aim of this function is to assess whether RAPI is responding, not
392 if it is accessible.
393
394 @type hostname: string
395 @param hostname: hostname of the node to connect to.
396 @rtype: bool
397 @return: Whether RAPI is working properly
398
399 """
400 curl_config = rapi.client.GenericCurlConfig()
401 rapi_client = rapi.client.GanetiRapiClient(hostname,
402 curl_config_fn=curl_config)
403 try:
404 master_version = rapi_client.GetVersion()
405 except rapi.client.CertificateError, err:
406 logging.warning("RAPI certificate error: %s", err)
407 return False
408 except rapi.client.GanetiApiError, err:
409 if err.code == 401:
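      # A 401 still proves that the RAPI daemon is up and answering requests,
      # which is all this check is interested in.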
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


421 """Parse the command line options.
422
423 @return: (options, args) as from OptionParser.parse_args()
424
425 """
426 parser = OptionParser(description="Ganeti cluster watcher",
427 usage="%prog [-d]",
428 version="%%prog (ganeti) %s" %
429 constants.RELEASE_VERSION)
430
431 parser.add_option(cli.DEBUG_OPT)
432 parser.add_option(cli.NODEGROUP_OPT)
433 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
434 help="Autoarchive jobs older than this age (default"
435 " 6 hours)")
436 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
437 action="store_true", help="Ignore cluster pause setting")
438 parser.add_option("--wait-children", dest="wait_children",
439 action="store_true", help="Wait for child processes")
440 parser.add_option("--no-wait-children", dest="wait_children",
441 action="store_false",
442 help="Don't wait for child processes")
443
444 parser.set_defaults(wait_children=True)
445 options, args = parser.parse_args()
446 options.job_age = cli.ParseTimespec(options.job_age)
447
448 if args:
449 parser.error("No arguments expected")
450
451 return (options, args)
452
455 """Writes the per-group instance status file.
456
457 The entries are sorted.
458
459 @type filename: string
460 @param filename: Path to instance status file
461 @type data: list of tuple; (instance name as string, status as string)
462 @param data: Instance name and status
463
464 """
465 logging.debug("Updating instance status file '%s' with %s instances",
466 filename, len(data))
467
468 utils.WriteFile(filename,
469 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
470 sorted(data))))
471
474 """Writes an instance status file from L{Instance} objects.
475
476 @type filename: string
477 @param filename: Path to status file
478 @type instances: list of L{Instance}
479
480 """
481 _WriteInstanceStatus(filename, [(inst.name, inst.status)
482 for inst in instances])
483
486 """Reads an instance status file.
487
488 @type filename: string
489 @param filename: Path to status file
490 @rtype: tuple; (None or number, list of lists containing instance name and
491 status)
492 @return: File's mtime and instance status contained in the file; mtime is
493 C{None} if file can't be read
494
495 """
496 logging.debug("Reading per-group instance status from '%s'", filename)
497
498 statcb = utils.FileStatHelper()
499 try:
500 content = utils.ReadFile(filename, preread=statcb)
501 except EnvironmentError, err:
502 if err.errno == errno.ENOENT:
503 logging.error("Can't read '%s', does not exist (yet)", filename)
504 else:
505 logging.exception("Unable to read '%s', ignoring", filename)
506 return (None, None)
507 else:
508 return (statcb.st.st_mtime, [line.split(None, 1)
509 for line in content.splitlines()])
510
513 """Merges all per-group instance status files into a global one.
514
515 @type filename: string
516 @param filename: Path to global instance status file
517 @type pergroup_filename: string
518 @param pergroup_filename: Path to per-group status files, must contain "%s"
519 to be replaced with group UUID
520 @type groups: sequence
521 @param groups: UUIDs of known groups
522
523 """
524
525 lock = utils.FileLock.Open(filename)
526 try:
527 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
528 except errors.LockError, err:
529
530
531
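    # All per-group watcher processes try to update this file; if this run
    # cannot get the lock within INSTANCE_STATUS_LOCK_TIMEOUT seconds, a later
    # run will write the update instead, so failing here is not fatal.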
532 logging.error("Can't acquire lock on instance status file '%s', not"
533 " updating: %s", filename, err)
534 return
535
536 logging.debug("Acquired exclusive lock on '%s'", filename)
537
  data = {}

  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

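  # For each instance, keep the status from the most recently written
  # per-group file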
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  _WriteInstanceStatus(filename, inststatus)


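# GetLuxiClient() and _GlobalWatcher() are elided from this listing.  A rough
# sketch of _GlobalWatcher(), assuming the cluster-wide run only verifies
# mastership, archives old jobs and then delegates the real work to one child
# process per node group (see _StartGroupChildren below):
def _GlobalWatcher(opts):
  """Main function for the global (cluster-wide) watcher run."""
  client = GetLuxiClient(False)
  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn one child per node group and optionally wait for them
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS

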
585 """Starts a new instance of the watcher for every node group.
586
587 """
588 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
589 for arg in sys.argv)
590
591 result = cl.QueryGroups([], ["name", "uuid"], False)
592
593 children = []
594
595 for (idx, (name, uuid)) in enumerate(result):
596 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
597
598 if idx > 0:
599
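      # Spread the children out a little so they do not all hit the master
      # daemon at the same instant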
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
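      # Re-execute the watcher (args[0] is our own path) for this node group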
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError as err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


627 """Archives old jobs.
628
629 """
630 (arch_count, left_count) = cl.AutoArchiveJobs(age)
631 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
632
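# _CheckMaster() is elided from this listing.  A minimal sketch, assuming it
# simply compares the configured master node with this host's name and raises
# NotMasterError (caught in Main) when they differ:
def _CheckMaster(cl):
  """Ensures that the current host is the cluster's master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")

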
690 """Retrieves instances and nodes per node group.
691
692 """
693 locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)
694
695 prefix = "instance/"
696 prefix_len = len(prefix)
697
698 locked_instances = set()
699
700 for [[_, name], [_, lock]] in locks.data:
701 if name.startswith(prefix) and lock:
702 locked_instances.add(name[prefix_len:])
703
704 queries = [
705 (constants.QR_INSTANCE,
706 ["name", "status", "admin_state", "admin_state_source", "disks_active",
707 "snodes", "pnode.group.uuid", "snodes.group.uuid"],
708 [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
709 (constants.QR_NODE,
710 ["name", "bootid", "offline"],
711 [qlang.OP_EQUAL, "group.uuid", uuid]),
712 ]
713
714 results = []
715 for what, fields, qfilter in queries:
716 results.append(qcl.Query(what, fields, qfilter))
717
718 results_data = map(operator.attrgetter("data"), results)
719
720
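  # Sanity check: every result row must be a list of (status, value) pairs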
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

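  # Keep only the field values, dropping the per-field result status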
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state,
                                config_state_source, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)


755 """Returns a list of all node groups known by L{ssconf}.
756
757 """
758 groups = ssconf.SimpleStore().GetNodegroupList()
759
760 result = list(line.split(None, 1)[0] for line in groups
761 if line.strip())
762
763 if not compat.all(map(utils.UUID_RE.match, result)):
764 raise errors.GenericError("Ssconf contains invalid group UUID")
765
766 return result
767
770 """Main function for per-group watcher process.
771
772 """
773 group_uuid = opts.nodegroup.lower()
774
775 if not utils.UUID_RE.match(group_uuid):
776 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
777 " got '%s'" %
778 (cli.NODEGROUP_OPT_NAME, group_uuid))
779
780 logging.info("Watcher for node group '%s'", group_uuid)
781
782 known_groups = _LoadKnownGroups()
783
784
785 if group_uuid not in known_groups:
786 raise errors.GenericError("Node group '%s' is not known by ssconf" %
787 group_uuid)
788
789
790
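  # Both the watcher state and the instance status of this group live in
  # per-group files derived from the (already validated) group UUID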
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path)
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)
  try:
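    # Connect to master daemon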
    client = GetLuxiClient(False)
    query_client = GetLuxiClient(False, query=True)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(query_client, group_uuid)

    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception as err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


832 """Main function.
833
834 """
835 (options, _) = ParseOptions()
836
837 utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
838 debug=options.debug, stderr_logging=options.debug)
839
840 if ShouldPause() and not options.ignore_pause:
841 logging.debug("Pause has been set, exiting")
842 return constants.EXIT_SUCCESS
843
844
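  # Try to acquire the global watcher lock in shared mode; if it cannot be
  # acquired, another watcher run is in progress and this one can simply exit.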
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError) as err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError as err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception as err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS