
Source Code for Package ganeti.watcher

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Tool to restart erroneously downed virtual machines. 
 32   
 33  This program and set of classes implement a watchdog to restart 
 34  virtual machines in a Ganeti cluster that have crashed or been killed 
 35  by a node reboot.  Run from cron or similar. 
 36   
 37  """ 
 38   
 39  import os 
 40  import os.path 
 41  import sys 
 42  import time 
 43  import logging 
 44  import operator 
 45  import errno 
 46  from optparse import OptionParser 
 47   
 48  from ganeti import utils 
 49  from ganeti import wconfd 
 50  from ganeti import constants 
 51  from ganeti import compat 
 52  from ganeti import errors 
 53  from ganeti import opcodes 
 54  from ganeti import cli 
 55  import ganeti.rpc.errors as rpcerr 
 56  from ganeti import rapi 
 57  from ganeti import netutils 
 58  from ganeti import qlang 
 59  from ganeti import ssconf 
 60  from ganeti import ht 
 61  from ganeti import pathutils 
 62   
 63  import ganeti.rapi.client # pylint: disable=W0611 
 64  from ganeti.rapi.client import UsesRapiClient 
 65   
 66  from ganeti.watcher import nodemaint 
 67  from ganeti.watcher import state 
 68   
 69   
 70  MAXTRIES = 5 
 71  BAD_STATES = compat.UniqueFrozenset([ 
 72    constants.INSTST_ERRORDOWN, 
 73    ]) 
 74  HELPLESS_STATES = compat.UniqueFrozenset([ 
 75    constants.INSTST_NODEDOWN, 
 76    constants.INSTST_NODEOFFLINE, 
 77    ]) 
 78  NOTICE = "NOTICE" 
 79  ERROR = "ERROR" 
 80   
 81  #: Number of seconds to wait between starting child processes for node groups 
 82  CHILD_PROCESS_DELAY = 1.0 
 83   
 84  #: How many seconds to wait for instance status file lock 
 85  INSTANCE_STATUS_LOCK_TIMEOUT = 10.0 
 86   
 87   
 88  class NotMasterError(errors.GenericError): 
 89    """Exception raised when this host is not the master.""" 
 90   
 91   
 92  def ShouldPause(): 
 93    """Check whether we should pause. 
 94   
 95    """ 
 96    return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)) 
 97   
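ShouldPause() is what the "gnt-cluster watcher pause" command relies on to silence the watcher temporarily. A minimal sketch of that interaction, assuming the pause file simply holds the epoch timestamp until which the watcher should stay quiet, which is the format utils.ReadWatcherPauseFile is expected to parse:

  # Sketch only, not part of the module: pause the watcher for one hour and
  # check that ShouldPause() honours it.  The pause file format is an
  # assumption here; normally "gnt-cluster watcher pause" writes it.
  import time
  from ganeti import pathutils, utils
  from ganeti.watcher import ShouldPause

  utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
                  data="%d\n" % (time.time() + 3600))
  assert ShouldPause()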
 98   
 99  def StartNodeDaemons(): 
100    """Start all the daemons that should be running on all nodes. 
101   
102    """ 
103    # on master or not, try to start the node daemon 
104    utils.EnsureDaemon(constants.NODED) 
105    # start confd as well. On non candidates it will be in disabled mode. 
106    utils.EnsureDaemon(constants.CONFD) 
107    # start mond as well: all nodes need monitoring 
108    if constants.ENABLE_MOND: 
109      utils.EnsureDaemon(constants.MOND) 
110    # start kvmd, which will quit if not needed to run 
111    utils.EnsureDaemon(constants.KVMD) 
112   
113   
114  def RunWatcherHooks(): 
115    """Run the watcher hooks. 
116   
117    """ 
118    hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR, 
119                               constants.HOOKS_NAME_WATCHER) 
120    if not os.path.isdir(hooks_dir): 
121      return 
122   
123    try: 
124      results = utils.RunParts(hooks_dir) 
125    except Exception, err:  # pylint: disable=W0703 
126      logging.exception("RunParts %s failed: %s", hooks_dir, err) 
127      return 
128   
129    for (relname, status, runresult) in results: 
130      if status == constants.RUNPARTS_SKIP: 
131        logging.debug("Watcher hook %s: skipped", relname) 
132      elif status == constants.RUNPARTS_ERR: 
133        logging.warning("Watcher hook %s: error (%s)", relname, runresult) 
134      elif status == constants.RUNPARTS_RUN: 
135        if runresult.failed: 
136          logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)", 
137                          relname, runresult.exit_code, runresult.output) 
138        else: 
139          logging.debug("Watcher hook %s: success (output: %s)", relname, 
140                        runresult.output) 
141      else: 
142        raise errors.ProgrammerError("Unknown status %s returned by RunParts", 
143                                     status) 
144   
145   
146  class Instance(object): 
147    """Abstraction for a Virtual Machine instance. 
148   
149    """ 
150    def __init__(self, name, status, config_state, config_state_source, 
151                 disks_active, snodes): 
152      self.name = name 
153      self.status = status 
154      self.config_state = config_state 
155      self.config_state_source = config_state_source 
156      self.disks_active = disks_active 
157      self.snodes = snodes 
158   
159    def Restart(self, cl): 
160      """Encapsulates the start of an instance. 
161   
162      """ 
163      op = opcodes.OpInstanceStartup(instance_name=self.name, force=False) 
164      op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, 
165                    "Restarting instance %s" % self.name, 
166                    utils.EpochNano())] 
167      cli.SubmitOpCode(op, cl=cl) 
168   
169    def ActivateDisks(self, cl): 
170      """Encapsulates the activation of all disks of an instance. 
171   
172      """ 
173      op = opcodes.OpInstanceActivateDisks(instance_name=self.name) 
174      op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, 
175                    "Activating disks for instance %s" % self.name, 
176                    utils.EpochNano())] 
177      cli.SubmitOpCode(op, cl=cl) 
178   
179    def NeedsCleanup(self): 
180      """Determines whether the instance needs cleanup. 
181   
182      Determines whether the instance needs cleanup after having been 
183      shutdown by the user. 
184   
185      @rtype: bool 
186      @return: True if the instance needs cleanup, False otherwise. 
187   
188      """ 
189      return self.status == constants.INSTST_USERDOWN and \ 
190        self.config_state != constants.ADMINST_DOWN 
191   
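NeedsCleanup() is what separates an instance the user shut down from one that crashed: the hypervisor reports it as user-down while the cluster configuration still expects it to be up. A small illustration follows; the instance name and field values are made up.

  # Sketch only: such an instance is later picked up by _CleanupInstance().
  from ganeti import constants
  from ganeti.watcher import Instance

  inst = Instance("inst1.example.com", constants.INSTST_USERDOWN,
                  constants.ADMINST_UP, constants.USER_SOURCE,
                  False, [])
  assert inst.NeedsCleanup()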
192   
193  class Node(object): 
194    """Data container representing cluster node. 
195   
196    """ 
197    def __init__(self, name, bootid, offline, secondaries): 
198      """Initializes this class. 
199   
200      """ 
201      self.name = name 
202      self.bootid = bootid 
203      self.offline = offline 
204      self.secondaries = secondaries 
205   
206   
207  def _CleanupInstance(cl, notepad, inst, locks): 
208    n = notepad.NumberOfCleanupAttempts(inst.name) 
209   
210    if inst.name in locks: 
211      logging.info("Not cleaning up instance '%s', instance is locked", 
212                   inst.name) 
213      return 
214   
215    if n > MAXTRIES: 
216      logging.warning("Not cleaning up instance '%s', retries exhausted", 
217                      inst.name) 
218      return 
219   
220    logging.info("Instance '%s' was shutdown by the user, cleaning up instance", 
221                 inst.name) 
222    op = opcodes.OpInstanceShutdown(instance_name=inst.name, 
223                                    admin_state_source=constants.USER_SOURCE) 
224   
225    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, 
226                  "Cleaning up instance %s" % inst.name, 
227                  utils.EpochNano())] 
228    try: 
229      cli.SubmitOpCode(op, cl=cl) 
230      if notepad.NumberOfCleanupAttempts(inst.name): 
231        notepad.RemoveInstance(inst.name) 
232    except Exception:  # pylint: disable=W0703 
233      logging.exception("Error while cleaning up instance '%s'", inst.name) 
234      notepad.RecordCleanupAttempt(inst.name) 
235   
236   
237  def _CheckInstances(cl, notepad, instances, locks): 
238    """Make a pass over the list of instances, restarting downed ones. 
239   
240    """ 
241    notepad.MaintainInstanceList(instances.keys()) 
242   
243    started = set() 
244   
245    for inst in instances.values(): 
246      if inst.NeedsCleanup(): 
247        _CleanupInstance(cl, notepad, inst, locks) 
248      elif inst.status in BAD_STATES: 
249        n = notepad.NumberOfRestartAttempts(inst.name) 
250   
251        if n > MAXTRIES: 
252          logging.warning("Not restarting instance '%s', retries exhausted", 
253                          inst.name) 
254          continue 
255   
256        if n == MAXTRIES: 
257          notepad.RecordRestartAttempt(inst.name) 
258          logging.error("Could not restart instance '%s' after %s attempts," 
259                        " giving up", inst.name, MAXTRIES) 
260          continue 
261   
262        try: 
263          logging.info("Restarting instance '%s' (attempt #%s)", 
264                       inst.name, n + 1) 
265          inst.Restart(cl) 
266        except Exception:  # pylint: disable=W0703 
267          logging.exception("Error while restarting instance '%s'", inst.name) 
268        else: 
269          started.add(inst.name) 
270   
271        notepad.RecordRestartAttempt(inst.name) 
272   
273      else: 
274        if notepad.NumberOfRestartAttempts(inst.name): 
275          notepad.RemoveInstance(inst.name) 
276          if inst.status not in HELPLESS_STATES: 
277            logging.info("Restart of instance '%s' succeeded", inst.name) 
278   
279    return started 
280   
281   
282  def _CheckDisks(cl, notepad, nodes, instances, started): 
283    """Check all nodes for restarted ones. 
284   
285    """ 
286    check_nodes = [] 
287   
288    for node in nodes.values(): 
289      old = notepad.GetNodeBootID(node.name) 
290      if not node.bootid: 
291        # Bad node, not returning a boot id 
292        if not node.offline: 
293          logging.debug("Node '%s' missing boot ID, skipping secondary checks", 
294                        node.name) 
295        continue 
296   
297      if old != node.bootid: 
298        # Node's boot ID has changed, probably through a reboot 
299        check_nodes.append(node) 
300   
301    if check_nodes: 
302      # Activate disks for all instances with any of the checked nodes as a 
303      # secondary node. 
304      for node in check_nodes: 
305        for instance_name in node.secondaries: 
306          try: 
307            inst = instances[instance_name] 
308          except KeyError: 
309            logging.info("Can't find instance '%s', maybe it was ignored", 
310                         instance_name) 
311            continue 
312   
313          if not inst.disks_active: 
314            logging.info("Skipping disk activation for instance with not" 
315                         " activated disks '%s'", inst.name) 
316            continue 
317   
318          if inst.name in started: 
319            # we already tried to start the instance, which should have 
320            # activated its drives (if they can be at all) 
321            logging.debug("Skipping disk activation for instance '%s' as" 
322                          " it was already started", inst.name) 
323            continue 
324   
325          try: 
326            logging.info("Activating disks for instance '%s'", inst.name) 
327            inst.ActivateDisks(cl) 
328          except Exception:  # pylint: disable=W0703 
329            logging.exception("Error while activating disks for instance '%s'", 
330                              inst.name) 
331   
332      # Keep changed boot IDs 
333      for node in check_nodes: 
334        notepad.SetNodeBootID(node.name, node.bootid) 
335   
336   
337  def _CheckForOfflineNodes(nodes, instance): 
338    """Checks whether the given instance has any secondary node in offline status. 
339   
340    @param instance: The instance object 
341    @return: True if any of the secondaries is offline, False otherwise 
342   
343    """ 
344    return compat.any(nodes[node_name].offline for node_name in instance.snodes) 
345   
346   
347  def _VerifyDisks(cl, uuid, nodes, instances): 
348    """Run a per-group "gnt-cluster verify-disks". 
349   
350    """ 
351    op = opcodes.OpGroupVerifyDisks( 
352      group_name=uuid, priority=constants.OP_PRIO_LOW) 
353    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, 
354                  "Verifying disks of group %s" % uuid, 
355                  utils.EpochNano())] 
356    job_id = cl.SubmitJob([op]) 
357    ((_, offline_disk_instances, _), ) = \ 
358      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug) 
359    cl.ArchiveJob(job_id) 
360   
361    if not offline_disk_instances: 
362      # nothing to do 
363      logging.debug("Verify-disks reported no offline disks, nothing to do") 
364      return 
365   
366    logging.debug("Will activate disks for instance(s) %s", 
367                  utils.CommaJoin(offline_disk_instances)) 
368   
369    # We submit only one job, and wait for it. Not optimal, but this puts less 
370    # load on the job queue. 
371    job = [] 
372    for name in offline_disk_instances: 
373      try: 
374        inst = instances[name] 
375      except KeyError: 
376        logging.info("Can't find instance '%s', maybe it was ignored", name) 
377        continue 
378   
379      if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst): 
380        logging.info("Skipping instance '%s' because it is in a helpless state" 
381                     " or has offline secondaries", name) 
382        continue 
383   
384      op = opcodes.OpInstanceActivateDisks(instance_name=name) 
385      op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, 
386                    "Activating disks for instance %s" % name, 
387                    utils.EpochNano())] 
388      job.append(op) 
389   
390    if job: 
391      job_id = cli.SendJob(job, cl=cl) 
392   
393      try: 
394        cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug) 
395      except Exception:  # pylint: disable=W0703 
396        logging.exception("Error while activating disks") 
397   
398   
399  def IsRapiResponding(hostname): 
400    """Connects to RAPI port and does a simple test. 
401   
402    Connects to RAPI port of hostname and does a simple test. At this time, the 
403    test is GetVersion. 
404   
405    If RAPI responds with error code "401 Unauthorized", the test is successful, 
406    because the aim of this function is to assess whether RAPI is responding, not 
407    if it is accessible. 
408   
409    @type hostname: string 
410    @param hostname: hostname of the node to connect to. 
411    @rtype: bool 
412    @return: Whether RAPI is working properly 
413   
414    """ 
415    curl_config = rapi.client.GenericCurlConfig() 
416    rapi_client = rapi.client.GanetiRapiClient(hostname, 
417                                               curl_config_fn=curl_config) 
418    try: 
419      master_version = rapi_client.GetVersion() 
420    except rapi.client.CertificateError, err: 
421      logging.warning("RAPI certificate error: %s", err) 
422      return False 
423    except rapi.client.GanetiApiError, err: 
424      if err.code == 401: 
425        # Unauthorized, but RAPI is alive and responding 
426        return True 
427      else: 
428        logging.warning("RAPI error: %s", err) 
429        return False 
430    else: 
431      logging.debug("Reported RAPI version %s", master_version) 
432      return master_version == constants.RAPI_VERSION 
433   
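Outside the watcher, the same probe can serve as a quick stand-alone RAPI health check. A minimal sketch, reusing the module's default RAPI address (constants.IP4_ADDRESS_LOCALHOST, the same default as the --rapi-ip option below):

  # Sketch only: log a warning if the master's RAPI endpoint does not answer.
  import logging
  from ganeti import constants
  from ganeti.watcher import IsRapiResponding

  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("ganeti-rapi is not answering on localhost")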
434   
435  def IsWconfdResponding(): 
436    """Probes an echo RPC to WConfD. 
437   
438    """ 
439    probe_string = "ganeti watcher probe %d" % time.time() 
440   
441    try: 
442      result = wconfd.Client().Echo(probe_string) 
443    except Exception, err:  # pylint: disable=W0703 
444      logging.warning("WConfd connection error: %s", err) 
445      return False 
446   
447    if result != probe_string: 
448      logging.warning("WConfd echo('%s') returned '%s'", probe_string, result) 
449      return False 
450   
451    return True 
452   
453   
454  def ParseOptions(): 
455    """Parse the command line options. 
456   
457    @return: (options, args) as from OptionParser.parse_args() 
458   
459    """ 
460    parser = OptionParser(description="Ganeti cluster watcher", 
461                          usage="%prog [-d]", 
462                          version="%%prog (ganeti) %s" % 
463                          constants.RELEASE_VERSION) 
464   
465    parser.add_option(cli.DEBUG_OPT) 
466    parser.add_option(cli.NODEGROUP_OPT) 
467    parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600, 
468                      help="Autoarchive jobs older than this age (default" 
469                      " 6 hours)") 
470    parser.add_option("--ignore-pause", dest="ignore_pause", default=False, 
471                      action="store_true", help="Ignore cluster pause setting") 
472    parser.add_option("--wait-children", dest="wait_children", 
473                      action="store_true", help="Wait for child processes") 
474    parser.add_option("--no-wait-children", dest="wait_children", 
475                      action="store_false", 
476                      help="Don't wait for child processes") 
477    parser.add_option("--no-verify-disks", dest="no_verify_disks", default=False, 
478                      action="store_true", help="Do not verify disk status") 
479    parser.add_option("--rapi-ip", dest="rapi_ip", 
480                      default=constants.IP4_ADDRESS_LOCALHOST, 
481                      help="Use this IP to talk to RAPI.") 
482    # See optparse documentation for why default values are not set by options 
483    parser.set_defaults(wait_children=True) 
484    options, args = parser.parse_args() 
485    options.job_age = cli.ParseTimespec(options.job_age) 
486   
487    if args: 
488      parser.error("No arguments expected") 
489   
490    return (options, args) 
491   
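For reference, this is roughly how the options come out for a non-default invocation; the argument vector below is made up for illustration and relies on cli.ParseTimespec accepting a "1d" suffix.

  # Sketch only: simulate "ganeti-watcher -A 1d --no-verify-disks".
  import sys
  from ganeti.watcher import ParseOptions

  sys.argv = ["ganeti-watcher", "-A", "1d", "--no-verify-disks"]
  (options, args) = ParseOptions()
  # options.job_age == 86400 (seconds), options.no_verify_disks is True,
  # options.wait_children is True (the set_defaults() value), args == []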
492   
493  def _WriteInstanceStatus(filename, data): 
494    """Writes the per-group instance status file. 
495   
496    The entries are sorted. 
497   
498    @type filename: string 
499    @param filename: Path to instance status file 
500    @type data: list of tuple; (instance name as string, status as string) 
501    @param data: Instance name and status 
502   
503    """ 
504    logging.debug("Updating instance status file '%s' with %s instances", 
505                  filename, len(data)) 
506   
507    utils.WriteFile(filename, 
508                    data="".join(map(compat.partial(operator.mod, "%s %s\n"), 
509                                     sorted(data)))) 
510   
511   
512  def _UpdateInstanceStatus(filename, instances): 
513    """Writes an instance status file from L{Instance} objects. 
514   
515    @type filename: string 
516    @param filename: Path to status file 
517    @type instances: list of L{Instance} 
518   
519    """ 
520    _WriteInstanceStatus(filename, [(inst.name, inst.status) 
521                                    for inst in instances]) 
522   
523   
524  def _ReadInstanceStatus(filename): 
525    """Reads an instance status file. 
526   
527    @type filename: string 
528    @param filename: Path to status file 
529    @rtype: tuple; (None or number, list of lists containing instance name and 
530      status) 
531    @return: File's mtime and instance status contained in the file; mtime is 
532      C{None} if file can't be read 
533   
534    """ 
535    logging.debug("Reading per-group instance status from '%s'", filename) 
536   
537    statcb = utils.FileStatHelper() 
538    try: 
539      content = utils.ReadFile(filename, preread=statcb) 
540    except EnvironmentError, err: 
541      if err.errno == errno.ENOENT: 
542        logging.error("Can't read '%s', does not exist (yet)", filename) 
543      else: 
544        logging.exception("Unable to read '%s', ignoring", filename) 
545      return (None, None) 
546    else: 
547      return (statcb.st.st_mtime, [line.split(None, 1) 
548                                   for line in content.splitlines()]) 
549   
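The status files handled by the helpers above are plain text, one "<instance name> <status>" pair per line. A short round-trip sketch; the file path and instance names are made up:

  # Sketch only: write and re-read a per-group status file.  Entries come
  # back sorted by name, as _WriteInstanceStatus sorts them before writing.
  from ganeti.watcher import _WriteInstanceStatus, _ReadInstanceStatus

  _WriteInstanceStatus("/tmp/example.status",
                       [("inst2.example.com", "ERROR_down"),
                        ("inst1.example.com", "running")])
  (mtime, data) = _ReadInstanceStatus("/tmp/example.status")
  # data == [["inst1.example.com", "running"],
  #          ["inst2.example.com", "ERROR_down"]]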
550   
551  def _MergeInstanceStatus(filename, pergroup_filename, groups): 
552    """Merges all per-group instance status files into a global one. 
553   
554    @type filename: string 
555    @param filename: Path to global instance status file 
556    @type pergroup_filename: string 
557    @param pergroup_filename: Path to per-group status files, must contain "%s" 
558      to be replaced with group UUID 
559    @type groups: sequence 
560    @param groups: UUIDs of known groups 
561   
562    """ 
563    # Lock global status file in exclusive mode 
564    lock = utils.FileLock.Open(filename) 
565    try: 
566      lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT) 
567    except errors.LockError, err: 
568      # All per-group processes will lock and update the file. None of them 
569      # should take longer than 10 seconds (the value of 
570      # INSTANCE_STATUS_LOCK_TIMEOUT). 
571      logging.error("Can't acquire lock on instance status file '%s', not" 
572                    " updating: %s", filename, err) 
573      return 
574   
575    logging.debug("Acquired exclusive lock on '%s'", filename) 
576   
577    data = {} 
578   
579    # Load instance status from all groups 
580    for group_uuid in groups: 
581      (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid) 
582   
583      if mtime is not None: 
584        for (instance_name, status) in instdata: 
585          data.setdefault(instance_name, []).append((mtime, status)) 
586   
587    # Select last update based on file mtime 
588    inststatus = [(instance_name, sorted(status, reverse=True)[0][1]) 
589                  for (instance_name, status) in data.items()] 
590   
591    # Write the global status file. Don't touch file after it's been 
592    # updated--there is no lock anymore. 
593    _WriteInstanceStatus(filename, inststatus) 
594   
595   
596  def GetLuxiClient(try_restart): 
597    """Tries to connect to the luxi daemon. 
598   
599    @type try_restart: bool 
600    @param try_restart: Whether to attempt to restart the master daemon 
601   
602    """ 
603    try: 
604      return cli.GetClient() 
605    except errors.OpPrereqError, err: 
606      # this is, from cli.GetClient, a not-master case 
607      raise NotMasterError("Not on master node (%s)" % err) 
608   
609    except (rpcerr.NoMasterError, rpcerr.TimeoutError), err: 
610      if not try_restart: 
611        raise 
612   
613      logging.warning("Luxi daemon seems to be down (%s), trying to restart", 
614                      err) 
615   
616      if not utils.EnsureDaemon(constants.LUXID): 
617        raise errors.GenericError("Can't start the master daemon") 
618   
619      # Retry the connection 
620      return cli.GetClient() 
621   
622   
623  def _StartGroupChildren(cl, wait): 
624    """Starts a new instance of the watcher for every node group. 
625   
626    """ 
627    assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME) 
628                          for arg in sys.argv) 
629   
630    result = cl.QueryGroups([], ["name", "uuid"], False) 
631   
632    children = [] 
633   
634    for (idx, (name, uuid)) in enumerate(result): 
635      args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid] 
636   
637      if idx > 0: 
638        # Let's not kill the system 
639        time.sleep(CHILD_PROCESS_DELAY) 
640   
641      logging.debug("Spawning child for group '%s' (%s), arguments %s", 
642                    name, uuid, args) 
643   
644      try: 
645        # TODO: Should utils.StartDaemon be used instead? 
646        pid = os.spawnv(os.P_NOWAIT, args[0], args) 
647      except Exception:  # pylint: disable=W0703 
648        logging.exception("Failed to start child for group '%s' (%s)", 
649                          name, uuid) 
650      else: 
651        logging.debug("Started with PID %s", pid) 
652        children.append(pid) 
653   
654    if wait: 
655      for pid in children: 
656        logging.debug("Waiting for child PID %s", pid) 
657        try: 
658          result = utils.RetryOnSignal(os.waitpid, pid, 0) 
659        except EnvironmentError, err: 
660          result = str(err) 
661   
662        logging.debug("Child PID %s exited with status %s", pid, result) 
663   
664   
665  def _ArchiveJobs(cl, age): 
666    """Archives old jobs. 
667   
668    """ 
669    (arch_count, left_count) = cl.AutoArchiveJobs(age) 
670    logging.debug("Archived %s jobs, left %s", arch_count, left_count) 
671   
672   
673  def _CheckMaster(cl): 
674    """Ensures current host is master node. 
675   
676    """ 
677    (master, ) = cl.QueryConfigValues(["master_node"]) 
678    if master != netutils.Hostname.GetSysName(): 
679      raise NotMasterError("This is not the master node") 
680   
681   
682  @UsesRapiClient 
683  def _GlobalWatcher(opts): 
684    """Main function for global watcher. 
685   
686    At the end child processes are spawned for every node group. 
687   
688    """ 
689    StartNodeDaemons() 
690    RunWatcherHooks() 
691   
692    # Run node maintenance in all cases, even if master, so that old masters can 
693    # be properly cleaned up 
694    if nodemaint.NodeMaintenance.ShouldRun():  # pylint: disable=E0602 
695      nodemaint.NodeMaintenance().Exec()  # pylint: disable=E0602 
696   
697    try: 
698      client = GetLuxiClient(True) 
699    except NotMasterError: 
700      # Don't proceed on non-master nodes 
701      return constants.EXIT_SUCCESS 
702   
703    # we are on master now 
704    utils.EnsureDaemon(constants.RAPI) 
705    utils.EnsureDaemon(constants.WCONFD) 
706    utils.EnsureDaemon(constants.MAINTD) 
707   
708    # If RAPI isn't responding to queries, try one restart 
709    logging.debug("Attempting to talk to remote API on %s", 
710                  opts.rapi_ip) 
711    if not IsRapiResponding(opts.rapi_ip): 
712      logging.warning("Couldn't get answer from remote API, restarting daemon") 
713      utils.StopDaemon(constants.RAPI) 
714      utils.EnsureDaemon(constants.RAPI) 
715      logging.debug("Second attempt to talk to remote API") 
716      if not IsRapiResponding(opts.rapi_ip): 
717        logging.fatal("RAPI is not responding") 
718    logging.debug("Successfully talked to remote API") 
719   
720    # If WConfD isn't responding to queries, try one restart 
721    logging.debug("Attempting to talk to WConfD") 
722    if not IsWconfdResponding(): 
723      logging.warning("WConfD not responsive, restarting daemon") 
724      utils.StopDaemon(constants.WCONFD) 
725      utils.EnsureDaemon(constants.WCONFD) 
726      logging.debug("Second attempt to talk to WConfD") 
727      if not IsWconfdResponding(): 
728        logging.fatal("WConfD is not responding") 
729   
730    _CheckMaster(client) 
731    _ArchiveJobs(client, opts.job_age) 
732   
733    # Spawn child processes for all node groups 
734    _StartGroupChildren(client, opts.wait_children) 
735   
736    return constants.EXIT_SUCCESS 
737   
738   
739  def _GetGroupData(qcl, uuid): 
740    """Retrieves instances and nodes per node group. 
741   
742    """ 
743    locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None) 
744   
745    prefix = "instance/" 
746    prefix_len = len(prefix) 
747   
748    locked_instances = set() 
749   
750    for [[_, name], [_, lock]] in locks.data: 
751      if name.startswith(prefix) and lock: 
752        locked_instances.add(name[prefix_len:]) 
753   
754    queries = [ 
755      (constants.QR_INSTANCE, 
756       ["name", "status", "admin_state", "admin_state_source", "disks_active", 
757        "snodes", "pnode.group.uuid", "snodes.group.uuid"], 
758       [qlang.OP_EQUAL, "pnode.group.uuid", uuid]), 
759      (constants.QR_NODE, 
760       ["name", "bootid", "offline"], 
761       [qlang.OP_EQUAL, "group.uuid", uuid]), 
762      ] 
763   
764    results = [] 
765    for what, fields, qfilter in queries: 
766      results.append(qcl.Query(what, fields, qfilter)) 
767   
768    results_data = map(operator.attrgetter("data"), results) 
769   
770    # Ensure results are tuples with two values 
771    assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data)) 
772   
773    # Extract values ignoring result status 
774    (raw_instances, raw_nodes) = [[map(compat.snd, values) 
775                                   for values in res] 
776                                  for res in results_data] 
777   
778    secondaries = {} 
779    instances = [] 
780   
781    # Load all instances 
782    for (name, status, config_state, config_state_source, disks_active, snodes, 
783         pnode_group_uuid, snodes_group_uuid) in raw_instances: 
784      if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid): 
785        logging.error("Ignoring split instance '%s', primary group %s, secondary" 
786                      " groups %s", name, pnode_group_uuid, 
787                      utils.CommaJoin(snodes_group_uuid)) 
788      else: 
789        instances.append(Instance(name, status, config_state, config_state_source, 
790                                  disks_active, snodes)) 
791   
792        for node in snodes: 
793          secondaries.setdefault(node, set()).add(name) 
794   
795    # Load all nodes 
796    nodes = [Node(name, bootid, offline, secondaries.get(name, set())) 
797             for (name, bootid, offline) in raw_nodes] 
798   
799    return (dict((node.name, node) for node in nodes), 
800            dict((inst.name, inst) for inst in instances), 
801            locked_instances) 
802   
803   
804  def _LoadKnownGroups(): 
805    """Returns a list of all node groups known by L{ssconf}. 
806   
807    """ 
808    groups = ssconf.SimpleStore().GetNodegroupList() 
809   
810    result = list(line.split(None, 1)[0] for line in groups 
811                  if line.strip()) 
812   
813    if not compat.all(map(utils.UUID_RE.match, result)): 
814      raise errors.GenericError("Ssconf contains invalid group UUID") 
815   
816    return result 
817   
818   
819  def _GroupWatcher(opts): 
820    """Main function for per-group watcher process. 
821   
822    """ 
823    group_uuid = opts.nodegroup.lower() 
824   
825    if not utils.UUID_RE.match(group_uuid): 
826      raise errors.GenericError("Node group parameter (%s) must be given a UUID," 
827                                " got '%s'" % 
828                                (cli.NODEGROUP_OPT_NAME, group_uuid)) 
829   
830    logging.info("Watcher for node group '%s'", group_uuid) 
831   
832    known_groups = _LoadKnownGroups() 
833   
834    # Check if node group is known 
835    if group_uuid not in known_groups: 
836      raise errors.GenericError("Node group '%s' is not known by ssconf" % 
837                                group_uuid) 
838   
839    # Group UUID has been verified and should not contain any dangerous 
840    # characters 
841    state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid 
842    inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid 
843   
844    logging.debug("Using state file %s", state_path) 
845   
846    # Global watcher 
847    statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602 
848    if not statefile: 
849      return constants.EXIT_FAILURE 
850   
851    notepad = state.WatcherState(statefile)  # pylint: disable=E0602 
852    try: 
853      # Connect to master daemon 
854      client = GetLuxiClient(False) 
855   
856      _CheckMaster(client) 
857   
858      (nodes, instances, locks) = _GetGroupData(client, group_uuid) 
859   
860      # Update per-group instance status file 
861      _UpdateInstanceStatus(inst_status_path, instances.values()) 
862   
863      _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE, 
864                           pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE, 
865                           known_groups) 
866   
867      started = _CheckInstances(client, notepad, instances, locks) 
868      _CheckDisks(client, notepad, nodes, instances, started) 
869      if not opts.no_verify_disks: 
870        _VerifyDisks(client, group_uuid, nodes, instances) 
871    except Exception, err: 
872      logging.info("Not updating status file due to failure: %s", err) 
873      raise 
874    else: 
875      # Save changes for next run 
876      notepad.Save(state_path) 
877   
878    return constants.EXIT_SUCCESS 
879   
880   
881  def Main(): 
882    """Main function. 
883   
884    """ 
885    (options, _) = ParseOptions() 
886   
887    utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0], 
888                       debug=options.debug, stderr_logging=options.debug) 
889   
890    if ShouldPause() and not options.ignore_pause: 
891      logging.debug("Pause has been set, exiting") 
892      return constants.EXIT_SUCCESS 
893   
894    # Try to acquire global watcher lock in shared mode 
895    lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE) 
896    try: 
897      lock.Shared(blocking=False) 
898    except (EnvironmentError, errors.LockError), err: 
899      logging.error("Can't acquire lock on %s: %s", 
900                    pathutils.WATCHER_LOCK_FILE, err) 
901      return constants.EXIT_SUCCESS 
902   
903    if options.nodegroup is None: 
904      fn = _GlobalWatcher 
905    else: 
906      # Per-nodegroup watcher 
907      fn = _GroupWatcher 
908   
909    try: 
910      return fn(options) 
911    except (SystemExit, KeyboardInterrupt): 
912      raise 
913    except NotMasterError: 
914      logging.debug("Not master, exiting") 
915      return constants.EXIT_NOTMASTER 
916    except errors.ResolverError, err: 
917      logging.error("Cannot resolve hostname '%s', exiting", err.args[0]) 
918      return constants.EXIT_NODESETUP_ERROR 
919    except errors.JobQueueFull: 
920      logging.error("Job queue is full, can't query cluster state") 
921    except errors.JobQueueDrainError: 
922      logging.error("Job queue is drained, can't maintain cluster state") 
923    except Exception, err: 
924      logging.exception(str(err)) 
925      return constants.EXIT_FAILURE 
926   
927    return constants.EXIT_SUCCESS 
928   
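The module only provides Main(); the ganeti-watcher executable installed on the nodes is expected to be a thin wrapper of roughly this shape (the actual script is generated at build time and is not part of this module):

  # Sketch only: minimal command-line entry point around Main().
  import sys
  from ganeti.watcher import Main

  if __name__ == "__main__":
    sys.exit(Main())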