"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
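
# Example deployment (an assumption, not mandated by this module): a cron
# entry running the watcher every few minutes, e.g.
#   */5 * * * * root /usr/sbin/ganeti-watcher
# where the installation path of the ganeti-watcher script may differ.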
import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client


MAXTRIES = 5
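# Records older than this are dropped from the state file; with MAXTRIES = 5
# and the watcher typically run every few minutes from cron (an assumption,
# see above), 8 hours is ample time before an instance's restart history is
# forgotten.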
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
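

# Global LUXI client object, initialized in Main() and used by the helpers
# below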
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception:
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
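

# The hooks executed above live in the directory built from
# constants.HOOKS_BASE_DIR and constants.HOOKS_NAME_WATCHER.  As an
# illustrative assumption, a hook is just an executable dropped into that
# directory, for example:
#
#   #!/bin/sh
#   # hypothetical hook: log every watcher pass
#   logger "ganeti-watcher pass starting on $(hostname)"
#
# A hook exiting non-zero is reported as a warning by the loop above but does
# not abort the watcher run.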


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute the list of (instance, hypervisor) pairs for running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except:
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get the list of in-use DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shut down running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary maps each instance name to the hypervisor it runs on
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shut down active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # the request was answered; check the reply for consistency before
      # acting on it
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
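
  # For illustration only (assumed typical contents, not a fixed schema), the
  # serialized state roughly looks like:
  #   {
  #     "instance": {"inst1.example.com": {"restart_count": 1,
  #                                        "restart_when": 1310480534.2}},
  #     "node": {"node1.example.com": {"bootid": "fb07e295-..."}}
  #   }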

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns the number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]

    # First, delete records for instances which no longer exist
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep track
    of instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance-status file, listing each instance with its state
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary-node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
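
# For illustration only (assumed shapes, following the code above), the
# function returns something like:
#   instances = {"inst1": <Instance name="inst1", state, autostart, snodes>}
#   nodes     = {"node1": ("<boot id>", False), "node2": (None, True)}
#   smap      = {"node2": ["inst1"]}   # secondary node -> instance names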


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # the node did not return a boot ID; only complain if it is not
        # marked offline
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # the node has rebooted since we last saw it
        check_nodes.append(name)

    if check_nodes:
      # activate disks for all instances having any of the rebooted nodes as
      # a secondary node
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already started the instance, so activating its disks
            # separately is not needed
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # now that the secondaries have been handled, remember the new boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks if a given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any secondary node is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))

    # We submit only one job and wait for it, to avoid flooding the job queue
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skipping instance %s because it is in a helpless state"
                     " or has offline secondaries", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception:
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # Use os.open so an existing file is opened read/write without being
  # truncated, while a missing file is created; a plain open() cannot do
  # both at once.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire the lock; if another watcher instance holds it, log and
  # give up rather than blocking
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to the RAPI port and does a simple test.

  Connects to the RAPI port of hostname and does a simple test. At this time,
  the test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)
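
# Example invocations (illustrative only, assuming the script is installed
# as ganeti-watcher):
#   ganeti-watcher
#   ganeti-watcher -d --ignore-pause --job-age=12h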


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    # run node maintenance in all cases, even on the master, so that stale
    # resources on old masters can be cleaned up
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on the master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS