Package ganeti :: Package impexpd
[hide private]
[frames] | no frames]

Source Code for Package ganeti.impexpd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2010 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Classes and functions for import/export daemon. 
 32   
 33  """ 
 34   
 35  import os 
 36  import re 
 37  import socket 
 38  import logging 
 39  import signal 
 40  import errno 
 41  import time 
 42  from cStringIO import StringIO 
 43   
 44  from ganeti import constants 
 45  from ganeti import errors 
 46  from ganeti import utils 
 47  from ganeti import netutils 
 48  from ganeti import compat 
 49   
 50   
 51  #: Used to recognize point at which socat(1) starts to listen on its socket. 
 52  #: The local address is required for the remote peer to connect (in particular 
 53  #: the port number). 
 54  LISTENING_RE = re.compile(r"^listening on\s+" 
 55                            r"AF=(?P<family>\d+)\s+" 
 56                            r"(?P<address>.+):(?P<port>\d+)$", re.I) 
 57   
 58  #: Used to recognize point at which socat(1) is sending data over the wire 
 59  TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", 
 60                                re.I) 
 61   
 62  SOCAT_LOG_DEBUG = "D" 
 63  SOCAT_LOG_INFO = "I" 
 64  SOCAT_LOG_NOTICE = "N" 
 65  SOCAT_LOG_WARNING = "W" 
 66  SOCAT_LOG_ERROR = "E" 
 67  SOCAT_LOG_FATAL = "F" 
 68   
 69  SOCAT_LOG_IGNORE = compat.UniqueFrozenset([ 
 70    SOCAT_LOG_DEBUG, 
 71    SOCAT_LOG_INFO, 
 72    SOCAT_LOG_NOTICE, 
 73    ]) 
 74   
 75  #: Used to parse GNU dd(1) statistics 
 76  DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" 
 77                          r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) 
 78   
 79  #: Used to ignore "N+N records in/out" on dd(1)'s stderr 
 80  DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) 
 81   
 82  #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is 
 83  #: unavailable and SIGUSR1 is used instead) 
 84  DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) 
 85   
 86  #: Buffer size: at most this many bytes are transferred at once 
 87  BUFSIZE = 1024 * 1024 
 88   
 89  # Common options for socat 
 90  SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] 
 91  SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLS1", 
 92                        "cipher=%s" % constants.OPENSSL_CIPHERS] 
 93   
 94  if constants.SOCAT_USE_COMPRESS: 
 95    # Disables all compression in by OpenSSL. Only supported in patched versions 
 96    # of socat (as of November 2010). See INSTALL for more information. 
 97    SOCAT_OPENSSL_OPTS.append("compress=none") 
 98   
 99  SOCAT_OPTION_MAXLEN = 400 
100   
101  (PROG_OTHER, 
102   PROG_SOCAT, 
103   PROG_DD, 
104   PROG_DD_PID, 
105   PROG_EXP_SIZE) = range(1, 6) 
106   
107  PROG_ALL = compat.UniqueFrozenset([ 
108    PROG_OTHER, 
109    PROG_SOCAT, 
110    PROG_DD, 
111    PROG_DD_PID, 
112    PROG_EXP_SIZE, 
113    ]) 
114 115 116 -class CommandBuilder(object):
117 - def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
118 """Initializes this class. 119 120 @param mode: Daemon mode (import or export) 121 @param opts: Options object 122 @type socat_stderr_fd: int 123 @param socat_stderr_fd: File descriptor socat should write its stderr to 124 @type dd_stderr_fd: int 125 @param dd_stderr_fd: File descriptor dd should write its stderr to 126 @type dd_pid_fd: int 127 @param dd_pid_fd: File descriptor the child should write dd's PID to 128 129 """ 130 self._opts = opts 131 self._mode = mode 132 self._socat_stderr_fd = socat_stderr_fd 133 self._dd_stderr_fd = dd_stderr_fd 134 self._dd_pid_fd = dd_pid_fd 135 136 assert (self._opts.magic is None or 137 constants.IE_MAGIC_RE.match(self._opts.magic))
138 139 @staticmethod
140 - def GetBashCommand(cmd):
141 """Prepares a command to be run in Bash. 142 143 """ 144 return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
145
146 - def _GetSocatCommand(self):
147 """Returns the socat command. 148 149 """ 150 common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ 151 "key=%s" % self._opts.key, 152 "cert=%s" % self._opts.cert, 153 "cafile=%s" % self._opts.ca, 154 ] 155 156 if self._opts.bind is not None: 157 common_addr_opts.append("bind=%s" % self._opts.bind) 158 159 assert not (self._opts.ipv4 and self._opts.ipv6) 160 161 if self._opts.ipv4: 162 common_addr_opts.append("pf=ipv4") 163 elif self._opts.ipv6: 164 common_addr_opts.append("pf=ipv6") 165 166 if self._mode == constants.IEM_IMPORT: 167 if self._opts.port is None: 168 port = 0 169 else: 170 port = self._opts.port 171 172 addr1 = [ 173 "OPENSSL-LISTEN:%s" % port, 174 "reuseaddr", 175 176 # Retry to listen if connection wasn't established successfully, up to 177 # 100 times a second. Note that this still leaves room for DoS attacks. 178 "forever", 179 "intervall=0.01", 180 ] + common_addr_opts 181 addr2 = ["stdout"] 182 183 elif self._mode == constants.IEM_EXPORT: 184 if self._opts.host and netutils.IP6Address.IsValid(self._opts.host): 185 host = "[%s]" % self._opts.host 186 else: 187 host = self._opts.host 188 189 addr1 = ["stdin"] 190 addr2 = [ 191 "OPENSSL:%s:%s" % (host, self._opts.port), 192 193 # How long to wait per connection attempt 194 "connect-timeout=%s" % self._opts.connect_timeout, 195 196 # Retry a few times before giving up to connect (once per second) 197 "retry=%s" % self._opts.connect_retries, 198 "intervall=1", 199 ] + common_addr_opts 200 201 else: 202 raise errors.GenericError("Invalid mode '%s'" % self._mode) 203 204 for i in [addr1, addr2]: 205 for value in i: 206 if len(value) > SOCAT_OPTION_MAXLEN: 207 raise errors.GenericError("Socat option longer than %s" 208 " characters: %r" % 209 (SOCAT_OPTION_MAXLEN, value)) 210 if "," in value: 211 raise errors.GenericError("Comma not allowed in socat option" 212 " value: %r" % value) 213 214 return [ 215 constants.SOCAT_PATH, 216 217 # Log to stderr 218 "-ls", 219 220 # Log level 221 "-d", "-d", 222 223 # Buffer size 224 "-b%s" % BUFSIZE, 225 226 # Unidirectional mode, the first address is only used for reading, and the 227 # second address is only used for writing 228 "-u", 229 230 ",".join(addr1), ",".join(addr2), 231 ]
232
233 - def _GetMagicCommand(self):
234 """Returns the command to read/write the magic value. 235 236 """ 237 if not self._opts.magic: 238 return None 239 240 # Prefix to ensure magic isn't interpreted as option to "echo" 241 magic = "M=%s" % self._opts.magic 242 243 cmd = StringIO() 244 245 if self._mode == constants.IEM_IMPORT: 246 cmd.write("{ ") 247 cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"])) 248 cmd.write(" && ") 249 cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic)) 250 cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch")) 251 cmd.write(" exit 1;") 252 cmd.write("fi;") 253 cmd.write(" }") 254 255 elif self._mode == constants.IEM_EXPORT: 256 cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic])) 257 258 else: 259 raise errors.GenericError("Invalid mode '%s'" % self._mode) 260 261 return cmd.getvalue()
262
263 - def _GetDdCommand(self):
264 """Returns the command for measuring throughput. 265 266 """ 267 dd_cmd = StringIO() 268 269 magic_cmd = self._GetMagicCommand() 270 if magic_cmd: 271 dd_cmd.write("{ ") 272 dd_cmd.write(magic_cmd) 273 dd_cmd.write(" && ") 274 275 dd_cmd.write("{ ") 276 # Setting LC_ALL since we want to parse the output and explicitly 277 # redirecting stdin, as the background process (dd) would have 278 # /dev/null as stdin otherwise 279 dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" % 280 (BUFSIZE, self._dd_stderr_fd)) 281 # Send PID to daemon 282 dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) 283 # And wait for dd 284 dd_cmd.write(" wait $pid;") 285 dd_cmd.write(" }") 286 287 if magic_cmd: 288 dd_cmd.write(" }") 289 290 return dd_cmd.getvalue()
291
292 - def _GetTransportCommand(self):
293 """Returns the command for the transport part of the daemon. 294 295 """ 296 socat_cmd = ("%s 2>&%d" % 297 (utils.ShellQuoteArgs(self._GetSocatCommand()), 298 self._socat_stderr_fd)) 299 dd_cmd = self._GetDdCommand() 300 301 compr = self._opts.compress 302 303 parts = [] 304 305 if self._mode == constants.IEM_IMPORT: 306 parts.append(socat_cmd) 307 308 if compr in [constants.IEC_GZIP, constants.IEC_GZIP_FAST, 309 constants.IEC_GZIP_SLOW, constants.IEC_LZOP]: 310 utility_name = constants.IEC_COMPRESSION_UTILITIES.get(compr, compr) 311 parts.append("%s -d -c" % utility_name) 312 elif compr != constants.IEC_NONE: 313 parts.append("%s -d" % compr) 314 else: 315 # No compression 316 pass 317 318 parts.append(dd_cmd) 319 320 elif self._mode == constants.IEM_EXPORT: 321 parts.append(dd_cmd) 322 323 if compr in [constants.IEC_GZIP_SLOW, constants.IEC_LZOP]: 324 utility_name = constants.IEC_COMPRESSION_UTILITIES.get(compr, compr) 325 parts.append("%s -c" % utility_name) 326 elif compr in [constants.IEC_GZIP_FAST, constants.IEC_GZIP]: 327 parts.append("gzip -1 -c") 328 elif compr != constants.IEC_NONE: 329 parts.append(compr) 330 else: 331 # No compression 332 pass 333 334 parts.append(socat_cmd) 335 336 else: 337 raise errors.GenericError("Invalid mode '%s'" % self._mode) 338 339 # TODO: Run transport as separate user 340 # The transport uses its own shell to simplify running it as a separate user 341 # in the future. 342 return self.GetBashCommand(" | ".join(parts))
343
344 - def GetCommand(self):
345 """Returns the complete child process command. 346 347 """ 348 transport_cmd = self._GetTransportCommand() 349 350 buf = StringIO() 351 352 if self._opts.cmd_prefix: 353 buf.write(self._opts.cmd_prefix) 354 buf.write(" ") 355 356 buf.write(utils.ShellQuoteArgs(transport_cmd)) 357 358 if self._opts.cmd_suffix: 359 buf.write(" ") 360 buf.write(self._opts.cmd_suffix) 361 362 return self.GetBashCommand(buf.getvalue())
363
364 365 -def _VerifyListening(family, address, port):
366 """Verify address given as listening address by socat. 367 368 """ 369 if family not in (socket.AF_INET, socket.AF_INET6): 370 raise errors.GenericError("Address family %r not supported" % family) 371 372 if (family == socket.AF_INET6 and address.startswith("[") and 373 address.endswith("]")): 374 address = address.lstrip("[").rstrip("]") 375 376 try: 377 packed_address = socket.inet_pton(family, address) 378 except socket.error: 379 raise errors.GenericError("Invalid address %r for family %s" % 380 (address, family)) 381 382 return (socket.inet_ntop(family, packed_address), port)
383
384 385 -class ChildIOProcessor(object):
386 - def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
387 """Initializes this class. 388 389 """ 390 self._debug = debug 391 self._status_file = status_file 392 self._logger = logger 393 394 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) 395 for prog in PROG_ALL]) 396 397 self._dd_pid = None 398 self._dd_ready = False 399 self._dd_tp_samples = throughput_samples 400 self._dd_progress = [] 401 402 # Expected size of transferred data 403 self._exp_size = exp_size
404
405 - def GetLineSplitter(self, prog):
406 """Returns the line splitter for a program. 407 408 """ 409 return self._splitter[prog]
410
411 - def FlushAll(self):
412 """Flushes all line splitters. 413 414 """ 415 for ls in self._splitter.itervalues(): 416 ls.flush()
417
418 - def CloseAll(self):
419 """Closes all line splitters. 420 421 """ 422 for ls in self._splitter.itervalues(): 423 ls.close() 424 self._splitter.clear()
425
426 - def NotifyDd(self):
427 """Tells dd(1) to write statistics. 428 429 """ 430 if self._dd_pid is None: 431 # Can't notify 432 return False 433 434 if not self._dd_ready: 435 # There's a race condition between starting the program and sending 436 # signals. The signal handler is only registered after some time, so we 437 # have to check whether the program is ready. If it isn't, sending a 438 # signal will invoke the default handler (and usually abort the program). 439 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): 440 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) 441 return False 442 443 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) 444 self._dd_ready = True 445 446 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) 447 try: 448 os.kill(self._dd_pid, DD_INFO_SIGNAL) 449 except EnvironmentError, err: 450 if err.errno != errno.ESRCH: 451 raise 452 453 # Process no longer exists 454 logging.debug("dd exited") 455 self._dd_pid = None 456 457 return True
458
459 - def _ProcessOutput(self, line, prog):
460 """Takes care of child process output. 461 462 @type line: string 463 @param line: Child output line 464 @type prog: number 465 @param prog: Program from which the line originates 466 467 """ 468 force_update = False 469 forward_line = line 470 471 if prog == PROG_SOCAT: 472 level = None 473 parts = line.split(None, 4) 474 475 if len(parts) == 5: 476 (_, _, _, level, msg) = parts 477 478 force_update = self._ProcessSocatOutput(self._status_file, level, msg) 479 480 if self._debug or (level and level not in SOCAT_LOG_IGNORE): 481 forward_line = "socat: %s %s" % (level, msg) 482 else: 483 forward_line = None 484 else: 485 forward_line = "socat: %s" % line 486 487 elif prog == PROG_DD: 488 (should_forward, force_update) = self._ProcessDdOutput(line) 489 490 if should_forward or self._debug: 491 forward_line = "dd: %s" % line 492 else: 493 forward_line = None 494 495 elif prog == PROG_DD_PID: 496 if self._dd_pid: 497 raise RuntimeError("dd PID reported more than once") 498 logging.debug("Received dd PID %r", line) 499 self._dd_pid = int(line) 500 forward_line = None 501 502 elif prog == PROG_EXP_SIZE: 503 logging.debug("Received predicted size %r", line) 504 forward_line = None 505 506 if line: 507 try: 508 exp_size = utils.BytesToMebibyte(int(line)) 509 except (ValueError, TypeError), err: 510 logging.error("Failed to convert predicted size %r to number: %s", 511 line, err) 512 exp_size = None 513 else: 514 exp_size = None 515 516 self._exp_size = exp_size 517 518 if forward_line: 519 self._logger.info(forward_line) 520 self._status_file.AddRecentOutput(forward_line) 521 522 self._status_file.Update(force_update)
523 524 @staticmethod
525 - def _ProcessSocatOutput(status_file, level, msg):
526 """Interprets socat log output. 527 528 """ 529 if level == SOCAT_LOG_NOTICE: 530 if status_file.GetListenPort() is None: 531 # TODO: Maybe implement timeout to not listen forever 532 m = LISTENING_RE.match(msg) 533 if m: 534 (_, port) = _VerifyListening(int(m.group("family")), 535 m.group("address"), 536 int(m.group("port"))) 537 538 status_file.SetListenPort(port) 539 return True 540 541 if not status_file.GetConnected(): 542 m = TRANSFER_LOOP_RE.match(msg) 543 if m: 544 logging.debug("Connection established") 545 status_file.SetConnected() 546 return True 547 548 return False
549
550 - def _ProcessDdOutput(self, line):
551 """Interprets a line of dd(1)'s output. 552 553 """ 554 m = DD_INFO_RE.match(line) 555 if m: 556 seconds = float(m.group("seconds")) 557 mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) 558 self._UpdateDdProgress(seconds, mbytes) 559 return (False, True) 560 561 m = DD_STDERR_IGNORE.match(line) 562 if m: 563 # Ignore 564 return (False, False) 565 566 # Forward line 567 return (True, False)
568
569 - def _UpdateDdProgress(self, seconds, mbytes):
570 """Updates the internal status variables for dd(1) progress. 571 572 @type seconds: float 573 @param seconds: Timestamp of this update 574 @type mbytes: float 575 @param mbytes: Total number of MiB transferred so far 576 577 """ 578 # Add latest sample 579 self._dd_progress.append((seconds, mbytes)) 580 581 # Remove old samples 582 del self._dd_progress[:-self._dd_tp_samples] 583 584 # Calculate throughput 585 throughput = _CalcThroughput(self._dd_progress) 586 587 # Calculate percent and ETA 588 percent = None 589 eta = None 590 591 if self._exp_size is not None: 592 if self._exp_size != 0: 593 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) 594 595 if throughput: 596 eta = max(0, float(self._exp_size - mbytes) / throughput) 597 598 self._status_file.SetProgress(mbytes, throughput, percent, eta)
599
600 601 -def _CalcThroughput(samples):
602 """Calculates the throughput in MiB/second. 603 604 @type samples: sequence 605 @param samples: List of samples, each consisting of a (timestamp, mbytes) 606 tuple 607 @rtype: float or None 608 @return: Throughput in MiB/second 609 610 """ 611 if len(samples) < 2: 612 # Can't calculate throughput 613 return None 614 615 (start_time, start_mbytes) = samples[0] 616 (end_time, end_mbytes) = samples[-1] 617 618 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
619