1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Qemu monitor control classes
32
33 """
34
35
36 import os
37 import stat
38 import errno
39 import socket
40 import StringIO
41 import logging
42 try:
43 import fdsend
44 except ImportError:
45 fdsend = None
46
47 from bitarray import bitarray
48
49 from ganeti import errors
50 from ganeti import utils
51 from ganeti import constants
52 from ganeti import serializer
56 """QMP command not supported by the monitor.
57
58 This is raised in case a QmpMonitor instance is asked to execute a command
59 not supported by the instance.
60
61 This is a KVM-specific exception, intended to assist in falling back to using
62 the human monitor for operations QMP does not support.
63
64 """
65 pass
66
69 """QEMU Messaging Protocol (QMP) message.
70
71 """
73 """Creates a new QMP message based on the passed data.
74
75 """
76 if not isinstance(data, dict):
77 raise TypeError("QmpMessage must be initialized with a dict")
78
79 self.data = data
80
82 """Get the value of the required field if present, or None.
83
84 Overrides the [] operator to provide access to the message data,
85 returning None if the required item is not in the message
86 @return: the value of the field_name field, or None if field_name
87 is not contained in the message
88
89 """
90 return self.data.get(field_name, None)
91
93 """Set the value of the required field_name to field_value.
94
95 """
96 self.data[field_name] = field_value
97
99 """Return the number of fields stored in this QmpMessage.
100
101 """
102 return len(self.data)
103
105 """Delete the specified element from the QmpMessage.
106
107 """
108 del(self.data[key])
109
110 @staticmethod
112 """Build a QmpMessage from a JSON encoded string.
113
114 @type json_string: str
115 @param json_string: JSON string representing the message
116 @rtype: L{QmpMessage}
117 @return: a L{QmpMessage} built from json_string
118
119 """
120
121 data = serializer.LoadJson(json_string)
122 return QmpMessage(data)
123
127
129
130
131 return self.data == other.data
132
135 _SOCKET_TIMEOUT = 5
136
138 """Instantiates the MonitorSocket object.
139
140 @type monitor_filename: string
141 @param monitor_filename: the filename of the UNIX raw socket on which the
142 monitor (QMP or simple one) is listening
143
144 """
145 self.monitor_filename = monitor_filename
146 self._connected = False
147
149 sock_stat = None
150 try:
151 sock_stat = os.stat(self.monitor_filename)
152 except EnvironmentError, err:
153 if err.errno == errno.ENOENT:
154 raise errors.HypervisorError("No monitor socket found")
155 else:
156 raise errors.HypervisorError("Error checking monitor socket: %s",
157 utils.ErrnoOrStr(err))
158 if not stat.S_ISSOCK(sock_stat.st_mode):
159 raise errors.HypervisorError("Monitor socket is not a socket")
160
162 """Make sure that the connection is established.
163
164 """
165 if not self._connected:
166 raise errors.ProgrammerError("To use a MonitorSocket you need to first"
167 " invoke connect() on it")
168
170 """Connect to the monitor socket if not already connected.
171
172 """
173 if not self._connected:
174 self._connect()
175
177 """Return whether there is a connection to the socket or not.
178
179 """
180 return self._connected
181
183 """Connects to the monitor.
184
185 Connects to the UNIX socket
186
187 @raise errors.HypervisorError: when there are communication errors
188
189 """
190 if self._connected:
191 raise errors.ProgrammerError("Cannot connect twice")
192
193 self._check_socket()
194
195
196 try:
197 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
198
199
200 self.sock.settimeout(self._SOCKET_TIMEOUT)
201 self.sock.connect(self.monitor_filename)
202 except EnvironmentError:
203 raise errors.HypervisorError("Can't connect to qmp socket")
204 self._connected = True
205
207 """Closes the socket
208
209 It cannot be used after this call.
210
211 """
212 if self._connected:
213 self._close()
214
216 self.sock.close()
217 self._connected = False
218
221 """Decorator that wraps MonitorSocket external methods"""
def wrapper(*args, **kwargs):
  """Run the wrapped method on a connected monitor.

  The first positional argument is taken to be the monitor object. The
  connection is opened before the call and closed afterwards only when this
  wrapper found the monitor disconnected on entry; exceptions raised by the
  wrapped method propagate unchanged.

  """
  monitor = args[0]
  was_connected = monitor.is_connected()
  monitor.connect()
  try:
    return fn(*args, **kwargs)
  finally:
    # Leave a connection that existed before this call open, so that a
    # decorated method invoked from another one does not tear down the
    # socket the outer call is still using.
    if not was_connected:
      monitor.close()
237 return wrapper
238
241 """Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP).
242
243 """
244 _FIRST_MESSAGE_KEY = "QMP"
245 _EVENT_KEY = "event"
246 _ERROR_KEY = "error"
247 _RETURN_KEY = "return"
248 _ACTUAL_KEY = ACTUAL_KEY = "actual"
249 _ERROR_CLASS_KEY = "class"
250 _ERROR_DESC_KEY = "desc"
251 _EXECUTE_KEY = "execute"
252 _ARGUMENTS_KEY = "arguments"
253 _VERSION_KEY = "version"
254 _PACKAGE_KEY = "package"
255 _QEMU_KEY = "qemu"
256 _CAPABILITIES_COMMAND = "qmp_capabilities"
257 _QUERY_COMMANDS = "query-commands"
258 _MESSAGE_END_TOKEN = "\r\n"
259
260
261 _DEVICE_ATTRIBUTES = [
262 "driver", "id", "bus", "addr", "channel", "scsi-id", "lun"
263 ]
264
266 super(QmpConnection, self).__init__(monitor_filename)
267 self._buf = ""
268 self.supported_commands = None
269
273
274 - def __exit__(self, exc_type, exc_value, tb):
276
278 """Connects to the QMP monitor.
279
280 Connects to the UNIX socket and makes sure that we can actually send and
281 receive data to the kvm instance via QMP.
282
283 @raise errors.HypervisorError: when there are communication errors
284 @raise errors.ProgrammerError: when there are data serialization errors
285
286 """
287 super(QmpConnection, self).connect()
288
289
290 greeting = self._Recv()
291 if not greeting[self._FIRST_MESSAGE_KEY]:
292 self._connected = False
293 raise errors.HypervisorError("kvm: QMP communication error (wrong"
294 " server greeting")
295
296
297
298 version_info = greeting[self._FIRST_MESSAGE_KEY][self._VERSION_KEY]
299
300 self.version = (version_info[self._QEMU_KEY]["major"],
301 version_info[self._QEMU_KEY]["minor"],
302 version_info[self._QEMU_KEY]["micro"])
303 self.package = version_info[self._PACKAGE_KEY].strip()
304
305
306
307 self._buf = ""
308
309
310
311
312 self.Execute(self._CAPABILITIES_COMMAND)
313 self.supported_commands = self._GetSupportedCommands()
314
316 """Extract and parse a QMP message from the given buffer.
317
318 Seeks for a QMP message in the given buf. If found, it parses it and
319 returns it together with the rest of the characters in the buf.
320 If no message is found, returns None and the whole buffer.
321
322 @raise errors.ProgrammerError: when there are data serialization errors
323
324 """
325 message = None
326
327
328 pos = buf.find(self._MESSAGE_END_TOKEN)
329 if pos >= 0:
330 try:
331 message = QmpMessage.BuildFromJsonString(buf[:pos + 1])
332 except Exception, err:
333 raise errors.ProgrammerError("QMP data serialization error: %s" % err)
334 buf = buf[pos + 1:]
335
336 return (message, buf)
337
339 """Receives a message from QMP and decodes the received JSON object.
340
341 @rtype: QmpMessage
342 @return: the received message
343 @raise errors.HypervisorError: when there are communication errors
344 @raise errors.ProgrammerError: when there are data serialization errors
345
346 """
347 self._check_connection()
348
349
350 (message, self._buf) = self._ParseMessage(self._buf)
351 if message:
352 return message
353
354 recv_buffer = StringIO.StringIO(self._buf)
355 recv_buffer.seek(len(self._buf))
356 try:
357 while True:
358 data = self.sock.recv(4096)
359 if not data:
360 break
361 recv_buffer.write(data)
362
363 (message, self._buf) = self._ParseMessage(recv_buffer.getvalue())
364 if message:
365 return message
366
367 except socket.timeout, err:
368 raise errors.HypervisorError("Timeout while receiving a QMP message: "
369 "%s" % (err))
370 except socket.error, err:
371 raise errors.HypervisorError("Unable to receive data from KVM using the"
372 " QMP protocol: %s" % err)
373
def _Send(self, message):
  """Encodes and sends a message to KVM using QMP.

  The message is converted to its string form with str() and written to the
  monitor socket in a single sendall() call.

  @type message: QmpMessage
  @param message: message to send to KVM
  @raise errors.HypervisorError: when there are communication errors
  @raise errors.ProgrammerError: when there are data serialization errors

  """
  self._check_connection()
  try:
    message_str = str(message)
  except Exception as err:
    raise errors.ProgrammerError("QMP data serialization error: %s" % err)

  try:
    self.sock.sendall(message_str)
  except socket.timeout as err:
    # socket.timeout carries no "string" attribute; reading it would raise
    # AttributeError and hide the timeout, so format the exception itself.
    raise errors.HypervisorError("Timeout while sending a QMP message: "
                                 "%s" % err)
  except socket.error as err:
    raise errors.HypervisorError("Unable to send data to KVM using the"
                                 " QMP protocol: %s" % err)
397
399 """Update the list of supported commands.
400
401 """
402 result = self.Execute(self._QUERY_COMMANDS)
403 return frozenset(com["name"] for com in result)
404
def Execute(self, command, arguments=None):
  """Run a QMP command and hand back the reply of the server.

  @type command: str
  @param command: the command to execute
  @type arguments: dict
  @param arguments: dictionary of arguments to be passed to the command
  @rtype: dict
  @return: dictionary representing the received JSON object
  @raise errors.HypervisorError: when there are communication errors
  @raise errors.ProgrammerError: when there are data serialization errors

  """
  self._check_connection()

  # While supported_commands is still None no filtering is applied and
  # every command is let through.
  known_commands = self.supported_commands
  if known_commands is not None and command not in known_commands:
    raise QmpCommandNotSupported("Instance does not support the '%s'"
                                 " QMP command." % command)

  request = QmpMessage({self._EXECUTE_KEY: command})
  if arguments:
    request[self._ARGUMENTS_KEY] = arguments
  self._Send(request)

  reply = self._GetResponse(command)

  # The command-list query and the capabilities handshake are not logged.
  if command not in (self._QUERY_COMMANDS, self._CAPABILITIES_COMMAND):
    logging.debug("QMP %s %s: %s\n", command, arguments, reply)
  return reply
437
439 """Parse the QMP response
440
441 If error key found in the response message raise HypervisorError.
442 Ignore any async event and thus return the response message
443 related to command.
444
445 """
446
447
448
449
450
451 while True:
452 response = self._Recv()
453 err = response[self._ERROR_KEY]
454 if err:
455 raise errors.HypervisorError("kvm: error executing the %s"
456 " command: %s (%s):" %
457 (command,
458 err[self._ERROR_DESC_KEY],
459 err[self._ERROR_CLASS_KEY]))
460
461 elif response[self._EVENT_KEY]:
462
463 continue
464
465 return response[self._RETURN_KEY]
466
468 """Filter non valid keys of the device's hvinfo (if any)."""
469 ret = {}
470 for k in self._DEVICE_ATTRIBUTES:
471 if k in hvinfo:
472 ret[k] = hvinfo[k]
473
474 return ret
475
@_ensure_connection
def HotAddNic(self, nic, devid, tapfds=None, vhostfds=None, features=None):
  """Hot-add a NIC

  The tap file descriptors are handed over first (plus the vhost ones when
  vhost is enabled), then the backend is created with netdev_add and
  finally the guest device with device_add.

  @param nic: NIC object; its mac and hvinfo attributes are read
  @param devid: id used for the netdev and as prefix for the fd names
  @param tapfds: tap file descriptors to pass (defaults to none)
  @param vhostfds: vhost file descriptors, used only if vhost is enabled
  @param features: dict; "vhost" (bool) and "mq" (a pair of enable flag
    and number of queues) are the keys looked up

  """
  tapfds = [] if tapfds is None else tapfds
  vhostfds = [] if vhostfds is None else vhostfds
  features = {} if features is None else features

  enable_vhost = features.get("vhost", False)
  enable_mq, virtio_net_queues = features.get("mq", (False, 1))

  tap_names = []
  for idx, tapfd in enumerate(tapfds):
    tap_name = "%s-%d" % (devid, idx)
    self._GetFd(tapfd, tap_name)
    tap_names.append(tap_name)

  netdev_args = {
    "type": "tap",
    "id": devid,
    "fds": ":".join(tap_names),
  }
  if enable_vhost:
    vhost_names = []
    for idx, vhostfd in enumerate(vhostfds):
      vhost_name = "%s-vhost-%d" % (devid, idx)
      self._GetFd(vhostfd, vhost_name)
      vhost_names.append(vhost_name)

    netdev_args["vhost"] = "on"
    netdev_args["vhostfds"] = ":".join(vhost_names)
  self.Execute("netdev_add", netdev_args)

  device_args = {
    "netdev": devid,
    "mac": nic.mac,
  }
  # Only the whitelisted device attributes of hvinfo are forwarded.
  device_args.update(self._filter_hvinfo(nic.hvinfo))
  if enable_mq:
    device_args["mq"] = "on"
    device_args["vectors"] = (2 * virtio_net_queues + 1)
  self.Execute("device_add", device_args)
530
531 @_ensure_connection
533 """Hot-del a NIC
534
535 """
536 self.Execute("device_del", {"id": devid})
537 self.Execute("netdev_del", {"id": devid})
538
@_ensure_connection
def HotAddDisk(self, disk, devid, uri, drive_add_fn=None):
  """Hot-add a disk

  If uri exists as a local path, open it and pass the resulting fd to the
  monitor through _AddFd; the drive then refers to it as
  /dev/fdset/<fdset id>. Otherwise uri is handed over unchanged.
  Then add the drive, using the drive_add_fn() callback if one is given
  or the blockdev-add QMP command if not. Then add the guest device.

  @param disk: disk object; its hvinfo attribute is read
  @param devid: id of the drive, also referenced by the guest device
  @param uri: local path or other URI of the disk
  @param drive_add_fn: optional callable taking the filename to add; when
    given it is used instead of blockdev-add

  """
  if os.path.exists(uri):
    fd = os.open(uri, os.O_RDWR)
    try:
      fdset = self._AddFd(fd)
    finally:
      # The local descriptor is not needed once it has been handed over;
      # close it also when passing it failed, so that it does not leak.
      os.close(fd)
    filename = "/dev/fdset/%s" % fdset
  else:
    filename = uri
    fdset = None

  if drive_add_fn:
    drive_add_fn(filename)
  else:
    arguments = {
      "options": {
        "driver": "raw",
        "id": devid,
        "file": {
          "driver": "file",
          "filename": filename,
        }
      }
    }
    self.Execute("blockdev-add", arguments)

  # The drive has been added, so the fdset is no longer needed.
  if fdset is not None:
    self._RemoveFdset(fdset)

  arguments = {
    "drive": devid,
  }
  # Only the whitelisted device attributes of hvinfo are forwarded.
  arguments.update(self._filter_hvinfo(disk.hvinfo))
  self.Execute("device_add", arguments)
