
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
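
# The watcher is normally run periodically from cron. An entry along these
# lines is illustrative only; the installed binary path and the interval
# depend on the distribution and are not taken from this module:
#
#   */5 * * * * root ganeti-watcher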
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import wconfd
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
import ganeti.rpc.errors as rpcerr
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
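
# Note on retry bookkeeping (descriptive only, derived from _CheckInstances
# below): the watcher state file records one restart attempt per run for an
# instance in a bad state; once MAXTRIES attempts have been recorded, the
# instance is reported as given up on and no further restarts are attempted
# until the instance leaves the bad state and its entry is cleared.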


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))

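
# A paused watcher exits immediately. The pause file is normally managed via
# the cluster tools rather than by hand, e.g. (command forms assumed, not
# taken from this module):
#
#   gnt-cluster watcher pause 1h
#   gnt-cluster watcher continue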


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)
  # start kvmd, which will quit if not needed to run
  utils.EnsureDaemon(constants.KVMD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)

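
# Illustrative note: watcher hooks are plain executables collected with
# utils.RunParts() from the HOOKS_BASE_DIR/HOOKS_NAME_WATCHER directory
# (the concrete path comes from pathutils and may differ per installation).
# A hook exiting non-zero is only logged as a warning and never aborts the
# watcher run.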


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, config_state, config_state_source,
               disks_active, snodes):
    self.name = name
    self.status = status
    self.config_state = config_state
    self.config_state_source = config_state_source
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Restarting instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def NeedsCleanup(self):
    """Determines whether the instance needs cleanup.

    Determines whether the instance needs cleanup after having been
    shutdown by the user.

    @rtype: bool
    @return: True if the instance needs cleanup, False otherwise.

    """
    return self.status == constants.INSTST_USERDOWN and \
        self.config_state != constants.ADMINST_DOWN



class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries



def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Cleaning up instance %s" % inst.name,
                utils.EpochNano())]
  try:
    cli.SubmitOpCode(op, cl=cl)
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)



def _CheckInstances(cl, notepad, instances, locks):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.NeedsCleanup():
      _CleanupInstance(cl, notepad, inst, locks)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started



def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)



def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)



def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  op = opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)
  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Verifying disks of group %s" % uuid,
                utils.EpochNano())]
  job_id = cl.SubmitJob([op])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    op = opcodes.OpInstanceActivateDisks(instance_name=name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % name,
                  utils.EpochNano())]
    job.append(op)

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")



def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding, not
  if it is accessible.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION



def IsWconfdResponding():
  """Probes an echo RPC to WConfD.

  """
  probe_string = "ganeti watcher probe %d" % time.time()

  try:
    result = wconfd.Client().Echo(probe_string)
  except Exception, err: # pylint: disable=W0703
    logging.warning("WConfd connection error: %s", err)
    return False

  if result != probe_string:
    logging.warning("WConfd echo('%s') returned '%s'", probe_string, result)
    return False

  return True



def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  parser.add_option("--no-verify-disks", dest="no_verify_disks", default=False,
                    action="store_true", help="Do not verify disk status")
  parser.add_option("--rapi-ip", dest="rapi_ip",
                    default=constants.IP4_ADDRESS_LOCALHOST,
                    help="Use this IP to talk to RAPI.")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

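
# Illustrative invocations (option names are taken from the parser above; the
# executable name, the "--node-group" spelling provided by cli.NODEGROUP_OPT
# and the "12h" timespec syntax are assumptions):
#
#   ganeti-watcher                         # global run, spawns per-group children
#   ganeti-watcher --node-group <uuid>     # per-group child, normally not run by hand
#   ganeti-watcher --no-verify-disks -A 12h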


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

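
# The resulting file contains one "<instance name> <status>" pair per line,
# sorted by instance name, e.g. (instance names invented for illustration):
#
#   inst1.example.com ERROR_down
#   inst2.example.com running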


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])



def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])



def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)

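
# Merge semantics in short: if the same instance name appears in several
# per-group files, the status from the file with the newest mtime wins,
# because the (mtime, status) pairs are sorted in reverse order and the
# first element is taken.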


def GetLuxiClient(try_restart):
  """Tries to connect to the luxi daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except (rpcerr.NoMasterError, rpcerr.TimeoutError), err:
    if not try_restart:
      raise

    logging.warning("Luxi daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.LUXID):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()



def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)

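
# Each child is simply this process re-executed with the node group option
# appended, i.e. something like the following (the "--node-group" spelling of
# cli.NODEGROUP_OPT_NAME is an assumption):
#
#   <original argv ...> --node-group <group uuid>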


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)



def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")



@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)
  utils.EnsureDaemon(constants.WCONFD)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                opts.rapi_ip)
  if not IsRapiResponding(opts.rapi_ip):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(opts.rapi_ip):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  # If WConfD isn't responding to queries, try one restart
  logging.debug("Attempting to talk to WConfD")
  if not IsWconfdResponding():
    logging.warning("WConfD not responsive, restarting daemon")
    utils.StopDaemon(constants.WCONFD)
    utils.EnsureDaemon(constants.WCONFD)
    logging.debug("Second attempt to talk to WConfD")
    if not IsWconfdResponding():
      logging.fatal("WConfD is not responding")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS



def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)

  prefix = "instance/"
  prefix_len = len(prefix)

  locked_instances = set()

  for [[_, name], [_, lock]] in locks.data:
    if name.startswith(prefix) and lock:
      locked_instances.add(name[prefix_len:])

  queries = [
    (constants.QR_INSTANCE,
     ["name", "status", "admin_state", "admin_state_source", "disks_active",
      "snodes", "pnode.group.uuid", "snodes.group.uuid"],
     [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    (constants.QR_NODE,
     ["name", "bootid", "offline"],
     [qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state, config_state_source,
                                disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)

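
# Return value, for reference: a triple of
#   ({node name: Node}, {instance name: Instance}, set of locked instance names)
# restricted to the given node group; split instances (secondaries outside the
# primary node's group) are logged and left out of the instance map.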


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result



def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
    if not opts.no_verify_disks:
      _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS



def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS