
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether a given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding, not
  if it is accessible.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
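
Note: the module performs no work at import time; Main() is the entry point and returns one of the constants.EXIT_* codes. As a purely illustrative sketch (the actual ganeti-watcher wrapper script is generated by the Ganeti build system and may differ), an entry-point script driving this module could look like:

  #!/usr/bin/python
  # Illustrative sketch only -- it shows that Main() returns an exit code
  # suitable for sys.exit(); the real ganeti-watcher wrapper may differ.
  import sys

  from ganeti import watcher

  if __name__ == "__main__":
    sys.exit(watcher.Main())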