
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
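
# Illustrative note: the watcher is meant to run unattended at a regular
# interval; a packaged installation typically invokes it from cron every few
# minutes, e.g. via an entry roughly like
#   */5 * * * * root ganeti-watcher
# (exact path, user and interval depend on the installation).
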
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client  # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
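
# For reference: MAXTRIES caps how many restart attempts the watcher records
# per instance (see _CheckInstances below).  BAD_STATES lists instance states
# the watcher actively tries to fix by restarting, while HELPLESS_STATES are
# states it cannot fix from here because the node itself is down or offline.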


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))

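# For reference: the pause file read by ShouldPause() is normally managed with
# "gnt-cluster watcher pause <duration>" and "gnt-cluster watcher continue";
# while it is in effect the watcher exits early (see Main()).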

def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:  # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)

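# For reference: the hooks directory built above is typically
# /etc/ganeti/hooks/watcher (the exact path depends on how
# constants.HOOKS_BASE_DIR was configured at build time).  Every executable
# found there is run once per watcher invocation via utils.RunParts and only
# the result is logged; hook failures do not abort the watcher run.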

class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)

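# Illustrative use of the Instance helper (hypothetical names; "cl" would be a
# luxi client as returned by GetLuxiClient()):
#
#   inst = Instance("instance1.example.com", constants.INSTST_ERRORDOWN,
#                   True, ["node2.example.com"])
#   inst.Restart(cl)        # submits an OpInstanceStartup job
#   inst.ActivateDisks(cl)  # submits an OpInstanceActivateDisks job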

class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:  # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started

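# Restart bookkeeping, for reference: with MAXTRIES = 5 an instance in a bad
# state is actually restarted while its recorded attempt count is 0..4
# (attempts #1 to #5).  When the count reaches MAXTRIES one more attempt is
# recorded without a restart and a "giving up" error is logged; after that the
# watcher only warns.  The counter is cleared once the instance is seen
# outside the bad states again.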

def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:  # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)

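# For reference: a boot ID that differs from the one stored in the watcher
# state means the node has rebooted since the last run.  Disks of instances
# using such a node as a secondary are re-activated so that replicated
# storage (e.g. DRBD) can resynchronize, and the new boot ID is then saved
# for the next run.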

def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:  # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

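# Illustrative invocations (assuming cli.NODEGROUP_OPT provides the usual
# --node-group long option; the UUID below is made up):
#
#   ganeti-watcher
#   ganeti-watcher --job-age=1d --no-wait-children
#   ganeti-watcher --node-group=11111111-2222-3333-4444-555555555555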

def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

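# Resulting file format, for reference: one "<instance name> <status>" pair
# per line, sorted by instance name, e.g. (hypothetical names):
#
#   instance1.example.com ERROR_down
#   instance2.example.com running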

def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


class _StatCb:
  """Helper to store file handle's C{fstat}.

  """
  def __init__(self):
    """Initializes this class.

    """
    self.st = None

  def __call__(self, fh):
    """Calls C{fstat} on file handle.

    """
    self.st = os.fstat(fh.fileno())


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)

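# Worked example of the merge above (hypothetical data): if group A's file has
# mtime 100 and lists "instance1 ERROR_down" while group B's file has mtime
# 200 and lists "instance1 running", the merged global file keeps
# "instance1 running", since entries are sorted by mtime and the newest wins.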

def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:  # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)

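# For reference: each child is simply this same program re-executed with the
# node group option appended to the original arguments, roughly
#   sys.argv + [cli.NODEGROUP_OPT_NAME, group_uuid]
# so the child ends up in _GroupWatcher() instead of _GlobalWatcher()
# (see Main()).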

def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():  # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec()  # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)  # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS