31 """Tool to restart erroneously downed virtual machines.
32
33 This program and set of classes implement a watchdog to restart
34 virtual machines in a Ganeti cluster that have crashed or been killed
35 by a node reboot. Run from cron or similar.
36
37 """
38
39 import os
40 import os.path
41 import sys
42 import time
43 import logging
44 import operator
45 import errno
46 from optparse import OptionParser
47
48 from ganeti import utils
49 from ganeti import wconfd
50 from ganeti import constants
51 from ganeti import compat
52 from ganeti import errors
53 from ganeti import opcodes
54 from ganeti import cli
55 import ganeti.rpc.errors as rpcerr
56 from ganeti import rapi
57 from ganeti import netutils
58 from ganeti import qlang
59 from ganeti import ssconf
60 from ganeti import ht
61 from ganeti import pathutils
62
63 import ganeti.rapi.client
64 from ganeti.rapi.client import UsesRapiClient
65
66 from ganeti.watcher import nodemaint
67 from ganeti.watcher import state
68
69
70 MAXTRIES = 5
71 BAD_STATES = compat.UniqueFrozenset([
72 constants.INSTST_ERRORDOWN,
73 ])
74 HELPLESS_STATES = compat.UniqueFrozenset([
75 constants.INSTST_NODEDOWN,
76 constants.INSTST_NODEOFFLINE,
77 ])
78 NOTICE = "NOTICE"
79 ERROR = "ERROR"
80
81
82 CHILD_PROCESS_DELAY = 1.0
83
84
85 INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
89 """Exception raised when this host is not the master."""
90
97
112
115 """Run the watcher hooks.
116
117 """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)
147 """Abstraction for a Virtual Machine instance.
148
149 """
150 - def __init__(self, name, status, config_state, config_state_source,
151 disks_active, snodes):
152 self.name = name
153 self.status = status
154 self.config_state = config_state
155 self.config_state_source = config_state_source
156 self.disks_active = disks_active
157 self.snodes = snodes
158
168
178
180 """Determines whether the instance needs cleanup.
181
182 Determines whether the instance needs cleanup after having been
183 shutdown by the user.
184
185 @rtype: bool
186 @return: True if the instance needs cleanup, False otherwise.
187
188 """
189 return self.status == constants.INSTST_USERDOWN and \
190 self.config_state != constants.ADMINST_DOWN
191
192


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Cleaning up instance %s" % inst.name,
                utils.EpochNano())]
  try:
    cli.SubmitOpCode(op, cl=cl)
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception:
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)
238 """Make a pass over the list of instances, restarting downed ones.
239
240 """
241 notepad.MaintainInstanceList(instances.keys())
242
243 started = set()
244
245 for inst in instances.values():
246 if inst.NeedsCleanup():
247 _CleanupInstance(cl, notepad, inst, locks)
248 elif inst.status in BAD_STATES:
249 n = notepad.NumberOfRestartAttempts(inst.name)
250
251 if n > MAXTRIES:
252 logging.warning("Not restarting instance '%s', retries exhausted",
253 inst.name)
254 continue
255
256 if n == MAXTRIES:
257 notepad.RecordRestartAttempt(inst.name)
258 logging.error("Could not restart instance '%s' after %s attempts,"
259 " giving up", inst.name, MAXTRIES)
260 continue
261
262 try:
263 logging.info("Restarting instance '%s' (attempt #%s)",
264 inst.name, n + 1)
265 inst.Restart(cl)
266 except Exception:
267 logging.exception("Error while restarting instance '%s'", inst.name)
268 else:
269 started.add(inst.name)
270
271 notepad.RecordRestartAttempt(inst.name)
272
273 else:
274 if notepad.NumberOfRestartAttempts(inst.name):
275 notepad.RemoveInstance(inst.name)
276 if inst.status not in HELPLESS_STATES:
277 logging.info("Restart of instance '%s' succeeded", inst.name)
278
279 return started
280
281


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
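      # The node did not return a boot ID; log it (unless the node is
      # deliberately marked offline) and skip the reboot check for it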
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
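      # The boot ID differs from the recorded one, so the node has rebooted
      # since the last watcher run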
      check_nodes.append(node)

  if check_nodes:
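    # Activate disks for all instances that have one of the rebooted nodes
    # as a secondary node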
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
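          # The instance was restarted earlier in this run, which should
          # already have activated its disks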
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)
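    # Remember the new boot IDs so the same reboot is not handled again on
    # the next run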
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
348 """Run a per-group "gnt-cluster verify-disks".
349
350 """
351 op = opcodes.OpGroupVerifyDisks(
352 group_name=uuid, priority=constants.OP_PRIO_LOW)
353 op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
354 "Verifying disks of group %s" % uuid,
355 utils.EpochNano())]
356 job_id = cl.SubmitJob([op])
357 ((_, offline_disk_instances, _), ) = \
358 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
359 cl.ArchiveJob(job_id)
360
361 if not offline_disk_instances:
362
363 logging.debug("Verify-disks reported no offline disks, nothing to do")
364 return
365
366 logging.debug("Will activate disks for instance(s) %s",
367 utils.CommaJoin(offline_disk_instances))
368
369
370
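  # Build a single job that activates the disks of all affected instances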
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    op = opcodes.OpInstanceActivateDisks(instance_name=name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % name,
                  utils.EpochNano())]
    job.append(op)

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")
400 """Connects to RAPI port and does a simple test.
401
402 Connects to RAPI port of hostname and does a simple test. At this time, the
403 test is GetVersion.
404
405 If RAPI responds with error code "401 Unauthorized", the test is successful,
406 because the aim of this function is to assess whether RAPI is responding, not
407 if it is accessible.
408
409 @type hostname: string
410 @param hostname: hostname of the node to connect to.
411 @rtype: bool
412 @return: Whether RAPI is working properly
413
414 """
415 curl_config = rapi.client.GenericCurlConfig()
416 rapi_client = rapi.client.GanetiRapiClient(hostname,
417 curl_config_fn=curl_config)
418 try:
419 master_version = rapi_client.GetVersion()
420 except rapi.client.CertificateError, err:
421 logging.warning("RAPI certificate error: %s", err)
422 return False
423 except rapi.client.GanetiApiError, err:
424 if err.code == 401:
425
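      # A 401 Unauthorized response still proves that RAPI is up and
      # answering requests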
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def IsWconfdResponding():
  """Probes an echo RPC to WConfD.

  """
  probe_string = "ganeti watcher probe %d" % time.time()

  try:
    result = wconfd.Client().Echo(probe_string)
  except Exception, err:
    logging.warning("WConfd connection error: %s", err)
    return False

  if result != probe_string:
    logging.warning("WConfd echo('%s') returned '%s'", probe_string, result)
    return False

  return True
455 """Parse the command line options.
456
457 @return: (options, args) as from OptionParser.parse_args()
458
459 """
460 parser = OptionParser(description="Ganeti cluster watcher",
461 usage="%prog [-d]",
462 version="%%prog (ganeti) %s" %
463 constants.RELEASE_VERSION)
464
465 parser.add_option(cli.DEBUG_OPT)
466 parser.add_option(cli.NODEGROUP_OPT)
467 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
468 help="Autoarchive jobs older than this age (default"
469 " 6 hours)")
470 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
471 action="store_true", help="Ignore cluster pause setting")
472 parser.add_option("--wait-children", dest="wait_children",
473 action="store_true", help="Wait for child processes")
474 parser.add_option("--no-wait-children", dest="wait_children",
475 action="store_false",
476 help="Don't wait for child processes")
477 parser.add_option("--no-verify-disks", dest="no_verify_disks", default=False,
478 action="store_true", help="Do not verify disk status")
479 parser.add_option("--rapi-ip", dest="rapi_ip",
480 default=constants.IP4_ADDRESS_LOCALHOST,
481 help="Use this IP to talk to RAPI.")
482
483 parser.set_defaults(wait_children=True)
484 options, args = parser.parse_args()
485 options.job_age = cli.ParseTimespec(options.job_age)
486
487 if args:
488 parser.error("No arguments expected")
489
490 return (options, args)
491
494 """Writes the per-group instance status file.
495
496 The entries are sorted.
497
498 @type filename: string
499 @param filename: Path to instance status file
500 @type data: list of tuple; (instance name as string, status as string)
501 @param data: Instance name and status
502
503 """
504 logging.debug("Updating instance status file '%s' with %s instances",
505 filename, len(data))
506
507 utils.WriteFile(filename,
508 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
509 sorted(data))))
510
513 """Writes an instance status file from L{Instance} objects.
514
515 @type filename: string
516 @param filename: Path to status file
517 @type instances: list of L{Instance}
518
519 """
520 _WriteInstanceStatus(filename, [(inst.name, inst.status)
521 for inst in instances])
522
525 """Reads an instance status file.
526
527 @type filename: string
528 @param filename: Path to status file
529 @rtype: tuple; (None or number, list of lists containing instance name and
530 status)
531 @return: File's mtime and instance status contained in the file; mtime is
532 C{None} if file can't be read
533
534 """
535 logging.debug("Reading per-group instance status from '%s'", filename)
536
537 statcb = utils.FileStatHelper()
538 try:
539 content = utils.ReadFile(filename, preread=statcb)
540 except EnvironmentError, err:
541 if err.errno == errno.ENOENT:
542 logging.error("Can't read '%s', does not exist (yet)", filename)
543 else:
544 logging.exception("Unable to read '%s', ignoring", filename)
545 return (None, None)
546 else:
547 return (statcb.st.st_mtime, [line.split(None, 1)
548 for line in content.splitlines()])
549
552 """Merges all per-group instance status files into a global one.
553
554 @type filename: string
555 @param filename: Path to global instance status file
556 @type pergroup_filename: string
557 @param pergroup_filename: Path to per-group status files, must contain "%s"
558 to be replaced with group UUID
559 @type groups: sequence
560 @param groups: UUIDs of known groups
561
562 """
563
564 lock = utils.FileLock.Open(filename)
565 try:
566 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
567 except errors.LockError, err:
568
569
570
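    # All per-group watcher processes lock and update the global file; if
    # one of them hangs, give up instead of blocking here indefinitely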
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

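  # Load the instance status reported by every known node group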
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

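  # For every instance, use the status from the most recently updated
  # per-group file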
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  _WriteInstanceStatus(filename, inststatus)


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
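      # Give the previous child a head start before spawning the next one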
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
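      # Spawn a copy of this program with the node group option appended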
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)
666 """Archives old jobs.
667
668 """
669 (arch_count, left_count) = cl.AutoArchiveJobs(age)
670 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
671
680
736
739 """Retrieves instances and nodes per node group.
740
741 """
742 locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)
743
744 prefix = "instance/"
745 prefix_len = len(prefix)
746
747 locked_instances = set()
748
749 for [[_, name], [_, lock]] in locks.data:
750 if name.startswith(prefix) and lock:
751 locked_instances.add(name[prefix_len:])
752
753 queries = [
754 (constants.QR_INSTANCE,
755 ["name", "status", "admin_state", "admin_state_source", "disks_active",
756 "snodes", "pnode.group.uuid", "snodes.group.uuid"],
757 [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
758 (constants.QR_NODE,
759 ["name", "bootid", "offline"],
760 [qlang.OP_EQUAL, "group.uuid", uuid]),
761 ]
762
763 results = []
764 for what, fields, qfilter in queries:
765 results.append(qcl.Query(what, fields, qfilter))
766
767 results_data = map(operator.attrgetter("data"), results)
768
769
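  # Each field in the raw query data is reported as a (status, value) pair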
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

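  # Drop the per-field status and keep only the values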
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state, config_state_source,
                                disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)
804 """Returns a list of all node groups known by L{ssconf}.
805
806 """
807 groups = ssconf.SimpleStore().GetNodegroupList()
808
809 result = list(line.split(None, 1)[0] for line in groups
810 if line.strip())
811
812 if not compat.all(map(utils.UUID_RE.match, result)):
813 raise errors.GenericError("Ssconf contains invalid group UUID")
814
815 return result
816
819 """Main function for per-group watcher process.
820
821 """
822 group_uuid = opts.nodegroup.lower()
823
824 if not utils.UUID_RE.match(group_uuid):
825 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
826 " got '%s'" %
827 (cli.NODEGROUP_OPT_NAME, group_uuid))
828
829 logging.info("Watcher for node group '%s'", group_uuid)
830
831 known_groups = _LoadKnownGroups()
832
833
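  # Check whether the node group is known at all before doing any work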
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path)
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)
  try:
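    # Connect to the master daemon and verify this host still is the master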
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(client, group_uuid)

    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
    if not opts.no_verify_disks:
      _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS
881 """Main function.
882
883 """
884 (options, _) = ParseOptions()
885
886 utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
887 debug=options.debug, stderr_logging=options.debug)
888
889 if ShouldPause() and not options.ignore_pause:
890 logging.debug("Pause has been set, exiting")
891 return constants.EXIT_SUCCESS
892
893
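  # Try to acquire the global watcher lock in shared, non-blocking mode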
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS