1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Qemu monitor control classes
32
33 """
34
35
36 import os
37 import stat
38 import errno
39 import socket
40 import StringIO
41 import logging
42 try:
43 import fdsend
44 except ImportError:
45 fdsend = None
46
47 from bitarray import bitarray
48
49 from ganeti import errors
50 from ganeti import utils
51 from ganeti import serializer
55 """QMP command not supported by the monitor.
56
57 This is raised in case a QmpMonitor instance is asked to execute a command
58 not supported by the instance.
59
60 This is a KVM-specific exception, intended to assist in falling back to using
61 the human monitor for operations QMP does not support.
62
63 """
64 pass
65
68 """QEMU Messaging Protocol (QMP) message.
69
70 """
72 """Creates a new QMP message based on the passed data.
73
74 """
75 if not isinstance(data, dict):
76 raise TypeError("QmpMessage must be initialized with a dict")
77
78 self.data = data
79
81 """Get the value of the required field if present, or None.
82
83 Overrides the [] operator to provide access to the message data,
84 returning None if the required item is not in the message
85 @return: the value of the field_name field, or None if field_name
86 is not contained in the message
87
88 """
89 return self.data.get(field_name, None)
90
92 """Set the value of the required field_name to field_value.
93
94 """
95 self.data[field_name] = field_value
96
98 """Return the number of fields stored in this QmpMessage.
99
100 """
101 return len(self.data)
102
104 """Delete the specified element from the QmpMessage.
105
106 """
107 del(self.data[key])
108
109 @staticmethod
111 """Build a QmpMessage from a JSON encoded string.
112
113 @type json_string: str
114 @param json_string: JSON string representing the message
115 @rtype: L{QmpMessage}
116 @return: a L{QmpMessage} built from json_string
117
118 """
119
120 data = serializer.LoadJson(json_string)
121 return QmpMessage(data)
122
126
128
129
130 return self.data == other.data
131
134 _SOCKET_TIMEOUT = 5
135
137 """Instantiates the MonitorSocket object.
138
139 @type monitor_filename: string
140 @param monitor_filename: the filename of the UNIX raw socket on which the
141 monitor (QMP or simple one) is listening
142
143 """
144 self.monitor_filename = monitor_filename
145 self._connected = False
146
148 sock_stat = None
149 try:
150 sock_stat = os.stat(self.monitor_filename)
151 except EnvironmentError, err:
152 if err.errno == errno.ENOENT:
153 raise errors.HypervisorError("No monitor socket found")
154 else:
155 raise errors.HypervisorError("Error checking monitor socket: %s",
156 utils.ErrnoOrStr(err))
157 if not stat.S_ISSOCK(sock_stat.st_mode):
158 raise errors.HypervisorError("Monitor socket is not a socket")
159
161 """Make sure that the connection is established.
162
163 """
164 if not self._connected:
165 raise errors.ProgrammerError("To use a MonitorSocket you need to first"
166 " invoke connect() on it")
167
169 """Connect to the monitor socket if not already connected.
170
171 """
172 if not self._connected:
173 self._connect()
174
176 """Return whether there is a connection to the socket or not.
177
178 """
179 return self._connected
180
182 """Connects to the monitor.
183
184 Connects to the UNIX socket
185
186 @raise errors.HypervisorError: when there are communication errors
187
188 """
189 if self._connected:
190 raise errors.ProgrammerError("Cannot connect twice")
191
192 self._check_socket()
193
194
195 try:
196 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
197
198
199 self.sock.settimeout(self._SOCKET_TIMEOUT)
200 self.sock.connect(self.monitor_filename)
201 except EnvironmentError:
202 raise errors.HypervisorError("Can't connect to qmp socket")
203 self._connected = True
204
206 """Closes the socket
207
208 It cannot be used after this call.
209
210 """
211 if self._connected:
212 self._close()
213
215 self.sock.close()
216 self._connected = False
217
220 """Decorator that wraps MonitorSocket external methods"""
221 def wrapper(*args, **kwargs):
222 """Ensure proper connect/close and exception propagation"""
223 mon = args[0]
224 already_connected = mon.is_connected()
225 mon.connect()
226 try:
227 ret = fn(*args, **kwargs)
228 finally:
229
230
231
232
233 if not already_connected:
234 mon.close()
235 return ret
236 return wrapper
237
240 """Connection to the QEMU Monitor using the QEMU Monitor Protocol (QMP).
241
242 """
243 _FIRST_MESSAGE_KEY = "QMP"
244 _EVENT_KEY = "event"
245 _ERROR_KEY = "error"
246 _RETURN_KEY = "return"
247 _ACTUAL_KEY = ACTUAL_KEY = "actual"
248 _ERROR_CLASS_KEY = "class"
249 _ERROR_DESC_KEY = "desc"
250 _EXECUTE_KEY = "execute"
251 _ARGUMENTS_KEY = "arguments"
252 _VERSION_KEY = "version"
253 _PACKAGE_KEY = "package"
254 _QEMU_KEY = "qemu"
255 _CAPABILITIES_COMMAND = "qmp_capabilities"
256 _QUERY_COMMANDS = "query-commands"
257 _MESSAGE_END_TOKEN = "\r\n"
258 _QEMU_PCI_SLOTS = 32
259
261 super(QmpConnection, self).__init__(monitor_filename)
262 self._buf = ""
263 self.supported_commands = None
264
268
269 - def __exit__(self, exc_type, exc_value, tb):
271
273 """Connects to the QMP monitor.
274
275 Connects to the UNIX socket and makes sure that we can actually send and
276 receive data to the kvm instance via QMP.
277
278 @raise errors.HypervisorError: when there are communication errors
279 @raise errors.ProgrammerError: when there are data serialization errors
280
281 """
282 super(QmpConnection, self).connect()
283
284
285 greeting = self._Recv()
286 if not greeting[self._FIRST_MESSAGE_KEY]:
287 self._connected = False
288 raise errors.HypervisorError("kvm: QMP communication error (wrong"
289 " server greeting")
290
291
292
293 version_info = greeting[self._FIRST_MESSAGE_KEY][self._VERSION_KEY]
294
295 self.version = (version_info[self._QEMU_KEY]["major"],
296 version_info[self._QEMU_KEY]["minor"],
297 version_info[self._QEMU_KEY]["micro"])
298 self.package = version_info[self._PACKAGE_KEY].strip()
299
300
301
302 self._buf = ""
303
304
305
306
307 self.Execute(self._CAPABILITIES_COMMAND)
308 self.supported_commands = self._GetSupportedCommands()
309
311 """Extract and parse a QMP message from the given buffer.
312
313 Seeks for a QMP message in the given buf. If found, it parses it and
314 returns it together with the rest of the characters in the buf.
315 If no message is found, returns None and the whole buffer.
316
317 @raise errors.ProgrammerError: when there are data serialization errors
318
319 """
320 message = None
321
322
323 pos = buf.find(self._MESSAGE_END_TOKEN)
324 if pos >= 0:
325 try:
326 message = QmpMessage.BuildFromJsonString(buf[:pos + 1])
327 except Exception, err:
328 raise errors.ProgrammerError("QMP data serialization error: %s" % err)
329 buf = buf[pos + 1:]
330
331 return (message, buf)
332
334 """Receives a message from QMP and decodes the received JSON object.
335
336 @rtype: QmpMessage
337 @return: the received message
338 @raise errors.HypervisorError: when there are communication errors
339 @raise errors.ProgrammerError: when there are data serialization errors
340
341 """
342 self._check_connection()
343
344
345 (message, self._buf) = self._ParseMessage(self._buf)
346 if message:
347 return message
348
349 recv_buffer = StringIO.StringIO(self._buf)
350 recv_buffer.seek(len(self._buf))
351 try:
352 while True:
353 data = self.sock.recv(4096)
354 if not data:
355 break
356 recv_buffer.write(data)
357
358 (message, self._buf) = self._ParseMessage(recv_buffer.getvalue())
359 if message:
360 return message
361
362 except socket.timeout, err:
363 raise errors.HypervisorError("Timeout while receiving a QMP message: "
364 "%s" % (err))
365 except socket.error, err:
366 raise errors.HypervisorError("Unable to receive data from KVM using the"
367 " QMP protocol: %s" % err)
368
369 - def _Send(self, message):
370 """Encodes and sends a message to KVM using QMP.
371
372 @type message: QmpMessage
373 @param message: message to send to KVM
374 @raise errors.HypervisorError: when there are communication errors
375 @raise errors.ProgrammerError: when there are data serialization errors
376
377 """
378 self._check_connection()
379 try:
380 message_str = str(message)
381 except Exception, err:
382 raise errors.ProgrammerError("QMP data deserialization error: %s" % err)
383
384 try:
385 self.sock.sendall(message_str)
386 except socket.timeout, err:
387 raise errors.HypervisorError("Timeout while sending a QMP message: "
388 "%s (%s)" % (err.string, err.errno))
389 except socket.error, err:
390 raise errors.HypervisorError("Unable to send data from KVM using the"
391 " QMP protocol: %s" % err)
392
394 """Update the list of supported commands.
395
396 """
397 result = self.Execute(self._QUERY_COMMANDS)
398 return frozenset(com["name"] for com in result)
399
def Execute(self, command, arguments=None):
  """Run one QMP command and hand back the monitor's answer.

  @type command: str
  @param command: name of the QMP command to run
  @type arguments: dict
  @param arguments: arguments for the command; they are left out of the
      request when empty or None
  @return: the value obtained from C{_GetResponse} for this command
  @raise QmpCommandNotSupported: when the set of supported commands is
      known and does not contain the requested command
  @raise errors.HypervisorError: when there are communication errors
  @raise errors.ProgrammerError: when there are data serialization errors

  """
  self._check_connection()

  # While the supported-command set is still None nothing is filtered, so
  # commands can be issued before that set has been retrieved.
  known_commands = self.supported_commands
  if known_commands is not None and command not in known_commands:
    raise QmpCommandNotSupported("Instance does not support the '%s'"
                                 " QMP command." % command)

  request = QmpMessage({self._EXECUTE_KEY: command})
  if arguments:
    request[self._ARGUMENTS_KEY] = arguments
  self._Send(request)

  reply = self._GetResponse(command)

  # The command-list query and the capabilities handshake are not logged.
  unlogged = (self._QUERY_COMMANDS, self._CAPABILITIES_COMMAND)
  if command not in unlogged:
    logging.debug("QMP %s %s: %s\n", command, arguments, reply)
  return reply
432
434 """Parse the QMP response
435
436 If error key found in the response message raise HypervisorError.
437 Ignore any async event and thus return the response message
438 related to command.
439
440 """
441
442
443
444
445
446 while True:
447 response = self._Recv()
448 err = response[self._ERROR_KEY]
449 if err:
450 raise errors.HypervisorError("kvm: error executing the %s"
451 " command: %s (%s):" %
452 (command,
453 err[self._ERROR_DESC_KEY],
454 err[self._ERROR_CLASS_KEY]))
455
456 elif response[self._EVENT_KEY]:
457
458 continue
459
460 return response[self._RETURN_KEY]
461
@_ensure_connection
def HotAddNic(self, nic, devid, tapfds=None, vhostfds=None, features=None):
  """Hot-add a NIC

  The tap fds are handed to the monitor first (followed by the vhost fds
  when vhost is enabled), then netdev_add is issued and finally device_add.

  @param nic: NIC object; its C{pci} and C{mac} attributes are used
  @param devid: id used for both the netdev and the device, and as the
      prefix of the generated fd names
  @param tapfds: tap file descriptors to pass (defaults to none)
  @param vhostfds: vhost file descriptors, only used when the "vhost"
      feature is enabled (defaults to none)
  @param features: dict that may hold a "vhost" flag and an "mq" pair of
      (enabled, number of queues)

  """
  tapfds = [] if tapfds is None else tapfds
  vhostfds = [] if vhostfds is None else vhostfds
  features = {} if features is None else features

  enable_vhost = features.get("vhost", False)
  enable_mq, virtio_net_queues = features.get("mq", (False, 1))

  def _PassFds(fds, name_format):
    """Pass each fd under a generated name and return the names."""
    names = []
    for index, descriptor in enumerate(fds):
      name = name_format % (devid, index)
      self._GetFd(descriptor, name)
      names.append(name)
    return names

  netdev_args = {
    "type": "tap",
    "id": devid,
    "fds": ":".join(_PassFds(tapfds, "%s-%d")),
  }
  if enable_vhost:
    vhost_names = _PassFds(vhostfds, "%s-vhost-%d")
    netdev_args["vhost"] = "on"
    netdev_args["vhostfds"] = ":".join(vhost_names)
  self.Execute("netdev_add", netdev_args)

  device_args = {
    "driver": "virtio-net-pci",
    "id": devid,
    "bus": "pci.0",
    "addr": hex(nic.pci),
    "netdev": devid,
    "mac": nic.mac,
  }
  if enable_mq:
    device_args["mq"] = "on"
    device_args["vectors"] = (2 * virtio_net_queues + 1)
  self.Execute("device_add", device_args)
517
518 @_ensure_connection
520 """Hot-del a NIC
521
522 """
523 self.Execute("device_del", {"id": devid})
524 self.Execute("netdev_del", {"id": devid})
525
526 @_ensure_connection
528 """Hot-add a disk
529
530 Try opening the device to obtain a fd and pass it with SCM_RIGHTS. This
531 will be omitted in case of userspace access mode (open will fail).
532 Then use blockdev-add and then device_add.
533
534 """
535 if os.path.exists(uri):
536 fd = os.open(uri, os.O_RDWR)
537 fdset = self._AddFd(fd)
538 os.close(fd)
539 filename = "/dev/fdset/%s" % fdset
540 else:
541
542
543 filename = uri
544 fdset = None
545
546 arguments = {
547 "options": {
548 "driver": "raw",
549 "id": devid,
550 "file": {
551 "driver": "file",
552 "filename": filename,
553 }
554 }
555 }
556 self.Execute("blockdev-add", arguments)
557
558 if fdset is not None:
559 self._RemoveFdset(fdset)
560
561 arguments = {
562 "driver": "virtio-blk-pci",
563 "id": devid,
564 "bus": "pci.0",
565 "addr": hex(disk.pci),
566 "drive": devid,
567 }
568 self.Execute("device_add", arguments)
569
570 @_ensure_connection
572 """Hot-del a Disk
573
574 Note that drive_del is not supported yet in qmp and thus should
575 be invoked from HMP.
576
577 """
578 self.Execute("device_del", {"id": devid})
579
580
581
583 """Get the devices of the first PCI bus of a running instance.
584
585 """
586 self._check_connection()
587 pci = self.Execute("query-pci")
588 bus = pci[0]
589 devices = bus["devices"]
590 return devices
591
592 @_ensure_connection
594 """Check if a specific device exists or not on a running instance.
595
596 It will match the PCI slot of the device and the id currently
597 obtained by _GenerateDeviceKVMId().
598
599 """
600 for d in self._GetPCIDevices():
601 if d["qdev_id"] == devid and d["slot"] == device.pci:
602 return True
603
604 return False
605
606 @_ensure_connection
608 """Get the first available PCI slot of a running instance.
609
610 """
611 slots = bitarray(self._QEMU_PCI_SLOTS)
612 slots.setall(False)
613 for d in self._GetPCIDevices():
614 slot = d["slot"]
615 slots[slot] = True
616
617 return utils.GetFreeSlot(slots)
618
619 @_ensure_connection
621 """Check if disk hotplug is possible
622
623 Hotplug is *not* supported in case:
624 - fdsend module is missing
625 - add-fd and blockdev-add qmp commands are not supported
626
627 """
628 def _raise(reason):
629 raise errors.HotplugError("Cannot hot-add disk: %s." % reason)
630
631 if not fdsend:
632 _raise("fdsend python module is missing")
633
634 if "add-fd" not in self.supported_commands:
635 _raise("add-fd qmp command is not supported")
636
637 if "blockdev-add" not in self.supported_commands:
638 _raise("blockdev-add qmp command is not supported")
639
640 @_ensure_connection
642 """Check if NIC hotplug is possible
643
644 Hotplug is *not* supported in case:
645 - fdsend module is missing
646 - getfd and netdev_add qmp commands are not supported
647
648 """
649 def _raise(reason):
650 raise errors.HotplugError("Cannot hot-add NIC: %s." % reason)
651
652 if not fdsend:
653 _raise("fdsend python module is missing")
654
655 if "getfd" not in self.supported_commands:
656 _raise("getfd qmp command is not supported")
657
658 if "netdev_add" not in self.supported_commands:
659 _raise("netdev_add qmp command is not supported")
660
def _GetFd(self, fd, fdname):
  """Wrapper around the getfd qmp command

  Sends the fd over the monitor socket with fdsend (SCM_RIGHTS) and then
  issues the getfd qmp command so that the fd gets the given name and can
  be referred to later by NIC hotplugging.

  @type fd: int
  @param fd: The file descriptor to pass
  @param fdname: The name passed to getfd for this file descriptor
  @raise errors.HypervisorError: If getfd fails for some reason

  """
  self._check_connection()
  try:
    fdsend.sendfds(self.sock, " ", fds=[fd])
    self.Execute("getfd", {"fdname": fdname})
  except errors.HypervisorError as failure:
    # Only logged here; the error still propagates to the caller.
    logging.info("Passing fd %s via SCM_RIGHTS failed: %s", fd, failure)
    raise
683
685 """Wrapper around add-fd qmp command
686
687 Use fdsend to send fd to a running process via SCM_RIGHTS and then add-fd
688 qmp command to add it to an fdset so that it can be used later by
689 disk hotplugging.
690
691 @type fd: int
692 @param fd: The file descriptor to pass
693
694 @return: The fdset ID that the fd has been added to
695 @raise errors.HypervisorError: If add-fd fails for some reason
696
697 """
698 self._check_connection()
699 try:
700 fdsend.sendfds(self.sock, " ", fds=[fd])
701
702 response = self.Execute("add-fd")
703 fdset = response["fdset-id"]
704 except errors.HypervisorError, err:
705 logging.info("Passing fd %s via SCM_RIGHTS failed: %s", fd, err)
706 raise
707
708 return fdset
709
711 """Wrapper around remove-fd qmp command
712
713 Remove the file descriptor previously passed. After qemu has dup'd the fd
714 (e.g. during disk hotplug), it can be safely removed.
715
716 """
717 self._check_connection()
718
719 try:
720 self.Execute("remove-fd", {"fdset-id": fdset})
721 except errors.HypervisorError, err:
722
723
724
725
726
727 logging.info("Removing fdset with id %s failed: %s", fdset, err)
728