
Source Code for Package ganeti.watcher

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
import ganeti.rpc.node as rpc
import ganeti.rpc.errors as rpcerr
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))

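# Usage sketch, assuming the standard "gnt-cluster watcher" commands are
# available; ShouldPause() itself only reports whether the pause file exists:
#
#   gnt-cluster watcher pause 1h    # creates pathutils.WATCHER_PAUSEFILE
#   gnt-cluster watcher continue    # removes it, so the next run proceeds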

def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)
  # start kvmd, which will quit if not needed to run
  utils.EnsureDaemon(constants.KVMD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, config_state, config_state_source,
               disks_active, snodes):
    self.name = name
    self.status = status
    self.config_state = config_state
    self.config_state_source = config_state_source
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)

  def NeedsCleanup(self):
    """Determines whether the instance needs cleanup.

    Determines whether the instance needs cleanup after having been
    shutdown by the user.

    @rtype: bool
    @return: True if the instance needs cleanup, False otherwise.

    """
    return self.status == constants.INSTST_USERDOWN and \
        self.config_state != constants.ADMINST_DOWN


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shutdown by the user, cleaning up instance",
               inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  try:
    cli.SubmitOpCode(op, cl=cl)
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


def _CheckInstances(cl, notepad, instances, locks):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.NeedsCleanup():
      _CleanupInstance(cl, notepad, inst, locks)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether a given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding, not
  if it is accessible.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

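# Example invocations matching the options defined above (values are
# illustrative only):
#
#   ganeti-watcher                       # full run, one child per node group
#   ganeti-watcher --debug --job-age=1d  # verbose, archive jobs older than a day
#   ganeti-watcher --ignore-pause        # run even while the watcher is paused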

def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))

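# Sketch of the file written above for the hypothetical input
# data = [("inst2.example.com", "running"), ("inst1.example.com", "ERROR_down")];
# each entry becomes a "<name> <status>" line and the list is sorted:
#
#   inst1.example.com ERROR_down
#   inst2.example.com running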

def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)

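# Illustrative merge behaviour with hypothetical data: if one group's status
# file (mtime 100) reports "inst1 running" while another group's file
# (mtime 200) reports "inst1 ERROR_down", the entry with the newest mtime wins
# and the global file records "inst1 ERROR_down".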

def GetLuxiClient(try_restart, query=False):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient(query=query)
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except rpcerr.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient(query=query)


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
    query_client = GetLuxiClient(True, query=True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(query_client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)

  prefix = "instance/"
  prefix_len = len(prefix)

  locked_instances = set()

  for [[_, name], [_, lock]] in locks.data:
    if name.startswith(prefix) and lock:
      locked_instances.add(name[prefix_len:])

  queries = [
    (constants.QR_INSTANCE,
     ["name", "status", "admin_state", "admin_state_source", "disks_active",
      "snodes", "pnode.group.uuid", "snodes.group.uuid"],
     [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    (constants.QR_NODE,
     ["name", "bootid", "offline"],
     [qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  results = []
  for what, fields, qfilter in queries:
    results.append(qcl.Query(what, fields, qfilter))

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state,
                                config_state_source, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Global watcher
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)
    query_client = GetLuxiClient(False, query=True)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(query_client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS

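# Deployment sketch: the watcher is intended to run periodically from cron
# (see the module docstring).  A typical crontab entry, with an assumed
# install path, looks like:
#
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher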