
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

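
# Illustrative only (not part of this module): the watcher is normally run
# from cron; a typical entry runs it every five minutes, along the lines of
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher
# with the actual path depending on the local installation.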

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
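
# Instances in BAD_STATES (currently only ERROR_down) are candidates for an
# automatic restart by _CheckInstances below, while instances in
# HELPLESS_STATES cannot be helped by a restart because their node is down
# or offline.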


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))

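
# The pause flag checked above is normally managed with "gnt-cluster watcher
# pause <duration>" and "gnt-cluster watcher continue", which write or clear
# pathutils.WATCHER_PAUSEFILE; the --ignore-pause option (see ParseOptions)
# overrides it. Illustrative example:
#   $ gnt-cluster watcher pause 1h   # watcher runs become no-ops for an hour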

def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will be in disabled mode
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts" %
                                   status)

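
# Illustrative watcher hook: any executable placed in the hooks directory
# (pathutils.HOOKS_BASE_DIR/constants.HOOKS_NAME_WATCHER, commonly
# /etc/ganeti/hooks/watcher on a default install) is run by utils.RunParts
# on every watcher invocation, for example:
#   #!/bin/sh
#   logger -t ganeti-watcher-hook "watcher ran on $(hostname)"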

class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started

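
# Retry accounting, worked through: with MAXTRIES = 5 an instance that stays
# in ERROR_down is restarted on the runs where the recorded attempt count is
# 0 through 4; on the run where the count equals MAXTRIES a final "giving up"
# error is logged and one more attempt is recorded, and later runs skip the
# instance ("retries exhausted") until it is seen outside BAD_STATES again,
# which removes its entry from the watcher state file.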

def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)

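
# A node's boot ID changes on every reboot, so a mismatch between the value
# remembered in the watcher state file and the currently reported one means
# the node rebooted since the last run; instances with such a node as
# secondary then get their disks re-activated above.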

def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any offline secondary node.

  @param nodes: Dictionary mapping node names to L{Node} objects
  @param instance: The instance object
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to the RAPI port of hostname and does a simple test. At this time,
  the test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding,
  not whether the credentials are valid.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION

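
# Illustrative use (assuming RAPI listens on its default port on localhost):
#   >>> IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST)
#   True
# _GlobalWatcher below uses exactly this check to decide whether to restart
# the RAPI daemon.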

def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

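
# Illustrative manual invocations (assuming the standard "ganeti-watcher"
# entry point that calls Main() below):
#   ganeti-watcher -d                # single run with debug logging to stderr
#   ganeti-watcher --ignore-pause    # run even while the watcher is paused
#   ganeti-watcher -A 12h            # archive jobs older than twelve hours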

def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

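
# The resulting file holds one "<instance name> <status>" pair per line,
# sorted by instance name; an illustrative excerpt:
#   inst1.example.com ERROR_down
#   inst2.example.com running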

def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)

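
# Merge example: if one group's status file (mtime 1400000060) reports
# "inst1 running" while another, older file (mtime 1400000000) still lists
# "inst1 ERROR_down" (e.g. right after the instance moved between groups),
# the entry with the newest mtime wins and the global file records
# "inst1 running".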

def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)

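
# Each child re-executes the watcher with the original arguments plus the
# node group option, e.g. (assuming cli.NODEGROUP_OPT_NAME is "--node-group"):
#   /usr/sbin/ganeti-watcher --node-group <uuid>
# which makes the child take the _GroupWatcher path in Main() below.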

def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True,
                    priority=constants.OP_PRIO_LOW),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result

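
# The ssconf node group list has one group per line; the first
# whitespace-separated field is the group UUID (typically followed by the
# group name), and only that UUID is kept here. Illustrative line:
#   8f2b6f2a-c86f-4a8e-9d50-5f8a5a5c1d2a default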

def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS