Package ganeti :: Package hypervisor :: Package hv_kvm :: Module monitor
[hide private]
[frames | no frames]

Source Code for Module ganeti.hypervisor.hv_kvm.monitor

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Qemu monitor control classes 
 32   
 33  """ 
 34   
 35   
 36  import os 
 37  import stat 
 38  import errno 
 39  import socket 
 40  import StringIO 
 41  import logging 
 42  try: 
 43    import fdsend   # pylint: disable=F0401 
 44  except ImportError: 
 45    fdsend = None 
 46   
 47  from bitarray import bitarray 
 48   
 49  from ganeti import errors 
 50  from ganeti import utils 
 51  from ganeti import serializer 
class QmpCommandNotSupported(errors.HypervisorError):
  """Signal that the monitor cannot execute the requested QMP command.

  A QmpMonitor instance raises this when it is asked to run a command that
  the instance does not support.

  The exception is specific to KVM; it exists so that callers can fall back
  to the human monitor for operations that QMP does not offer.

  """
65
class QmpMessage(object):
  """QEMU Messaging Protocol (QMP) message.

  Thin wrapper around the dict holding the fields of one QMP message. Item
  access, assignment, deletion and C{len()} are forwarded to that dict.
  Because C{__len__} is defined, a message without any field evaluates to
  False in a boolean context.

  """
  def __init__(self, data):
    """Creates a new QMP message based on the passed data.

    @type data: dict
    @param data: the fields of the message; the dict is stored as-is, not
        copied
    @raise TypeError: if data is not a dict

    """
    if not isinstance(data, dict):
      raise TypeError("QmpMessage must be initialized with a dict")

    self.data = data

  def __getitem__(self, field_name):
    """Get the value of the required field if present, or None.

    Overrides the [] operator to provide access to the message data,
    returning None (instead of raising KeyError) if the required item is not
    in the message.

    @return: the value of the field_name field, or None if field_name
        is not contained in the message

    """
    return self.data.get(field_name, None)

  def __setitem__(self, field_name, field_value):
    """Set the value of the required field_name to field_value.

    """
    self.data[field_name] = field_value

  def __len__(self):
    """Return the number of fields stored in this QmpMessage.

    """
    return len(self.data)

  def __delitem__(self, key):
    """Delete the specified element from the QmpMessage.

    @raise KeyError: if key is not contained in the message

    """
    del self.data[key]

  @staticmethod
  def BuildFromJsonString(json_string):
    """Build a QmpMessage from a JSON encoded string.

    @type json_string: str
    @param json_string: JSON string representing the message
    @rtype: L{QmpMessage}
    @return: a L{QmpMessage} built from json_string

    """
    # Parse the string
    data = serializer.LoadJson(json_string)
    return QmpMessage(data)

  def __str__(self):
    """Return the JSON encoding of the message data.

    """
    # The protocol expects the JSON object to be sent as a single line.
    return serializer.DumpJson(self.data)

  def __eq__(self, other):
    """Compare two messages by their message data.

    Comparison with anything that is not a L{QmpMessage} is delegated back to
    the interpreter (NotImplemented) instead of failing with AttributeError.

    """
    if not isinstance(other, QmpMessage):
      return NotImplemented
    # When comparing two QmpMessages, we are interested in comparing
    # their internal representation of the message data
    return self.data == other.data

  def __ne__(self, other):
    """Inverse of L{__eq__}.

    Python 2 does not derive != from ==, so without this method two messages
    carrying the same data would still compare as different.

    """
    result = self.__eq__(other)
    if result is NotImplemented:
      return result
    return not result
131
class MonitorSocket(object):
  """Client side of the UNIX socket on which a monitor is listening.

  The class only manages the connection state; subclasses implement the
  actual protocol on top of C{self.sock}.

  """
  # Timeout, in seconds, applied to the monitor socket
  _SOCKET_TIMEOUT = 5

  def __init__(self, monitor_filename):
    """Instantiates the MonitorSocket object.

    @type monitor_filename: string
    @param monitor_filename: the filename of the UNIX raw socket on which the
                             monitor (QMP or simple one) is listening

    """
    self.monitor_filename = monitor_filename
    # Only set while a connection is established
    self.sock = None
    self._connected = False

  def _check_socket(self):
    """Verify that the monitor path exists and refers to a socket.

    @raise errors.HypervisorError: if the path does not exist, cannot be
        inspected or is not a socket

    """
    try:
      sock_stat = os.stat(self.monitor_filename)
    except EnvironmentError as err:
      if err.errno == errno.ENOENT:
        raise errors.HypervisorError("No monitor socket found")
      # The detail has to be interpolated here: passed as a second exception
      # argument it would never make it into the message.
      raise errors.HypervisorError("Error checking monitor socket: %s" %
                                   utils.ErrnoOrStr(err))
    if not stat.S_ISSOCK(sock_stat.st_mode):
      raise errors.HypervisorError("Monitor socket is not a socket")

  def _check_connection(self):
    """Make sure that the connection is established.

    @raise errors.ProgrammerError: if connect() has not been invoked yet

    """
    if not self._connected:
      raise errors.ProgrammerError("To use a MonitorSocket you need to first"
                                   " invoke connect() on it")

  def connect(self):
    """Connect to the monitor socket if not already connected.

    """
    if not self._connected:
      self._connect()

  def is_connected(self):
    """Return whether there is a connection to the socket or not.

    """
    return self._connected

  def _connect(self):
    """Connects to the monitor.

    Connects to the UNIX socket

    @raise errors.HypervisorError: when there are communication errors
    @raise errors.ProgrammerError: if the socket is already connected

    """
    if self._connected:
      raise errors.ProgrammerError("Cannot connect twice")

    # Check file existence/type before trying to connect
    self._check_socket()

    sock = None
    try:
      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      # We want to fail if the server doesn't send a complete message
      # in a reasonable amount of time
      sock.settimeout(self._SOCKET_TIMEOUT)
      sock.connect(self.monitor_filename)
    except EnvironmentError:
      # Do not leak the file descriptor of a socket we could not connect
      if sock is not None:
        sock.close()
      raise errors.HypervisorError("Can't connect to qmp socket")

    self.sock = sock
    self._connected = True

  def close(self):
    """Closes the socket

    It cannot be used after this call.

    """
    if self._connected:
      self._close()

  def _close(self):
    """Unconditionally close the socket and mark it as disconnected.

    """
    self.sock.close()
    self._connected = False
217
218 219 -def _ensure_connection(fn):
220 """Decorator that wraps MonitorSocket external methods""" 221 def wrapper(*args, **kwargs): 222 """Ensure proper connect/close and exception propagation""" 223 mon = args[0] 224 already_connected = mon.is_connected() 225 mon.connect() 226 try: 227 ret = fn(*args, **kwargs) 228 finally: 229 # In general this decorator wraps external methods. 230 # Here we close the connection only if we initiated it before, 231 # to protect us from using the socket after closing it 232 # in case we invoke a decorated method internally by accident. 233 if not already_connected: 234 mon.close() 235 return ret
236 return wrapper 237
class QmpConnection(MonitorSocket):
  """Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP).

  Instances can be used as context managers: the connection is established on
  entry and closed on exit.

  """
  _FIRST_MESSAGE_KEY = "QMP"
  _EVENT_KEY = "event"
  _ERROR_KEY = "error"
  _RETURN_KEY = "return"
  _ACTUAL_KEY = ACTUAL_KEY = "actual"
  _ERROR_CLASS_KEY = "class"
  _ERROR_DESC_KEY = "desc"
  _EXECUTE_KEY = "execute"
  _ARGUMENTS_KEY = "arguments"
  _VERSION_KEY = "version"
  _PACKAGE_KEY = "package"
  _QEMU_KEY = "qemu"
  _CAPABILITIES_COMMAND = "qmp_capabilities"
  _QUERY_COMMANDS = "query-commands"
  _MESSAGE_END_TOKEN = "\r\n"
  _QEMU_PCI_SLOTS = 32 # The number of PCI slots QEMU exposes by default

  def __init__(self, monitor_filename):
    """Instantiates the QmpConnection object.

    @type monitor_filename: string
    @param monitor_filename: the filename of the UNIX socket on which the QMP
        monitor is listening

    """
    super(QmpConnection, self).__init__(monitor_filename)
    # Data received from the socket and not yet consumed as a message
    self._buf = ""
    # Populated by connect(); None means "not known yet"
    self.supported_commands = None

  def __enter__(self):
    self.connect()
    return self

  def __exit__(self, exc_type, exc_value, tb):
    self.close()

  def connect(self):
    """Connects to the QMP monitor.

    Connects to the UNIX socket and makes sure that we can actually send and
    receive data to the kvm instance via QMP. Does nothing if the connection
    is already established. If the handshake fails the socket is closed
    again before the error is propagated.

    @raise errors.HypervisorError: when there are communication errors
    @raise errors.ProgrammerError: when there are data serialization errors

    """
    if self._connected:
      # The greeting is sent only once per connection; waiting for it again
      # on an established connection would just run into the socket timeout.
      return

    super(QmpConnection, self).connect()
    # Leftovers of a previous connection do not belong to this one
    self._buf = ""

    handshake_done = False
    try:
      # Check if we receive a correct greeting message from the server
      # (As per the QEMU Protocol Specification 0.1 - section 2.2)
      greeting = self._Recv()
      if not greeting[self._FIRST_MESSAGE_KEY]:
        raise errors.HypervisorError("kvm: QMP communication error (wrong"
                                     " server greeting)")

      # Extract the version info from the greeting and make it available to
      # users of the monitor.
      version_info = greeting[self._FIRST_MESSAGE_KEY][self._VERSION_KEY]
      qemu_version = version_info[self._QEMU_KEY]

      self.version = (qemu_version["major"],
                      qemu_version["minor"],
                      qemu_version["micro"])
      self.package = version_info[self._PACKAGE_KEY].strip()

      # This is needed because QMP can return more than one greetings
      # see https://groups.google.com/d/msg/ganeti-devel/gZYcvHKDooU/SnukC8dgS5AJ
      self._buf = ""

      # Let's put the monitor in command mode using the qmp_capabilities
      # command, or else no command will be executable.
      # (As per the QEMU Protocol Specification 0.1 - section 4)
      self.Execute(self._CAPABILITIES_COMMAND)
      self.supported_commands = self._GetSupportedCommands()
      handshake_done = True
    finally:
      if not handshake_done:
        # Do not leave behind an open socket which never reached command mode
        self.close()

  def _ParseMessage(self, buf):
    """Extract and parse a QMP message from the given buffer.

    Seeks for a QMP message in the given buf. If found, it parses it and
    returns it together with the rest of the characters in the buf.
    If no message is found, returns None and the whole buffer.

    @type buf: str
    @param buf: the data received so far
    @rtype: tuple
    @return: (message, remaining buffer), message being a L{QmpMessage} or
        None
    @raise errors.ProgrammerError: when there are data serialization errors

    """
    message = None
    # Check if we got the message end token (CRLF, as per the QEMU Protocol
    # Specification 0.1 - Section 2.1.1)
    pos = buf.find(self._MESSAGE_END_TOKEN)
    if pos >= 0:
      try:
        message = QmpMessage.BuildFromJsonString(buf[:pos])
      except Exception as err:
        raise errors.ProgrammerError("QMP data serialization error: %s" % err)
      # Drop the whole end token, so that no part of it is left at the
      # beginning of the remaining data
      buf = buf[pos + len(self._MESSAGE_END_TOKEN):]

    return (message, buf)

  def _Recv(self):
    """Receives a message from QMP and decodes the received JSON object.

    @rtype: QmpMessage
    @return: the received message
    @raise errors.HypervisorError: when there are communication errors,
        including the peer closing the connection before a complete message
        was received
    @raise errors.ProgrammerError: when there are data serialization errors

    """
    self._check_connection()

    # Check if there is already a message in the buffer
    (message, self._buf) = self._ParseMessage(self._buf)
    if message:
      return message

    try:
      while True:
        data = self.sock.recv(4096)
        if not data:
          # An empty read means that the other side closed the connection;
          # no message will ever arrive.
          raise errors.HypervisorError("Unable to receive data from KVM using"
                                       " the QMP protocol: connection closed"
                                       " by peer")

        # Whenever no message was found self._buf holds everything received
        # so far, hence appending to it is enough.
        (message, self._buf) = self._ParseMessage(self._buf + data)
        if message:
          return message

    except socket.timeout as err:
      raise errors.HypervisorError("Timeout while receiving a QMP message: "
                                   "%s" % (err))
    except socket.error as err:
      raise errors.HypervisorError("Unable to receive data from KVM using the"
                                   " QMP protocol: %s" % err)

  def _Send(self, message):
    """Encodes and sends a message to KVM using QMP.

    @type message: QmpMessage
    @param message: message to send to KVM
    @raise errors.HypervisorError: when there are communication errors
    @raise errors.ProgrammerError: when there are data serialization errors

    """
    self._check_connection()
    try:
      message_str = str(message)
    except Exception as err:
      raise errors.ProgrammerError("QMP data serialization error: %s" % err)

    try:
      self.sock.sendall(message_str)
    except socket.timeout as err:
      # socket.timeout has no "string" attribute; format the exception itself
      raise errors.HypervisorError("Timeout while sending a QMP message: "
                                   "%s" % err)
    except socket.error as err:
      raise errors.HypervisorError("Unable to send data from KVM using the"
                                   " QMP protocol: %s" % err)

  def _GetSupportedCommands(self):
    """Query the monitor for the commands it supports.

    @rtype: frozenset
    @return: the names returned by the query-commands QMP command

    """
    result = self.Execute(self._QUERY_COMMANDS)
    return frozenset(com["name"] for com in result)

  def Execute(self, command, arguments=None):
    """Executes a QMP command and returns the response of the server.

    @type command: str
    @param command: the command to execute
    @type arguments: dict
    @param arguments: dictionary of arguments to be passed to the command
    @rtype: dict
    @return: dictionary representing the received JSON object
    @raise errors.HypervisorError: when there are communication errors
    @raise errors.ProgrammerError: when there are data serialization errors
    @raise QmpCommandNotSupported: when the command is not among the
        supported ones

    """
    self._check_connection()

    # During the first calls of Execute, the list of supported commands has not
    # yet been populated, so we can't use it.
    if (self.supported_commands is not None and
        command not in self.supported_commands):
      raise QmpCommandNotSupported("Instance does not support the '%s'"
                                   " QMP command." % command)

    message = QmpMessage({self._EXECUTE_KEY: command})
    if arguments:
      message[self._ARGUMENTS_KEY] = arguments
    self._Send(message)

    ret = self._GetResponse(command)
    # log important qmp commands..
    if command not in (self._QUERY_COMMANDS, self._CAPABILITIES_COMMAND):
      logging.debug("QMP %s %s: %s\n", command, arguments, ret)
    return ret

  def _GetResponse(self, command):
    """Parse the QMP response

    If error key found in the response message raise HypervisorError.
    Ignore any async event and thus return the response message
    related to command.

    @type command: str
    @param command: the executed command, used for the error message only
    @return: the value of the "return" key of the response
    @raise errors.HypervisorError: if the response contains the "error" key

    """
    # According the the QMP specification, there are only two reply types to a
    # command: either error (containing the "error" key) or success (containing
    # the "return" key). There is also a third possibility, that of an
    # (unrelated to the command) asynchronous event notification, identified by
    # the "event" key.
    while True:
      response = self._Recv()
      err = response[self._ERROR_KEY]
      if err:
        raise errors.HypervisorError("kvm: error executing the %s"
                                     " command: %s (%s):" %
                                     (command,
                                      err[self._ERROR_DESC_KEY],
                                      err[self._ERROR_CLASS_KEY]))

      if response[self._EVENT_KEY]:
        # Filter-out any asynchronous events
        continue

      return response[self._RETURN_KEY]

  @_ensure_connection
  def HotAddNic(self, nic, devid, tapfds=None, vhostfds=None, features=None):
    """Hot-add a NIC

    First pass the tapfds, then netdev_add and then device_add

    @param nic: the NIC to add; its C{pci} and C{mac} attributes are used
    @param devid: the id used for both the netdev and the device
    @type tapfds: list
    @param tapfds: file descriptors of the tap devices
    @type vhostfds: list
    @param vhostfds: file descriptors passed as vhostfds; only used if the
        "vhost" feature is enabled
    @type features: dict
    @param features: optional "vhost" (boolean) and "mq" (tuple of enabled
        flag and number of queues) entries

    """
    if tapfds is None:
      tapfds = []
    if vhostfds is None:
      vhostfds = []
    if features is None:
      features = {}

    enable_vhost = features.get("vhost", False)
    enable_mq, virtio_net_queues = features.get("mq", (False, 1))

    fdnames = []
    for i, fd in enumerate(tapfds):
      fdname = "%s-%d" % (devid, i)
      self._GetFd(fd, fdname)
      fdnames.append(fdname)

    arguments = {
      "type": "tap",
      "id": devid,
      "fds": ":".join(fdnames),
    }
    if enable_vhost:
      fdnames = []
      for i, fd in enumerate(vhostfds):
        fdname = "%s-vhost-%d" % (devid, i)
        self._GetFd(fd, fdname)
        fdnames.append(fdname)

      arguments.update({
        "vhost": "on",
        "vhostfds": ":".join(fdnames),
      })
    self.Execute("netdev_add", arguments)

    arguments = {
      "driver": "virtio-net-pci",
      "id": devid,
      "bus": "pci.0",
      "addr": hex(nic.pci),
      "netdev": devid,
      "mac": nic.mac,
    }
    if enable_mq:
      arguments.update({
        "mq": "on",
        "vectors": (2 * virtio_net_queues + 1),
      })
    self.Execute("device_add", arguments)

  @_ensure_connection
  def HotDelNic(self, devid):
    """Hot-del a NIC

    @param devid: the id of both the device and the netdev to remove

    """
    self.Execute("device_del", {"id": devid})
    self.Execute("netdev_del", {"id": devid})

  @_ensure_connection
  def HotAddDisk(self, disk, devid, uri):
    """Hot-add a disk

    Try opening the device to obtain a fd and pass it with SCM_RIGHTS. This
    will be omitted in case of userspace access mode (open will fail).
    Then use blockdev-add and then device_add.

    @param disk: the disk to add; its C{pci} attribute is used
    @param devid: the id used for both the block device and the device
    @param uri: path of the device, or a userspace uri

    """
    if os.path.exists(uri):
      fd = os.open(uri, os.O_RDWR)
      try:
        fdset = self._AddFd(fd)
      finally:
        # Our copy of the descriptor is not needed any more, no matter
        # whether passing it succeeded.
        os.close(fd)
      filename = "/dev/fdset/%s" % fdset
    else:
      # The uri is not a file.
      # This can happen if a userspace uri is provided.
      filename = uri
      fdset = None

    arguments = {
      "options": {
        "driver": "raw",
        "id": devid,
        "file": {
          "driver": "file",
          "filename": filename,
        }
      }
    }
    try:
      self.Execute("blockdev-add", arguments)
    finally:
      # Also clean up the fdset if blockdev-add failed
      if fdset is not None:
        self._RemoveFdset(fdset)

    arguments = {
      "driver": "virtio-blk-pci",
      "id": devid,
      "bus": "pci.0",
      "addr": hex(disk.pci),
      "drive": devid,
    }
    self.Execute("device_add", arguments)

  @_ensure_connection
  def HotDelDisk(self, devid):
    """Hot-del a Disk

    Note that drive_del is not supported yet in qmp and thus should
    be invoked from HMP.

    @param devid: the id of the device to remove

    """
    self.Execute("device_del", {"id": devid})
    #TODO: uncomment when drive_del gets implemented in upstream qemu
    # self.Execute("drive_del", {"id": devid})

  def _GetPCIDevices(self):
    """Get the devices of the first PCI bus of a running instance.

    @return: the "devices" entry of the first bus returned by query-pci

    """
    self._check_connection()
    pci = self.Execute("query-pci")
    bus = pci[0]
    devices = bus["devices"]
    return devices

  @_ensure_connection
  def HasPCIDevice(self, device, devid):
    """Check if a specific device exists or not on a running instance.

    It will match the PCI slot of the device and the id currently
    obtained by _GenerateDeviceKVMId().

    @param device: the device to look for; its C{pci} attribute is used
    @param devid: the expected qdev_id of the device
    @rtype: boolean

    """
    return any(d["qdev_id"] == devid and d["slot"] == device.pci
               for d in self._GetPCIDevices())

  @_ensure_connection
  def GetFreePCISlot(self):
    """Get the first available PCI slot of a running instance.

    @return: the result of L{utils.GetFreeSlot} on the map of used slots

    """
    slots = bitarray(self._QEMU_PCI_SLOTS)
    slots.setall(False) # pylint: disable=E1101
    for d in self._GetPCIDevices():
      slot = d["slot"]
      slots[slot] = True

    return utils.GetFreeSlot(slots)

  @_ensure_connection
  def CheckDiskHotAddSupport(self):
    """Check if disk hotplug is possible

    Hotplug is *not* supported in case:
     - fdsend module is missing
     - add-fd and blockdev-add qmp commands are not supported

    @raise errors.HotplugError: if disk hotplug is not supported

    """
    def _raise(reason):
      raise errors.HotplugError("Cannot hot-add disk: %s." % reason)

    if not fdsend:
      _raise("fdsend python module is missing")

    if "add-fd" not in self.supported_commands:
      _raise("add-fd qmp command is not supported")

    if "blockdev-add" not in self.supported_commands:
      _raise("blockdev-add qmp command is not supported")

  @_ensure_connection
  def CheckNicHotAddSupport(self):
    """Check if NIC hotplug is possible

    Hotplug is *not* supported in case:
     - fdsend module is missing
     - getfd and netdev_add qmp commands are not supported

    @raise errors.HotplugError: if NIC hotplug is not supported

    """
    def _raise(reason):
      raise errors.HotplugError("Cannot hot-add NIC: %s." % reason)

    if not fdsend:
      _raise("fdsend python module is missing")

    if "getfd" not in self.supported_commands:
      _raise("getfd qmp command is not supported")

    if "netdev_add" not in self.supported_commands:
      _raise("netdev_add qmp command is not supported")

  def _GetFd(self, fd, fdname):
    """Wrapper around the getfd qmp command

    Use fdsend to send an fd to a running process via SCM_RIGHTS and then use
    the getfd qmp command to name it properly so that it can be used
    later by NIC hotplugging.

    @type fd: int
    @param fd: The file descriptor to pass
    @type fdname: str
    @param fdname: The name to assign to the passed file descriptor
    @raise errors.HypervisorError: If getfd fails for some reason

    """
    self._check_connection()
    try:
      fdsend.sendfds(self.sock, " ", fds=[fd])
      arguments = {
          "fdname": fdname,
          }
      self.Execute("getfd", arguments)
    except errors.HypervisorError as err:
      logging.info("Passing fd %s via SCM_RIGHTS failed: %s", fd, err)
      raise

  def _AddFd(self, fd):
    """Wrapper around add-fd qmp command

    Use fdsend to send fd to a running process via SCM_RIGHTS and then add-fd
    qmp command to add it to an fdset so that it can be used later by
    disk hotplugging.

    @type fd: int
    @param fd: The file descriptor to pass

    @return: The fdset ID that the fd has been added to
    @raise errors.HypervisorError: If add-fd fails for some reason

    """
    self._check_connection()
    try:
      fdsend.sendfds(self.sock, " ", fds=[fd])
      # Omit fdset-id and let qemu create a new one (see qmp-commands.hx)
      response = self.Execute("add-fd")
      fdset = response["fdset-id"]
    except errors.HypervisorError as err:
      logging.info("Passing fd %s via SCM_RIGHTS failed: %s", fd, err)
      raise

    return fdset

  def _RemoveFdset(self, fdset):
    """Wrapper around remove-fd qmp command

    Remove the file descriptor previously passed. After qemu has dup'd the fd
    (e.g. during disk hotplug), it can be safely removed. Failures are logged
    and otherwise ignored.

    @param fdset: The ID of the fdset to remove

    """
    self._check_connection()
    # Omit the fd to cleanup all fds in the fdset (see qemu/qmp-commands.hx)
    try:
      self.Execute("remove-fd", {"fdset-id": fdset})
    except errors.HypervisorError as err:
      # There is no big deal if we cannot remove an fdset. This cleanup here is
      # done on a best effort basis. Upon next hot-add a new fdset will be
      # created. If we raise an exception here, that is after drive_add has
      # succeeded, the whole hot-add action will fail and the runtime file will
      # not be updated which will make the instance non migrate-able
      logging.info("Removing fdset with id %s failed: %s", fdset, err)
728