"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client


MAXTRIES = 5
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


client = None

class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""

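
# Minimal sketches of the pause check and the node daemon startup used by
# main() below; the exact helper and constant names (ReadWatcherPauseFile,
# WATCHER_PAUSEFILE, NODED, CONFD) are assumptions.
def ShouldPause():
  """Check whether the watcher should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # Whether or not this is the master, make sure the node daemon runs,
  # and confd as well (assumed to run in disabled mode on non-candidates).
  utils.EnsureDaemon(constants.NODED)
  utils.EnsureDaemon(constants.CONFD)
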
def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg:
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except:
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
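      # Map instance name to its hypervisor; duplicate names, if any,
      # collapse into a single key, which is fine for stopping them.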
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
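        # Deactivate the minor via the (private) DRBD8 shutdown helper.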
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

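    # Write the new contents with the file locked (prewrite), so that a
    # concurrently started watcher cannot race us on the state file.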
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]

    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

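    # Drop records whose last restart attempt is older than RETRY_EXPIRATION.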
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

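  # Minimal sketches of the two cluster operations the watcher performs on an
  # instance; the opcode name OpStartupInstance and the cli.SubmitOpCode
  # submission path are assumptions (OpActivateInstanceDisks is used below).
  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)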


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

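  # Write the instance status ("up") file from the name/status pairs.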
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

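    # Record this instance under each of its secondary nodes.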
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
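  # Minimal sketch of the constructor and run sequence that ties the methods
  # of this class together; client.QueryConfigValues and the opts.job_age
  # attribute are assumptions.
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # Archive old jobs before submitting any new ones.
    self.ArchiveJobs(opts.job_age)
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()
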
  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
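        # The node returned no boot ID; only note it if the node is not
        # already marked offline.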
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
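        # The boot ID changed, so the node most likely rebooted.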
        check_nodes.append(name)

    if check_nodes:
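      # Activate disks for all instances that have one of the rebooted nodes
      # as a secondary node.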
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
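            # Already restarted during this run, which presumably activated
            # its disks as well.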
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

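      # Remember the new boot IDs so these nodes are not reprocessed on the
      # next run.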
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))

    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception:
      logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
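  # Open the file read/write without truncating it, creating it if needed;
  # a plain open() cannot do both at the same time.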
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

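  # Try to lock the state file; failure usually means another watcher
  # instance is already running.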
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


@rapi.client.UsesRapiClient
def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  if args:
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

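    # Run node maintenance in all cases, even on the master, so that old
    # masters are cleaned up as well.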
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE

        client = cli.GetClient()

      utils.EnsureDaemon(constants.RAPI)

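      # If RAPI is not answering queries, try restarting it once.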
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
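

# Script entry point; main() returns one of the constants.EXIT_* codes used
# above (assumed invocation pattern).
if __name__ == '__main__':
  sys.exit(main())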