22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 import operator
36 import errno
37 from optparse import OptionParser
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import compat
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import cli
45 from ganeti import luxi
46 from ganeti import rapi
47 from ganeti import netutils
48 from ganeti import qlang
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import ht
52 from ganeti import pathutils
53
54 import ganeti.rapi.client
55 from ganeti.rapi.client import UsesRapiClient
56
57 from ganeti.watcher import nodemaint
58 from ganeti.watcher import state
59


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Seconds to wait between starting the per-group child processes
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for the lock on the instance status file
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
80 """Exception raised when this host is not the master."""
81
88
102
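

# NOTE: this helper was elided from this copy of the module; the body below is
# a minimal sketch reconstructed from its use in Main(), where ShouldPause()
# gates the whole run. The pause-file helpers are the standard Ganeti ones,
# but treat the exact implementation as an assumption.
def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))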
105 """Run the watcher hooks.
106
107 """
108 hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
109 constants.HOOKS_NAME_WATCHER)
110 if not os.path.isdir(hooks_dir):
111 return
112
113 try:
114 results = utils.RunParts(hooks_dir)
115 except Exception, err:
116 logging.exception("RunParts %s failed: %s", hooks_dir, err)
117 return
118
119 for (relname, status, runresult) in results:
120 if status == constants.RUNPARTS_SKIP:
121 logging.debug("Watcher hook %s: skipped", relname)
122 elif status == constants.RUNPARTS_ERR:
123 logging.warning("Watcher hook %s: error (%s)", relname, runresult)
124 elif status == constants.RUNPARTS_RUN:
125 if runresult.failed:
126 logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
127 relname, runresult.exit_code, runresult.output)
128 else:
129 logging.debug("Watcher hook %s: success (output: %s)", relname,
130 runresult.output)
131 else:
132 raise errors.ProgrammerError("Unknown status %s returned by RunParts",
133 status)
134
137 """Abstraction for a Virtual Machine instance.
138
139 """
140 - def __init__(self, name, status, disks_active, snodes):
141 self.name = name
142 self.status = status
143 self.disks_active = disks_active
144 self.snodes = snodes
145
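
  # NOTE: the Restart and ActivateDisks methods were elided from this copy.
  # The sketches below are reconstructed from how _CheckInstances and
  # _CheckDisks call them; the exact opcode parameters are assumptions.
  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)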
162 """Data container representing cluster node.
163
164 """
165 - def __init__(self, name, bootid, offline, secondaries):
166 """Initializes this class.
167
168 """
169 self.name = name
170 self.bootid = bootid
171 self.offline = offline
172 self.secondaries = secondaries
173
176 """Make a pass over the list of instances, restarting downed ones.
177
178 """
179 notepad.MaintainInstanceList(instances.keys())
180
181 started = set()
182
183 for inst in instances.values():
184 if inst.status in BAD_STATES:
185 n = notepad.NumberOfRestartAttempts(inst.name)
186
187 if n > MAXTRIES:
188 logging.warning("Not restarting instance '%s', retries exhausted",
189 inst.name)
190 continue
191
192 if n == MAXTRIES:
193 notepad.RecordRestartAttempt(inst.name)
194 logging.error("Could not restart instance '%s' after %s attempts,"
195 " giving up", inst.name, MAXTRIES)
196 continue
197
198 try:
199 logging.info("Restarting instance '%s' (attempt #%s)",
200 inst.name, n + 1)
201 inst.Restart(cl)
202 except Exception:
203 logging.exception("Error while restarting instance '%s'", inst.name)
204 else:
205 started.add(inst.name)
206
207 notepad.RecordRestartAttempt(inst.name)
208
209 else:
210 if notepad.NumberOfRestartAttempts(inst.name):
211 notepad.RemoveInstance(inst.name)
212 if inst.status not in HELPLESS_STATES:
213 logging.info("Restart of instance '%s' succeeded", inst.name)
214
215 return started
216
217


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # The instance was already started, which should have activated
          # its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)
274 """Checks if given instances has any secondary in offline status.
275
276 @param instance: The instance object
277 @return: True if any of the secondary is offline, False otherwise
278
279 """
280 return compat.any(nodes[node_name].offline for node_name in instance.snodes)
281
284 """Run a per-group "gnt-cluster verify-disks".
285
286 """
287 job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
288 group_name=uuid, priority=constants.OP_PRIO_LOW)])
289 ((_, offline_disk_instances, _), ) = \
290 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
291 cl.ArchiveJob(job_id)
292
293 if not offline_disk_instances:
294
295 logging.debug("Verify-disks reported no offline disks, nothing to do")
296 return
297
298 logging.debug("Will activate disks for instance(s) %s",
299 utils.CommaJoin(offline_disk_instances))
300
301
302
303 job = []
304 for name in offline_disk_instances:
305 try:
306 inst = instances[name]
307 except KeyError:
308 logging.info("Can't find instance '%s', maybe it was ignored", name)
309 continue
310
311 if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
312 logging.info("Skipping instance '%s' because it is in a helpless state"
313 " or has offline secondaries", name)
314 continue
315
316 job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
317
318 if job:
319 job_id = cli.SendJob(job, cl=cl)
320
321 try:
322 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
323 except Exception:
324 logging.exception("Error while activating disks")
325
328 """Connects to RAPI port and does a simple test.
329
330 Connects to RAPI port of hostname and does a simple test. At this time, the
331 test is GetVersion.
332
333 If RAPI responds with error code "401 Unauthorized", the test is successful,
334 because the aim of this function is to assess whether RAPI is responding, not
335 if it is accessible.
336
337 @type hostname: string
338 @param hostname: hostname of the node to connect to.
339 @rtype: bool
340 @return: Whether RAPI is working properly
341
342 """
343 curl_config = rapi.client.GenericCurlConfig()
344 rapi_client = rapi.client.GanetiRapiClient(hostname,
345 curl_config_fn=curl_config)
346 try:
347 master_version = rapi_client.GetVersion()
348 except rapi.client.CertificateError, err:
349 logging.warning("RAPI certificate error: %s", err)
350 return False
351 except rapi.client.GanetiApiError, err:
352 if err.code == 401:
353
354 return True
355 else:
356 logging.warning("RAPI error: %s", err)
357 return False
358 else:
359 logging.debug("Reported RAPI version %s", master_version)
360 return master_version == constants.RAPI_VERSION
361
364 """Parse the command line options.
365
366 @return: (options, args) as from OptionParser.parse_args()
367
368 """
369 parser = OptionParser(description="Ganeti cluster watcher",
370 usage="%prog [-d]",
371 version="%%prog (ganeti) %s" %
372 constants.RELEASE_VERSION)
373
374 parser.add_option(cli.DEBUG_OPT)
375 parser.add_option(cli.NODEGROUP_OPT)
376 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
377 help="Autoarchive jobs older than this age (default"
378 " 6 hours)")
379 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
380 action="store_true", help="Ignore cluster pause setting")
381 parser.add_option("--wait-children", dest="wait_children",
382 action="store_true", help="Wait for child processes")
383 parser.add_option("--no-wait-children", dest="wait_children",
384 action="store_false",
385 help="Don't wait for child processes")
386
387 parser.set_defaults(wait_children=True)
388 options, args = parser.parse_args()
389 options.job_age = cli.ParseTimespec(options.job_age)
390
391 if args:
392 parser.error("No arguments expected")
393
394 return (options, args)
395
398 """Writes the per-group instance status file.
399
400 The entries are sorted.
401
402 @type filename: string
403 @param filename: Path to instance status file
404 @type data: list of tuple; (instance name as string, status as string)
405 @param data: Instance name and status
406
407 """
408 logging.debug("Updating instance status file '%s' with %s instances",
409 filename, len(data))
410
411 utils.WriteFile(filename,
412 data="".join(map(compat.partial(operator.mod, "%s %s\n"),
413 sorted(data))))
414
417 """Writes an instance status file from L{Instance} objects.
418
419 @type filename: string
420 @param filename: Path to status file
421 @type instances: list of L{Instance}
422
423 """
424 _WriteInstanceStatus(filename, [(inst.name, inst.status)
425 for inst in instances])
426
429 """Reads an instance status file.
430
431 @type filename: string
432 @param filename: Path to status file
433 @rtype: tuple; (None or number, list of lists containing instance name and
434 status)
435 @return: File's mtime and instance status contained in the file; mtime is
436 C{None} if file can't be read
437
438 """
439 logging.debug("Reading per-group instance status from '%s'", filename)
440
441 statcb = utils.FileStatHelper()
442 try:
443 content = utils.ReadFile(filename, preread=statcb)
444 except EnvironmentError, err:
445 if err.errno == errno.ENOENT:
446 logging.error("Can't read '%s', does not exist (yet)", filename)
447 else:
448 logging.exception("Unable to read '%s', ignoring", filename)
449 return (None, None)
450 else:
451 return (statcb.st.st_mtime, [line.split(None, 1)
452 for line in content.splitlines()])
453
456 """Merges all per-group instance status files into a global one.
457
458 @type filename: string
459 @param filename: Path to global instance status file
460 @type pergroup_filename: string
461 @param pergroup_filename: Path to per-group status files, must contain "%s"
462 to be replaced with group UUID
463 @type groups: sequence
464 @param groups: UUIDs of known groups
465
466 """
467
468 lock = utils.FileLock.Open(filename)
469 try:
470 lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
471 except errors.LockError, err:
472
473
474
475 logging.error("Can't acquire lock on instance status file '%s', not"
476 " updating: %s", filename, err)
477 return
478
479 logging.debug("Acquired exclusive lock on '%s'", filename)
480
481 data = {}
482
483
484 for group_uuid in groups:
485 (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
486
487 if mtime is not None:
488 for (instance_name, status) in instdata:
489 data.setdefault(instance_name, []).append((mtime, status))
490
491
492 inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
493 for (instance_name, status) in data.items()]
494
495
496
497 _WriteInstanceStatus(filename, inststatus)
498
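

# NOTE: GetLuxiClient was elided from this copy of the module; the sketch
# below is reconstructed from its use in _GroupWatcher (and, presumably, the
# global watcher). Treat the restart handling as an assumption.
def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # cli.GetClient raises this when the current node is not the master
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()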
528 """Starts a new instance of the watcher for every node group.
529
530 """
531 assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
532 for arg in sys.argv)
533
534 result = cl.QueryGroups([], ["name", "uuid"], False)
535
536 children = []
537
538 for (idx, (name, uuid)) in enumerate(result):
539 args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
540
541 if idx > 0:
542
543 time.sleep(CHILD_PROCESS_DELAY)
544
545 logging.debug("Spawning child for group '%s' (%s), arguments %s",
546 name, uuid, args)
547
548 try:
549
550 pid = os.spawnv(os.P_NOWAIT, args[0], args)
551 except Exception:
552 logging.exception("Failed to start child for group '%s' (%s)",
553 name, uuid)
554 else:
555 logging.debug("Started with PID %s", pid)
556 children.append(pid)
557
558 if wait:
559 for pid in children:
560 logging.debug("Waiting for child PID %s", pid)
561 try:
562 result = utils.RetryOnSignal(os.waitpid, pid, 0)
563 except EnvironmentError, err:
564 result = str(err)
565
566 logging.debug("Child PID %s exited with status %s", pid, result)
567
570 """Archives old jobs.
571
572 """
573 (arch_count, left_count) = cl.AutoArchiveJobs(age)
574 logging.debug("Archived %s jobs, left %s", arch_count, left_count)
575
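

# NOTE: _CheckMaster was elided from this copy; the sketch below is
# reconstructed from its use in _GroupWatcher. The exact config query field
# name is an assumption.
def _CheckMaster(cl):
  """Ensures the current host is the master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")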
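

# NOTE: _GlobalWatcher was elided from this copy; Main() dispatches to it when
# no node group option is given. The sketch below is reconstructed from the
# helpers defined in this module and only approximates the original flow.
@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for the global watcher.

  Performs the cluster-wide checks and then spawns one child watcher per node
  group (see L{_StartGroupChildren}).

  """
  RunWatcherHooks()

  # Run node maintenance in all cases, even on the master, so that old
  # masters can be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():
    nodemaint.NodeMaintenance().Exec()

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # We are on the master node; check that the remote API answers
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get an answer from the remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS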
632 """Retrieves instances and nodes per node group.
633
634 """
635 job = [
636
637 opcodes.OpQuery(what=constants.QR_INSTANCE,
638 fields=["name", "status", "disks_active", "snodes",
639 "pnode.group.uuid", "snodes.group.uuid"],
640 qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
641 use_locking=True,
642 priority=constants.OP_PRIO_LOW),
643
644
645 opcodes.OpQuery(what=constants.QR_NODE,
646 fields=["name", "bootid", "offline"],
647 qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
648 use_locking=True,
649 priority=constants.OP_PRIO_LOW),
650 ]
651
652 job_id = cl.SubmitJob(job)
653 results = map(objects.QueryResponse.FromDict,
654 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
655 cl.ArchiveJob(job_id)
656
657 results_data = map(operator.attrgetter("data"), results)
658
659
660 assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
661
662
663 (raw_instances, raw_nodes) = [[map(compat.snd, values)
664 for values in res]
665 for res in results_data]
666
667 secondaries = {}
668 instances = []
669
670
671 for (name, status, disks_active, snodes, pnode_group_uuid,
672 snodes_group_uuid) in raw_instances:
673 if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
674 logging.error("Ignoring split instance '%s', primary group %s, secondary"
675 " groups %s", name, pnode_group_uuid,
676 utils.CommaJoin(snodes_group_uuid))
677 else:
678 instances.append(Instance(name, status, disks_active, snodes))
679
680 for node in snodes:
681 secondaries.setdefault(node, set()).add(name)
682
683
684 nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
685 for (name, bootid, offline) in raw_nodes]
686
687 return (dict((node.name, node) for node in nodes),
688 dict((inst.name, inst) for inst in instances))
689
692 """Returns a list of all node groups known by L{ssconf}.
693
694 """
695 groups = ssconf.SimpleStore().GetNodegroupList()
696
697 result = list(line.split(None, 1)[0] for line in groups
698 if line.strip())
699
700 if not compat.all(map(utils.UUID_RE.match, result)):
701 raise errors.GenericError("Ssconf contains invalid group UUID")
702
703 return result
704
707 """Main function for per-group watcher process.
708
709 """
710 group_uuid = opts.nodegroup.lower()
711
712 if not utils.UUID_RE.match(group_uuid):
713 raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
714 " got '%s'" %
715 (cli.NODEGROUP_OPT_NAME, group_uuid))
716
717 logging.info("Watcher for node group '%s'", group_uuid)
718
719 known_groups = _LoadKnownGroups()
720
721
722 if group_uuid not in known_groups:
723 raise errors.GenericError("Node group '%s' is not known by ssconf" %
724 group_uuid)
725
726
727
728 state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
729 inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
730
731 logging.debug("Using state file %s", state_path)
732
733
734 statefile = state.OpenStateFile(state_path)
735 if not statefile:
736 return constants.EXIT_FAILURE
737
738 notepad = state.WatcherState(statefile)
739 try:
740
741 client = GetLuxiClient(False)
742
743 _CheckMaster(client)
744
745 (nodes, instances) = _GetGroupData(client, group_uuid)
746
747
748 _UpdateInstanceStatus(inst_status_path, instances.values())
749
750 _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
751 pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
752 known_groups)
753
754 started = _CheckInstances(client, notepad, instances)
755 _CheckDisks(client, notepad, nodes, instances, started)
756 _VerifyDisks(client, group_uuid, nodes, instances)
757 except Exception, err:
758 logging.info("Not updating status file due to failure: %s", err)
759 raise
760 else:
761
762 notepad.Save(state_path)
763
764 return constants.EXIT_SUCCESS
765
768 """Main function.
769
770 """
771 (options, _) = ParseOptions()
772
773 utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
774 debug=options.debug, stderr_logging=options.debug)
775
776 if ShouldPause() and not options.ignore_pause:
777 logging.debug("Pause has been set, exiting")
778 return constants.EXIT_SUCCESS
779
780
781 lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
782 try:
783 lock.Shared(blocking=False)
784 except (EnvironmentError, errors.LockError), err:
785 logging.error("Can't acquire lock on %s: %s",
786 pathutils.WATCHER_LOCK_FILE, err)
787 return constants.EXIT_SUCCESS
788
789 if options.nodegroup is None:
790 fn = _GlobalWatcher
791 else:
792
793 fn = _GroupWatcher
794
795 try:
796 return fn(options)
797 except (SystemExit, KeyboardInterrupt):
798 raise
799 except NotMasterError:
800 logging.debug("Not master, exiting")
801 return constants.EXIT_NOTMASTER
802 except errors.ResolverError, err:
803 logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
804 return constants.EXIT_NODESETUP_ERROR
805 except errors.JobQueueFull:
806 logging.error("Job queue is full, can't query cluster state")
807 except errors.JobQueueDrainError:
808 logging.error("Job queue is drained, can't maintain cluster state")
809 except Exception, err:
810 logging.exception(str(err))
811 return constants.EXIT_FAILURE
812
813 return constants.EXIT_SUCCESS
814