31 """Tool to restart erroneously downed virtual machines.
32
33 This program and set of classes implement a watchdog to restart
34 virtual machines in a Ganeti cluster that have crashed or been killed
35 by a node reboot. Run from cron or similar.
36
37 """
38
39 import os
40 import os.path
41 import sys
42 import time
43 import logging
44 import operator
45 import errno
46 from optparse import OptionParser
47
48 from ganeti import utils
49 from ganeti import wconfd
50 from ganeti import constants
51 from ganeti import compat
52 from ganeti import errors
53 from ganeti import opcodes
54 from ganeti import cli
55 import ganeti.rpc.node as rpc
56 import ganeti.rpc.errors as rpcerr
57 from ganeti import rapi
58 from ganeti import netutils
59 from ganeti import qlang
60 from ganeti import objects
61 from ganeti import ssconf
62 from ganeti import ht
63 from ganeti import pathutils
64
65 import ganeti.rapi.client
66 from ganeti.rapi.client import UsesRapiClient
67
68 from ganeti.watcher import nodemaint
69 from ganeti.watcher import state
70
71
72 MAXTRIES = 5
73 BAD_STATES = compat.UniqueFrozenset([
74 constants.INSTST_ERRORDOWN,
75 ])
76 HELPLESS_STATES = compat.UniqueFrozenset([
77 constants.INSTST_NODEDOWN,
78 constants.INSTST_NODEOFFLINE,
79 ])
80 NOTICE = "NOTICE"
81 ERROR = "ERROR"
82
83
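#: Seconds to wait between starting child processes for node groups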
CHILD_PROCESS_DELAY = 1.0

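#: Seconds to wait for the lock on the global instance status file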
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts"
                                   % status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, config_state, config_state_source,
               disks_active, snodes):
    self.name = name
    self.status = status
    self.config_state = config_state
    self.config_state_source = config_state_source
    self.disks_active = disks_active
    self.snodes = snodes

  def NeedsCleanup(self):
    """Determines whether the instance needs cleanup.

    Determines whether the instance needs cleanup after having been
    shut down by the user.

    @rtype: bool
    @return: True if the instance needs cleanup, False otherwise.

    """
    return self.status == constants.INSTST_USERDOWN and \
        self.config_state != constants.ADMINST_DOWN


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Cleaning up instance %s" % inst.name,
                utils.EpochNano())]
  try:
    cli.SubmitOpCode(op, cl=cl)
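    # The cleanup succeeded; clear any previously recorded cleanup attempts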
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception:
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


def _CheckInstances(cl, notepad, instances, locks):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.NeedsCleanup():
      _CleanupInstance(cl, notepad, inst, locks)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

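      # The retry limit has just been reached; record the attempt and give up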
      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
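      # The node returned no boot ID; skip secondary checks for it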
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
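      # The boot ID changed since the last run, i.e. the node has rebooted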
      check_nodes.append(node)

  if check_nodes:
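    # Activate disks for all instances that have any of the rebooted nodes
    # as a secondary node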
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
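          # The instance was just restarted above, which should already have
          # activated its disks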
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

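    # Remember the new boot IDs so the same reboot is not processed again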
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if a given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  op = opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)
  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Verifying disks of group %s" % uuid,
                utils.EpochNano())]
  job_id = cl.SubmitJob([op])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

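  # Submit all disk activations as a single job and wait for it; this keeps
  # the pressure on the job queue low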
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    op = opcodes.OpInstanceActivateDisks(instance_name=name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % name,
                  utils.EpochNano())]
    job.append(op)

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding,
  not whether the caller is authorized.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
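      # A 401 response proves that RAPI is up and answering requests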
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION

439 """Probes an echo RPC to WConfD.
440
441 """
442 probe_string = "ganeti watcher probe %d" % time.time()
443
444 try:
445 result = wconfd.Client().Echo(probe_string)
446 except Exception, err:
447 logging.warning("WConfd connection error: %s", err)
448 return False
449
450 if result != probe_string:
451 logging.warning("WConfd echo('%s') returned '%s'", probe_string, result)
452 return False
453
454 return True
455
458 """Parse the command line options.
459
460 @return: (options, args) as from OptionParser.parse_args()
461
462 """
463 parser = OptionParser(description="Ganeti cluster watcher",
464 usage="%prog [-d]",
465 version="%%prog (ganeti) %s" %
466 constants.RELEASE_VERSION)
467
468 parser.add_option(cli.DEBUG_OPT)
469 parser.add_option(cli.NODEGROUP_OPT)
470 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
471 help="Autoarchive jobs older than this age (default"
472 " 6 hours)")
473 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
474 action="store_true", help="Ignore cluster pause setting")
475 parser.add_option("--wait-children", dest="wait_children",
476 action="store_true", help="Wait for child processes")
477 parser.add_option("--no-wait-children", dest="wait_children",
478 action="store_false",
479 help="Don't wait for child processes")
480 parser.add_option("--rapi-ip", dest="rapi_ip",
481 default=constants.IP4_ADDRESS_LOCALHOST,
482 help="Use this IP to talk to RAPI.")
483
484 parser.set_defaults(wait_children=True)
485 options, args = parser.parse_args()
486 options.job_age = cli.ParseTimespec(options.job_age)
487
488 if args:
489 parser.error("No arguments expected")
490
491 return (options, args)
492
495 """Writes the per-group instance status file.
496
497 The entries are sorted.
498
499 @type filename: string
500 @param filename: Path to instance status file
501 @type data: list of tuple; (instance name as string, status as string)
502 @param data: Instance name and status
503
504 """
505 logging.debug("Updating instance status file '%s' with %s instances",
506 filename, len(data))
507
508 utils.WriteFile(filename,
509 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
510 sorted(data))))
511
514 """Writes an instance status file from L{Instance} objects.
515
516 @type filename: string
517 @param filename: Path to status file
518 @type instances: list of L{Instance}
519
520 """
521 _WriteInstanceStatus(filename, [(inst.name, inst.status)
522 for inst in instances])
523
526 """Reads an instance status file.
527
528 @type filename: string
529 @param filename: Path to status file
530 @rtype: tuple; (None or number, list of lists containing instance name and
531 status)
532 @return: File's mtime and instance status contained in the file; mtime is
533 C{None} if file can't be read
534
535 """
536 logging.debug("Reading per-group instance status from '%s'", filename)
537
538 statcb = utils.FileStatHelper()
539 try:
540 content = utils.ReadFile(filename, preread=statcb)
541 except EnvironmentError, err:
542 if err.errno == errno.ENOENT:
543 logging.error("Can't read '%s', does not exist (yet)", filename)
544 else:
545 logging.exception("Unable to read '%s', ignoring", filename)
546 return (None, None)
547 else:
548 return (statcb.st.st_mtime, [line.split(None, 1)
549 for line in content.splitlines()])
550
553 """Merges all per-group instance status files into a global one.
554
555 @type filename: string
556 @param filename: Path to global instance status file
557 @type pergroup_filename: string
558 @param pergroup_filename: Path to per-group status files, must contain "%s"
559 to be replaced with group UUID
560 @type groups: sequence
561 @param groups: UUIDs of known groups
562
563 """
564
565 lock = utils.FileLock.Open(filename)
566 try:
567 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
568 except errors.LockError, err:
569
570
571
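    # All per-group watcher processes lock and update this file in turn; none
    # of them should hold the lock for longer than the timeout above, so a
    # failure to acquire it means something is wrong and we only log an error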
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

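  # Read every group's status file, remembering each file's mtime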
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

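  # If an instance is reported by several groups, keep the status from the
  # most recently written file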
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  _WriteInstanceStatus(filename, inststatus)

625 """Starts a new instance of the watcher for every node group.
626
627 """
628 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
629 for arg in sys.argv)
630
631 result = cl.QueryGroups([], ["name", "uuid"], False)
632
633 children = []
634
635 for (idx, (name, uuid)) in enumerate(result):
636 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
637
638 if idx > 0:
639
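      # Stagger the children so they do not all hit the master at once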
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)

  prefix = "instance/"
  prefix_len = len(prefix)

  locked_instances = set()

  for [[_, name], [_, lock]] in locks.data:
    if name.startswith(prefix) and lock:
      locked_instances.add(name[prefix_len:])

  queries = [
    (constants.QR_INSTANCE,
     ["name", "status", "admin_state", "admin_state_source", "disks_active",
      "snodes", "pnode.group.uuid", "snodes.group.uuid"],
     [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    (constants.QR_NODE,
     ["name", "bootid", "offline"],
     [qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  results_data = map(operator.attrgetter("data"), results)

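  # Sanity check: every returned field must be a (status, value) pair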
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

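  # Strip the per-field result status, keeping only the values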
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

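  # Build Instance objects, ignoring instances that are split across groups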
  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state,
                                config_state_source, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)

805 """Returns a list of all node groups known by L{ssconf}.
806
807 """
808 groups = ssconf.SimpleStore().GetNodegroupList()
809
810 result = list(line.split(None, 1)[0] for line in groups
811 if line.strip())
812
813 if not compat.all(map(utils.UUID_RE.match, result)):
814 raise errors.GenericError("Ssconf contains invalid group UUID")
815
816 return result
817
820 """Main function for per-group watcher process.
821
822 """
823 group_uuid = opts.nodegroup.lower()
824
825 if not utils.UUID_RE.match(group_uuid):
826 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
827 " got '%s'" %
828 (cli.NODEGROUP_OPT_NAME, group_uuid))
829
830 logging.info("Watcher for node group '%s'", group_uuid)
831
832 known_groups = _LoadKnownGroups()
833
834
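  # Refuse to run for a node group that ssconf does not know about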
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

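  # The group UUID has been validated above, so it is safe to use in paths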
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path)
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)
  try:
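    # Connect to the master daemon and check that this node is the master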
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(client, group_uuid)

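    # Update the per-group instance status file, then merge all group files
    # into the global status file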
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
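    # Save the state file only if the whole run succeeded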
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

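  # Take the global watcher lock in shared mode; if it cannot be acquired,
  # another operation holds it exclusively and we exit quietly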
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
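    # A node group was given, so run the watcher for that group only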
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS