31 """Tool to restart erroneously downed virtual machines.
32
33 This program and set of classes implement a watchdog to restart
34 virtual machines in a Ganeti cluster that have crashed or been killed
35 by a node reboot. Run from cron or similar.
36
37 """
38
39 import os
40 import os.path
41 import sys
42 import time
43 import logging
44 import operator
45 import errno
46 from optparse import OptionParser
47
48 from ganeti import utils
49 from ganeti import wconfd
50 from ganeti import constants
51 from ganeti import compat
52 from ganeti import errors
53 from ganeti import opcodes
54 from ganeti import cli
55 import ganeti.rpc.errors as rpcerr
56 from ganeti import rapi
57 from ganeti import netutils
58 from ganeti import qlang
59 from ganeti import ssconf
60 from ganeti import ht
61 from ganeti import pathutils
62
63 import ganeti.rapi.client
64 from ganeti.rapi.client import UsesRapiClient
65
66 from ganeti.watcher import nodemaint
67 from ganeti.watcher import state
68
69
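# The watcher gives up on an instance after MAXTRIES restart (or cleanup)
# attempts.  BAD_STATES are instance states the watcher tries to fix by
# restarting the instance; HELPLESS_STATES are states it cannot fix itself
# because the node hosting the instance is down or offline.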
MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

# Delay (in seconds) between starting the per-group child processes
CHILD_PROCESS_DELAY = 1.0

# How long (in seconds) to wait for the lock on the instance status file
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
89 """Exception raised when this host is not the master."""
90
97
112
115 """Run the watcher hooks.
116
117 """
118 hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
119 constants.HOOKS_NAME_WATCHER)
120 if not os.path.isdir(hooks_dir):
121 return
122
123 try:
124 results = utils.RunParts(hooks_dir)
125 except Exception, err:
126 logging.exception("RunParts %s failed: %s", hooks_dir, err)
127 return
128
129 for (relname, status, runresult) in results:
130 if status == constants.RUNPARTS_SKIP:
131 logging.debug("Watcher hook %s: skipped", relname)
132 elif status == constants.RUNPARTS_ERR:
133 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
134 elif status == constants.RUNPARTS_RUN:
135 if runresult.failed:
136 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
137 relname, runresult.exit_code, runresult.output)
138 else:
139 logging.debug("Watcher hook %s: success (output: %s)", relname,
140 runresult.output)
141 else:
142 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
143 status)
144
147 """Abstraction for a Virtual Machine instance.
148
149 """
150 - def __init__(self, name, status, config_state, config_state_source,
151 disks_active, snodes):
152 self.name = name
153 self.status = status
154 self.config_state = config_state
155 self.config_state_source = config_state_source
156 self.disks_active = disks_active
157 self.snodes = snodes
158
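  # The two instance actions used by the checks below; a minimal sketch,
  # assuming they simply wrap OpInstanceStartup/OpInstanceActivateDisks with a
  # watcher reason trail, the same pattern _CleanupInstance and _VerifyDisks
  # use, rather than the exact upstream implementation.
  def Restart(self, cl):
    """Starts the instance through an OpInstanceStartup opcode.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Restarting instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Activates all disks of the instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)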

  def NeedsCleanup(self):
    """Determines whether the instance needs cleanup.

    Determines whether the instance needs cleanup after having been
    shut down by the user.

    @rtype: bool
    @return: True if the instance needs cleanup, False otherwise.

    """
    return self.status == constants.INSTST_USERDOWN and \
        self.config_state != constants.ADMINST_DOWN


class Node(object):
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Cleaning up instance %s" % inst.name,
                utils.EpochNano())]
  try:
    cli.SubmitOpCode(op, cl=cl)
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception:
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


238 """Make a pass over the list of instances, restarting downed ones.
239
240 """
241 notepad.MaintainInstanceList(instances.keys())
242
243 started = set()
244
245 for inst in instances.values():
246 if inst.NeedsCleanup():
247 _CleanupInstance(cl, notepad, inst, locks)
248 elif inst.status in BAD_STATES:
249 n = notepad.NumberOfRestartAttempts(inst.name)
250
251 if n > MAXTRIES:
252 logging.warning("Not restarting instance '%s', retries exhausted",
253 inst.name)
254 continue
255
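      # If the last allowed attempt has already been used up, record one more
      # attempt so the counter exceeds MAXTRIES and the give-up error is
      # logged only once.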
      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot ID
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # The node's boot ID has changed since the last run, i.e. it rebooted
      check_nodes.append(node)

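  # Every node in check_nodes rebooted since the last watcher run; try to
  # re-activate disks for all instances that use such a node as a secondary.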
  if check_nodes:
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # The instance was just (re)started, which should have activated
          # its disks already
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Remember the new boot IDs so the same reboot is not handled again
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


338 """Checks if given instances has any secondary in offline status.
339
340 @param instance: The instance object
341 @return: True if any of the secondary is offline, False otherwise
342
343 """
344 return compat.any(nodes[node_name].offline for node_name in instance.snodes)
345
348 """Run a per-group "gnt-cluster verify-disks".
349
350 """
351 op = opcodes.OpGroupVerifyDisks(
352 group_name=uuid, priority=constants.OP_PRIO_LOW)
353 op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
354 "Verifying disks of group %s" % uuid,
355 utils.EpochNano())]
356 job_id = cl.SubmitJob([op])
357 ((_, offline_disk_instances, _), ) = \
358 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
359 cl.ArchiveJob(job_id)
360
361 if not offline_disk_instances:
362
363 logging.debug("Verify-disks reported no offline disks, nothing to do")
364 return
365
366 logging.debug("Will activate disks for instance(s) %s",
367 utils.CommaJoin(offline_disk_instances))
368
369
370
371 job = []
372 for name in offline_disk_instances:
373 try:
374 inst = instances[name]
375 except KeyError:
376 logging.info("Can't find instance '%s', maybe it was ignored", name)
377 continue
378
379 if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
380 logging.info("Skipping instance '%s' because it is in a helpless state"
381 " or has offline secondaries", name)
382 continue
383
384 op = opcodes.OpInstanceActivateDisks(instance_name=name)
385 op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
386 "Activating disks for instance %s" % name,
387 utils.EpochNano())]
388 job.append(op)
389
390 if job:
391 job_id = cli.SendJob(job, cl=cl)
392
393 try:
394 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
395 except Exception:
396 logging.exception("Error while activating disks")
397
400 """Connects to RAPI port and does a simple test.
401
402 Connects to RAPI port of hostname and does a simple test. At this time, the
403 test is GetVersion.
404
405 If RAPI responds with error code "401 Unauthorized", the test is successful,
406 because the aim of this function is to assess whether RAPI is responding, not
407 if it is accessible.
408
409 @type hostname: string
410 @param hostname: hostname of the node to connect to.
411 @rtype: bool
412 @return: Whether RAPI is working properly
413
414 """
415 curl_config = rapi.client.GenericCurlConfig()
416 rapi_client = rapi.client.GanetiRapiClient(hostname,
417 curl_config_fn=curl_config)
418 try:
419 master_version = rapi_client.GetVersion()
420 except rapi.client.CertificateError, err:
421 logging.warning("RAPI certificate error: %s", err)
422 return False
423 except rapi.client.GanetiApiError, err:
424 if err.code == 401:
425
426 return True
427 else:
428 logging.warning("RAPI error: %s", err)
429 return False
430 else:
431 logging.debug("Reported RAPI version %s", master_version)
432 return master_version == constants.RAPI_VERSION
433
436 """Probes an echo RPC to WConfD.
437
438 """
439 probe_string = "ganeti watcher probe %d" % time.time()
440
441 try:
442 result = wconfd.Client().Echo(probe_string)
443 except Exception, err:
444 logging.warning("WConfd connection error: %s", err)
445 return False
446
447 if result != probe_string:
448 logging.warning("WConfd echo('%s') returned '%s'", probe_string, result)
449 return False
450
451 return True
452
455 """Parse the command line options.
456
457 @return: (options, args) as from OptionParser.parse_args()
458
459 """
460 parser = OptionParser(description="Ganeti cluster watcher",
461 usage="%prog [-d]",
462 version="%%prog (ganeti) %s" %
463 constants.RELEASE_VERSION)
464
465 parser.add_option(cli.DEBUG_OPT)
466 parser.add_option(cli.NODEGROUP_OPT)
467 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
468 help="Autoarchive jobs older than this age (default"
469 " 6 hours)")
470 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
471 action="store_true", help="Ignore cluster pause setting")
472 parser.add_option("--wait-children", dest="wait_children",
473 action="store_true", help="Wait for child processes")
474 parser.add_option("--no-wait-children", dest="wait_children",
475 action="store_false",
476 help="Don't wait for child processes")
477 parser.add_option("--no-verify-disks", dest="no_verify_disks", default=False,
478 action="store_true", help="Do not verify disk status")
479 parser.add_option("--rapi-ip", dest="rapi_ip",
480 default=constants.IP4_ADDRESS_LOCALHOST,
481 help="Use this IP to talk to RAPI.")
482
483 parser.set_defaults(wait_children=True)
484 options, args = parser.parse_args()
485 options.job_age = cli.ParseTimespec(options.job_age)
486
487 if args:
488 parser.error("No arguments expected")
489
490 return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

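  # Each (name, status) tuple is rendered as a "<name> <status>" line via the
  # "%s %s\n" format; sorting the tuples sorts the file by instance name.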
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

513 """Writes an instance status file from L{Instance} objects.
514
515 @type filename: string
516 @param filename: Path to status file
517 @type instances: list of L{Instance}
518
519 """
520 _WriteInstanceStatus(filename, [(inst.name, inst.status)
521 for inst in instances])
522
525 """Reads an instance status file.
526
527 @type filename: string
528 @param filename: Path to status file
529 @rtype: tuple; (None or number, list of lists containing instance name and
530 status)
531 @return: File's mtime and instance status contained in the file; mtime is
532 C{None} if file can't be read
533
534 """
535 logging.debug("Reading per-group instance status from '%s'", filename)
536
537 statcb = utils.FileStatHelper()
538 try:
539 content = utils.ReadFile(filename, preread=statcb)
540 except EnvironmentError, err:
541 if err.errno == errno.ENOENT:
542 logging.error("Can't read '%s', does not exist (yet)", filename)
543 else:
544 logging.exception("Unable to read '%s', ignoring", filename)
545 return (None, None)
546 else:
547 return (statcb.st.st_mtime, [line.split(None, 1)
548 for line in content.splitlines()])
549
552 """Merges all per-group instance status files into a global one.
553
554 @type filename: string
555 @param filename: Path to global instance status file
556 @type pergroup_filename: string
557 @param pergroup_filename: Path to per-group status files, must contain "%s"
558 to be replaced with group UUID
559 @type groups: sequence
560 @param groups: UUIDs of known groups
561
562 """
563
564 lock = utils.FileLock.Open(filename)
565 try:
566 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
567 except errors.LockError, err:
568
569
570
571 logging.error("Can't acquire lock on instance status file '%s', not"
572 " updating: %s", filename, err)
573 return
574
575 logging.debug("Acquired exclusive lock on '%s'", filename)
576
577 data = {}
578
579
580 for group_uuid in groups:
581 (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
582
583 if mtime is not None:
584 for (instance_name, status) in instdata:
585 data.setdefault(instance_name, []).append((mtime, status))
586
587
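  # For every instance keep only the status from the most recently written
  # per-group file; the (mtime, status) tuples sort by mtime first.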
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  _WriteInstanceStatus(filename, inststatus)


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not start all children at exactly the same time
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # Spawn a full watcher process restricted to this node group
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)

666 """Archives old jobs.
667
668 """
669 (arch_count, left_count) = cl.AutoArchiveJobs(age)
670 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
671
680
737
740 """Retrieves instances and nodes per node group.
741
742 """
743 locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)
744
745 prefix = "instance/"
746 prefix_len = len(prefix)
747
748 locked_instances = set()
749
750 for [[_, name], [_, lock]] in locks.data:
751 if name.startswith(prefix) and lock:
752 locked_instances.add(name[prefix_len:])
753
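  # Two queries: the group's instances (selected by primary node group) with
  # the fields needed to build Instance objects, and the group's nodes with
  # their boot IDs and offline flags.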
  queries = [
    (constants.QR_INSTANCE,
     ["name", "status", "admin_state", "admin_state_source", "disks_active",
      "snodes", "pnode.group.uuid", "snodes.group.uuid"],
     [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    (constants.QR_NODE,
     ["name", "bootid", "offline"],
     [qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring the result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

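  # Build Instance objects and a map from secondary node name to the set of
  # instance names using it, skipping instances split across node groups.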
  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state, config_state_source,
                                disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result

820 """Main function for per-group watcher process.
821
822 """
823 group_uuid = opts.nodegroup.lower()
824
825 if not utils.UUID_RE.match(group_uuid):
826 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
827 " got '%s'" %
828 (cli.NODEGROUP_OPT_NAME, group_uuid))
829
830 logging.info("Watcher for node group '%s'", group_uuid)
831
832 known_groups = _LoadKnownGroups()
833
834
835 if group_uuid not in known_groups:
836 raise errors.GenericError("Node group '%s' is not known by ssconf" %
837 group_uuid)
838
839
840
841 state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
842 inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
843
844 logging.debug("Using state file %s", state_path)
845
846
847 statefile = state.OpenStateFile(state_path)
848 if not statefile:
849 return constants.EXIT_FAILURE
850
851 notepad = state.WatcherState(statefile)
852 try:
853
854 client = GetLuxiClient(False)
855
856 _CheckMaster(client)
857
858 (nodes, instances, locks) = _GetGroupData(client, group_uuid)
859
860
861 _UpdateInstanceStatus(inst_status_path, instances.values())
862
863 _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
864 pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
865 known_groups)
866
867 started = _CheckInstances(client, notepad, instances, locks)
868 _CheckDisks(client, notepad, nodes, instances, started)
869 if not opts.no_verify_disks:
870 _VerifyDisks(client, group_uuid, nodes, instances)
871 except Exception, err:
872 logging.info("Not updating status file due to failure: %s", err)
873 raise
874 else:
875
876 notepad.Save(state_path)
877
878 return constants.EXIT_SUCCESS
879
882 """Main function.
883
884 """
885 (options, _) = ParseOptions()
886
887 utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
888 debug=options.debug, stderr_logging=options.debug)
889
890 if ShouldPause() and not options.ignore_pause:
891 logging.debug("Pause has been set, exiting")
892 return constants.EXIT_SUCCESS
893
894
895 lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
896 try:
897 lock.Shared(blocking=False)
898 except (EnvironmentError, errors.LockError), err:
899 logging.error("Can't acquire lock on %s: %s",
900 pathutils.WATCHER_LOCK_FILE, err)
901 return constants.EXIT_SUCCESS
902
903 if options.nodegroup is None:
904 fn = _GlobalWatcher
905 else:
906
907 fn = _GroupWatcher
908
909 try:
910 return fn(options)
911 except (SystemExit, KeyboardInterrupt):
912 raise
913 except NotMasterError:
914 logging.debug("Not master, exiting")
915 return constants.EXIT_NOTMASTER
916 except errors.ResolverError, err:
917 logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
918 return constants.EXIT_NODESETUP_ERROR
919 except errors.JobQueueFull:
920 logging.error("Job queue is full, can't query cluster state")
921 except errors.JobQueueDrainError:
922 logging.error("Job queue is drained, can't maintain cluster state")
923 except Exception, err:
924 logging.exception(str(err))
925 return constants.EXIT_FAILURE
926
927 return constants.EXIT_SUCCESS
928