596
597 @_ensure_connection
599 """Hot-del a Disk
600
601 Note that drive_del is not supported yet in qmp and thus should
602 be invoked from HMP.
603
604 """
605 self.Execute("device_del", {"id": devid})
606
607
608
610 """Get the devices of the first PCI bus of a running instance.
611
612 """
613 self._check_connection()
614 pci = self.Execute("query-pci")
615 bus = pci[0]
616 devices = bus["devices"]
617 return devices
618
620 """Check if a specific device ID exists on the PCI bus.
621
622 """
623 for d in self._GetPCIDevices():
624 if d["qdev_id"] == devid:
625 return True
626
627 return False
628
630 """Get the block devices of a running instance.
631
632 The query-block QMP command returns a list of dictionaries
633 including information for each virtual disk. For example:
634
635 [{"device": "disk-049f140d", "inserted": {"file": ..., "image": ...}}]
636
637 @rtype: list of dicts
638 @return: Info about the virtual disks of the instance.
639
640 """
641 self._check_connection()
642 devices = self.Execute("query-block")
643 return devices
644
646 """Check if a specific device ID exists among block devices.
647
648 """
649 for d in self._GetBlockDevices():
650 if d["device"] == devid:
651 return True
652
653 return False
654
655 @_ensure_connection
657 """Check if a specific device exists or not on a running instance.
658
659 It first checks the PCI devices and then the block devices.
660
661 """
662 if (self._HasPCIDevice(devid) or self._HasBlockDevice(devid)):
663 return True
664
665 return False
666
667 @_ensure_connection
669 """Get the first available PCI slot of a running instance.
670
671 """
672 slots = bitarray(constants.QEMU_PCI_SLOTS)
673 slots.setall(False)
674 for d in self._GetPCIDevices():
675 slot = d["slot"]
676 slots[slot] = True
677
678 return utils.GetFreeSlot(slots)
679
680 @_ensure_connection
682 """Check if disk hotplug is possible
683
684 Hotplug is *not* supported in case:
685 - fdsend module is missing
686 - add-fd and blockdev-add qmp commands are not supported
687
688 """
689 def _raise(reason):
690 raise errors.HotplugError("Cannot hot-add disk: %s." % reason)
691
692 if not fdsend:
693 _raise("fdsend python module is missing")
694
695 if "add-fd" not in self.supported_commands:
696 _raise("add-fd qmp command is not supported")
697
698 if "blockdev-add" not in self.supported_commands:
699 _raise("blockdev-add qmp command is not supported")
700
701 @_ensure_connection
703 """Check if NIC hotplug is possible
704
705 Hotplug is *not* supported in case:
706 - fdsend module is missing
707 - getfd and netdev_add qmp commands are not supported
708
709 """
710 def _raise(reason):
711 raise errors.HotplugError("Cannot hot-add NIC: %s." % reason)
712
713 if not fdsend:
714 _raise("fdsend python module is missing")
715
716 if "getfd" not in self.supported_commands:
717 _raise("getfd qmp command is not supported")
718
719 if "netdev_add" not in self.supported_commands:
720 _raise("netdev_add qmp command is not supported")
721
def _GetFd(self, fd, fdname):
  """Pass a file descriptor to the monitor and give it a name.

  The descriptor is sent over the monitor socket with fdsend (SCM_RIGHTS)
  and the getfd QMP command then binds it to fdname, so that it can be
  referred to later by NIC hotplugging.

  @type fd: int
  @param fd: The file descriptor to pass
  @param fdname: The name to register the descriptor under
  @raise errors.HypervisorError: If getfd fails for some reason

  """
  self._check_connection()
  try:
    fdsend.sendfds(self.sock, " ", fds=[fd])
    self.Execute("getfd", {"fdname": fdname})
  except errors.HypervisorError as err:
    # Log for diagnosis only; the error is still reported to the caller.
    logging.info("Passing fd %s via SCM_RIGHTS failed: %s", fd, err)
    raise
744
746 """Wrapper around add-fd qmp command
747
748 Use fdsend to send fd to a running process via SCM_RIGHTS and then add-fd
749 qmp command to add it to an fdset so that it can be used later by
750 disk hotplugging.
751
752 @type fd: int
753 @param fd: The file descriptor to pass
754
755 @return: The fdset ID that the fd has been added to
756 @raise errors.HypervisorError: If add-fd fails for some reason
757
758 """
759 self._check_connection()
760 try:
761 fdsend.sendfds(self.sock, " ", fds=[fd])
762
763 response = self.Execute("add-fd")
764 fdset = response["fdset-id"]
765 except errors.HypervisorError, err:
766 logging.info("Passing fd %s via SCM_RIGHTS failed: %s", fd, err)
767 raise
768
769 return fdset
770
772 """Wrapper around remove-fd qmp command
773
774 Remove the file descriptor previously passed. After qemu has dup'd the fd
775 (e.g. during disk hotplug), it can be safely removed.
776
777 """
778 self._check_connection()
779
780 try:
781 self.Execute("remove-fd", {"fdset-id": fdset})
782 except errors.HypervisorError, err:
783
784
785
786
787
788 logging.info("Removing fdset with id %s failed: %s", fdset, err)
789