
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

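# Illustrative note (not part of the original module): the watcher is meant to
# be run periodically, e.g. from cron.  A typical entry might look like the
# following; the exact binary path and interval depend on the installation:
#
#   */5 * * * * root /usr/sbin/ganeti-watcher
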
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client  # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state

MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:  # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:  # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:  # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:  # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding, not
  if it is accessible.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

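# Illustrative invocations (not part of the original module); the exact binary
# name depends on the installation, and the node group option spelling comes
# from cli.NODEGROUP_OPT defined in ganeti.cli:
#
#   ganeti-watcher -d                  # run with debug logging
#   ganeti-watcher -A 12h              # auto-archive jobs older than 12 hours
#   ganeti-watcher --no-wait-children  # do not wait for per-group children
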

def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

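# Illustrative example (not part of the original module): with the "%s %s\n"
# format used above, the status file contains one "<instance name> <status>"
# pair per line, with the status strings taken from the constants.INSTST_*
# values, e.g. (hypothetical instance names):
#
#   inst1.example.com ERROR_down
#   inst2.example.com running
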

def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:  # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():  # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec()  # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result

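# Illustrative note (not part of the original module): as implied by the
# "line.split(None, 1)[0]" above, each entry returned by GetNodegroupList()
# is expected to start with the group UUID, typically followed by the group
# name, e.g. (hypothetical values):
#
#   0e123456-89ab-cdef-0123-456789abcdef default
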

def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)  # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS