
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global LUXI client object
client = None
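
The KEY_* constants above name the fields kept in the watcher state file, which the WatcherState class further down loads and saves through ganeti.serializer. As a rough sketch, the in-memory structure it manages looks like the following (a hypothetical example; the host names, counts and timestamps are made up):

# Hypothetical contents of WatcherState._data; the "instance" and "node"
# top-level keys are created by WatcherState.__init__ if missing.
example_state = {
  "instance": {
    "instance1.example.com": {
      KEY_RESTART_COUNT: 2,            # restart attempts so far
      KEY_RESTART_WHEN: 1300000000.0,  # time.time() of the last attempt
    },
  },
  "node": {
    "node1.example.com": {
      KEY_BOOT_ID: "example-boot-id",  # last boot ID seen for the node
    },
  },
}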


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
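
A watcher hook is simply an executable placed in the hooks directory used above (constants.HOOKS_BASE_DIR plus constants.HOOKS_NAME_WATCHER); utils.RunParts only inspects its exit status and output. As a minimal sketch, a hook written in Python might look like this (the file name and the check it performs are hypothetical):

#!/usr/bin/env python
# Hypothetical watcher hook; install as an executable file in the watcher
# hooks directory. A non-zero exit status is logged as a failed hook.
import sys


def main():
  # ... perform some node-local check here ...
  check_passed = True
  if not check_passed:
    sys.stderr.write("node-local check failed\n")
    return 1
  return 0


if __name__ == "__main__":
  sys.exit(main())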


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/drbd devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/vhm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
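
Taken together with OpenStateFile() further down, a typical lifecycle of this class looks roughly like the following (a simplified sketch of what Main() does at the end of this module; error handling omitted):

# Sketch only; see Main() below for the real call sequence.
statefile = OpenStateFile(constants.WATCHER_STATEFILE)  # open and lock
if statefile:
  notepad = WatcherState(statefile)    # load previous restart records
  try:
    # ... after deciding to restart an Instance object `inst`:
    # notepad.RecordRestartAttempt(inst)
    pass
  finally:
    notepad.Save()                     # write back or touch the state file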


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
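
The "upfile" written above contains one line per instance, "<name> <status>" separated by a space. As an illustration only (not part of the watcher itself; the example instance name and status are made up), another tool could read it back like this:

# Sketch: read the instance status file written by GetClusterData.
# Example line: "instance1.example.com running"
def ReadInstanceStatusFile(path=constants.INSTANCE_UPFILE):
  statuses = {}
  for line in open(path).read().splitlines():
    name, status = line.split(None, 1)
    statuses[name] = status
  return statuses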


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks if the given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")
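
The comment above explains why the file is opened with os.open before locking. For reference, a standalone sketch of the same pattern using fcntl directly (an illustration of the idea only, not the ganeti utils.LockFile implementation):

# Sketch: open-without-truncating, then take a non-blocking exclusive lock.
import errno
import fcntl
import os


def open_and_lock(path):
  # Open read/write, creating the file if needed, without truncating it.
  fd = os.open(path, os.O_RDWR | os.O_CREAT)
  try:
    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
  except IOError, err:
    if err.errno in (errno.EACCES, errno.EAGAIN):
      # Somebody else holds the lock
      os.close(fd)
      return None
    raise
  return os.fdopen(fd, "w+")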


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
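
Main() is the module's entry point and returns one of the constants.EXIT_* codes. A minimal sketch of how a ganeti-watcher launcher script could invoke it (the script name and layout are assumptions, not the exact entry point shipped with Ganeti):

#!/usr/bin/env python
# Hypothetical launcher; the packaged ganeti-watcher script may differ.
import sys

from ganeti import watcher

if __name__ == "__main__":
  sys.exit(watcher.Main())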