Package ganeti :: Package watcher

Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
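
# Illustrative example (not part of the original module): as the docstring
# above notes, the watcher is meant to be run from cron or similar.  A
# crontab entry along these lines would run it periodically; the path, user
# and five-minute interval are assumptions for illustration, not taken from
# this file:
#
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher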

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import wconfd
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
import ganeti.rpc.node as rpc
import ganeti.rpc.errors as rpcerr
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)
  # start kvmd, which will quit if not needed to run
  utils.EnsureDaemon(constants.KVMD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, config_state, config_state_source,
               disks_active, snodes):
    self.name = name
    self.status = status
    self.config_state = config_state
    self.config_state_source = config_state_source
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Restarting instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def NeedsCleanup(self):
    """Determines whether the instance needs cleanup.

    Determines whether the instance needs cleanup after having been
    shutdown by the user.

    @rtype: bool
    @return: True if the instance needs cleanup, False otherwise.

    """
    return self.status == constants.INSTST_USERDOWN and \
        self.config_state != constants.ADMINST_DOWN


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Cleaning up instance %s" % inst.name,
                utils.EpochNano())]
  try:
    cli.SubmitOpCode(op, cl=cl)
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


def _CheckInstances(cl, notepad, instances, locks):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.NeedsCleanup():
      _CleanupInstance(cl, notepad, inst, locks)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  op = opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)
  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Verifying disks of group %s" % uuid,
                utils.EpochNano())]
  job_id = cl.SubmitJob([op])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    op = opcodes.OpInstanceActivateDisks(instance_name=name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % name,
                  utils.EpochNano())]
    job.append(op)

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding, not
  if it is accessible.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def IsWconfdResponding():
  """Probes an echo RPC to WConfD.

  """
  probe_string = "ganeti watcher probe %d" % time.time()

  try:
    result = wconfd.Client().Echo(probe_string)
  except Exception, err: # pylint: disable=W0703
    logging.warning("WConfd connection error: %s", err)
    return False

  if result != probe_string:
    logging.warning("WConfd echo('%s') returned '%s'", probe_string, result)
    return False

  return True


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  parser.add_option("--rapi-ip", dest="rapi_ip",
                    default=constants.IP4_ADDRESS_LOCALHOST,
                    help="Use this IP to talk to RAPI.")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)
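
# Illustrative usage only (assumed command name and option spellings, based
# on the options defined in ParseOptions above):
#
#   ganeti-watcher                   # global watcher, spawns per-group children
#   ganeti-watcher --job-age=1d      # archive jobs older than one day
#   ganeti-watcher --ignore-pause    # run even if the cluster pause flag is set
#
# The per-group form (the node group option provided by cli.NODEGROUP_OPT) is
# normally only used by the child processes spawned in _StartGroupChildren().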


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the luxi daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except (rpcerr.NoMasterError, rpcerr.TimeoutError), err:
    if not try_restart:
      raise

    logging.warning("Luxi daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.LUXID):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)
  utils.EnsureDaemon(constants.WCONFD)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                opts.rapi_ip)
  if not IsRapiResponding(opts.rapi_ip):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(opts.rapi_ip):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  # If WConfD isn't responding to queries, try one restart
  logging.debug("Attempting to talk to WConfD")
  if not IsWconfdResponding():
    logging.warning("WConfD not responsive, restarting daemon")
    utils.StopDaemon(constants.WCONFD)
    utils.EnsureDaemon(constants.WCONFD)
    logging.debug("Second attempt to talk to WConfD")
    if not IsWconfdResponding():
      logging.fatal("WConfD is not responding")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)

  prefix = "instance/"
  prefix_len = len(prefix)

  locked_instances = set()

  for [[_, name], [_, lock]] in locks.data:
    if name.startswith(prefix) and lock:
      locked_instances.add(name[prefix_len:])

  queries = [
    (constants.QR_INSTANCE,
     ["name", "status", "admin_state", "admin_state_source", "disks_active",
      "snodes", "pnode.group.uuid", "snodes.group.uuid"],
     [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    (constants.QR_NODE,
     ["name", "bootid", "offline"],
     [qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state, config_state_source,
                                disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
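
# Illustrative sketch only (not part of this module): Main() returns one of
# the constants.EXIT_* codes, so an entry-point wrapper in the style of the
# ganeti-watcher command could invoke it roughly as follows; the exact wiring
# of the installed script is an assumption and may differ:
#
#   #!/usr/bin/python
#   import sys
#   from ganeti import watcher
#   sys.exit(watcher.Main())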