
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

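# Illustrative note (not part of the original module): the watcher is meant to
# be invoked periodically rather than run as a daemon.  Assuming a default
# install prefix, a cron entry along these lines would run it every five
# minutes (exact path and interval depend on the packaging):
#
#   */5 * * * * root /usr/sbin/ganeti-watcher
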
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))

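# Illustrative note (assumption, not part of the original module): the pause
# file checked above is normally managed from the command line, e.g. with
# something like "gnt-cluster watcher pause 1h" to create it and
# "gnt-cluster watcher continue" to remove it; while it exists, Main() below
# exits early unless --ignore-pause is given.
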

def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started

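# Worked example (derived from the code above, with the default MAXTRIES = 5):
# an instance stuck in a bad state is restarted on runs where the recorded
# attempt count n is 0..4, each run recording one more attempt regardless of
# the outcome; on the run where n == 5 a final "giving up" error is logged and
# recorded, and all later runs only emit the "retries exhausted" warning.
# Once the instance leaves the bad state its restart history is dropped from
# the notepad.
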

def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION

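# Usage sketch (derived from _GlobalWatcher below): the check is run against
# the local address, e.g. IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST),
# and only passes when the daemon answers GetVersion() with exactly
# constants.RAPI_VERSION, so an unreachable daemon and a version mismatch both
# count as "not responding".
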

def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

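# Illustrative invocations (the installed script name is an assumption):
#
#   ganeti-watcher                      # global run, spawns per-group children
#   ganeti-watcher -d --job-age=1d      # debug logging, archive jobs older than a day
#   ganeti-watcher --no-wait-children   # don't wait for the per-group processes
#
# The node group option (cli.NODEGROUP_OPT) is normally only passed by
# _StartGroupChildren() when the global watcher re-executes itself per group.
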

def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

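# Example of the resulting file contents (hypothetical instance names and
# status strings): one "<name> <status>" pair per line, sorted by name, as
# produced by the "%s %s\n" format above:
#
#   inst1.example.com ERROR_down
#   inst2.example.com running
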

def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)

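# Worked example (hypothetical): if "inst1.example.com" shows up in the status
# files of two groups, e.g. while being moved between them, the entry coming
# from the file with the newer mtime wins, because the (mtime, status) pairs
# are sorted in reverse order and only the first status is kept.
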

def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)

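# Illustrative note: each child is re-executed with the parent's own command
# line plus the node group option, i.e. roughly
# sys.argv + [cli.NODEGROUP_OPT_NAME, <group uuid>], so a global run spawns
# one per-group watcher process per node group, CHILD_PROCESS_DELAY seconds
# apart, and optionally waits for each of them to exit.
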

def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result

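# Illustrative note (assumption about the ssconf file layout): each line of
# the nodegroup list is expected to start with the group UUID, presumably
# followed by the group name, e.g.:
#
#   68fa625a-91e6-4d43-a0e3-bbd5a562c1ee default
#
# Only the first field is used and it must match utils.UUID_RE.
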

def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
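
# Usage sketch (assumption about the wrapper script): the installed
# ganeti-watcher script is expected to do little more than
#
#   sys.exit(Main())
#
# so the constants.EXIT_* values returned above become the process exit code
# seen by cron or by the parent watcher process waiting on its per-group
# children.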