
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:  # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:  # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:  # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
271 """Checks if given instances has any secondary in offline status. 272 273 @param instance: The instance object 274 @return: True if any of the secondary is offline, False otherwise 275 276 """ 277 return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:  # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:  # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
576 """Main function for global watcher. 577 578 At the end child processes are spawned for every node group. 579 580 """ 581 StartNodeDaemons() 582 RunWatcherHooks() 583 584 # Run node maintenance in all cases, even if master, so that old masters can 585 # be properly cleaned up 586 if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602 587 nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602 588 589 try: 590 client = GetLuxiClient(True) 591 except NotMasterError: 592 # Don't proceed on non-master nodes 593 return constants.EXIT_SUCCESS 594 595 # we are on master now 596 utils.EnsureDaemon(constants.RAPI) 597 598 # If RAPI isn't responding to queries, try one restart 599 logging.debug("Attempting to talk to remote API on %s", 600 constants.IP4_ADDRESS_LOCALHOST) 601 if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): 602 logging.warning("Couldn't get answer from remote API, restaring daemon") 603 utils.StopDaemon(constants.RAPI) 604 utils.EnsureDaemon(constants.RAPI) 605 logging.debug("Second attempt to talk to remote API") 606 if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): 607 logging.fatal("RAPI is not responding") 608 logging.debug("Successfully talked to remote API") 609 610 _CheckMaster(client) 611 _ArchiveJobs(client, opts.job_age) 612 613 # Spawn child processes for all node groups 614 _StartGroupChildren(client, opts.wait_children) 615 616 return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)  # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
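
Usage note: the module's entry point is Main(), which is meant to run
periodically and unattended rather than be imported by other code. A minimal
invocation sketch follows; the wrapper script, installed path and five-minute
cron interval shown in the comments are illustrative assumptions, not taken
from this module.

import sys

from ganeti import watcher

# A wrapper along these lines is typically scheduled from cron, e.g.
# "*/5 * * * * root /usr/sbin/ganeti-watcher" (interval and path are
# assumptions). Main() returns one of the constants.EXIT_* codes defined in
# this module, which sys.exit() propagates as the process exit status.
if __name__ == "__main__":
  sys.exit(watcher.Main())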