"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

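#: Seconds to wait between starting child processes for node groups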
CHILD_PROCESS_DELAY = 1.0

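#: Seconds to wait for the lock on the instance status file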
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""

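
# A minimal sketch of ShouldPause (used by Main below), assuming it only
# checks the watcher pause file; the original helpers defined here are not
# shown in full.
def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
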

def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

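  # Instance.Restart and Instance.ActivateDisks are called by _CheckInstances
  # and _CheckDisks below but their bodies are not shown here; these are
  # minimal sketches assuming the standard startup and disk-activation
  # opcodes submitted through cli.SubmitOpCode.
  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
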


class Node(object):
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
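      # Bad node, not returning a boot ID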
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
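      # Node's boot ID has changed, probably through a reboot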
      check_nodes.append(node)

  if check_nodes:
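    # Activate disks for all instances with any of the checked nodes as a
    # secondary node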
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
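          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)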
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

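    # Keep changed boot IDs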
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
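    # No instances with offline disks, nothing to do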
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

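  # Build a single job with one OpInstanceActivateDisks per affected instance
  # and submit it once, to keep the load on the job queue low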
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding,
  not whether the caller is authorized.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
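      # Unauthorized, but the RAPI daemon is alive and responding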
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")

  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if the file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with the group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
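  # Every per-group watcher updates the global file, so take an exclusive
  # lock on it before merging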
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
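    # All per-group processes lock and update the file; none of them should
    # take longer than INSTANCE_STATUS_LOCK_TIMEOUT seconds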
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

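  # Read the status of every instance from all per-group files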
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

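  # Select last update based on file mtime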
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

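  # Write the global status file; don't touch it after this point, as the
  # lock is no longer held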
  _WriteInstanceStatus(filename, inststatus)


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
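      # Let's not kill the system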
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
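      # Re-execute this script for the group as a detached child process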
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)

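
# Several helpers from the original module (among them GetLuxiClient and
# _GlobalWatcher) are omitted here and are not reconstructed. _CheckMaster,
# which _GroupWatcher below relies on, is sketched minimally under the
# assumption that it compares the cluster's configured master node with the
# local hostname.
def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")
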

def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
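    # Get all instances whose primary node is in the group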
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),

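    # Get all nodes in the group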
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

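  # Ensure results are tuples with two values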
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

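  # Keep only the field values; the per-field result status codes are dropped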
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

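  # Build Instance objects, ignoring instances split across node groups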
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

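  # Check if the node group is known at all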
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

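  # Per-group state and status files are named after the group UUID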
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

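  # Open the state ("notepad") file kept between watcher runs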
  statefile = state.OpenStateFile(state_path)
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)
  try:
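    # Connect to master daemon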
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

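    # Update per-group instance status file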
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
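    # Save changes for next run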
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

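  # Try to acquire the global watcher lock in shared mode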
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
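    # Per node-group watcher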
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS