Package ganeti :: Package impexpd
[hide private]
[frames | no frames]

Source Code for Package ganeti.impexpd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2010 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Classes and functions for import/export daemon. 
 32   
 33  """ 
 34   
 35  import os 
 36  import re 
 37  import socket 
 38  import logging 
 39  import signal 
 40  import errno 
 41  import time 
 42  from cStringIO import StringIO 
 43   
 44  from ganeti import constants 
 45  from ganeti import errors 
 46  from ganeti import utils 
 47  from ganeti import netutils 
 48  from ganeti import compat 
 49   
 50   
 51  #: Used to recognize point at which socat(1) starts to listen on its socket. 
 52  #: The local address is required for the remote peer to connect (in particular 
 53  #: the port number). 
 54  LISTENING_RE = re.compile(r"^listening on\s+" 
 55                            r"AF=(?P<family>\d+)\s+" 
 56                            r"(?P<address>.+):(?P<port>\d+)$", re.I) 
 57   
 58  #: Used to recognize point at which socat(1) is sending data over the wire 
 59  TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", 
 60                                re.I) 
 61   
 62  SOCAT_LOG_DEBUG = "D" 
 63  SOCAT_LOG_INFO = "I" 
 64  SOCAT_LOG_NOTICE = "N" 
 65  SOCAT_LOG_WARNING = "W" 
 66  SOCAT_LOG_ERROR = "E" 
 67  SOCAT_LOG_FATAL = "F" 
 68   
 69  SOCAT_LOG_IGNORE = compat.UniqueFrozenset([ 
 70    SOCAT_LOG_DEBUG, 
 71    SOCAT_LOG_INFO, 
 72    SOCAT_LOG_NOTICE, 
 73    ]) 
 74   
 75  #: Used to parse GNU dd(1) statistics 
 76  DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" 
 77                          r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) 
 78   
 79  #: Used to ignore "N+N records in/out" on dd(1)'s stderr 
 80  DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) 
 81   
 82  #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is 
 83  #: unavailable and SIGUSR1 is used instead) 
 84  DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) 
 85   
 86  #: Buffer size: at most this many bytes are transferred at once 
 87  BUFSIZE = 1024 * 1024 
 88   
 89  # Common options for socat 
 90  SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] 
 91  SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", 
 92                        "cipher=%s" % constants.OPENSSL_CIPHERS] 
 93   
 94  if constants.SOCAT_USE_COMPRESS: 
 95    # Disables all compression in by OpenSSL. Only supported in patched versions 
 96    # of socat (as of November 2010). See INSTALL for more information. 
 97    SOCAT_OPENSSL_OPTS.append("compress=none") 
 98   
 99  SOCAT_OPTION_MAXLEN = 400 
100   
101  (PROG_OTHER, 
102   PROG_SOCAT, 
103   PROG_DD, 
104   PROG_DD_PID, 
105   PROG_EXP_SIZE) = range(1, 6) 
106   
107  PROG_ALL = compat.UniqueFrozenset([ 
108    PROG_OTHER, 
109    PROG_SOCAT, 
110    PROG_DD, 
111    PROG_DD_PID, 
112    PROG_EXP_SIZE, 
113    ]) 
class CommandBuilder(object):
  """Builds the command line for the import/export child process.

  The resulting command runs a Bash pipeline made of socat(1) for the network
  transport, an optional gzip/gunzip stage and dd(1), whose statistics are
  used for throughput measurement.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts
    self._dd_pid_fd = dd_pid_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._socat_stderr_fd = socat_stderr_fd

    # Sanity check: a magic value, if given, must match IE_MAGIC_RE
    assert (self._opts.magic is None or
            constants.IE_MAGIC_RE.match(self._opts.magic))

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a shell snippet into an argument list for Bash.

    The snippet runs with the C{errexit} and C{pipefail} options enabled, so
    failing commands, including failing stages of a pipeline, abort the
    script (see bash(1) for the exact rules).

    @type cmd: string
    @param cmd: Shell code to run
    @rtype: list
    @return: Argument list executing C{cmd} with Bash

    """
    shell_opts = ["-o", "errexit", "-o", "pipefail"]
    return ["bash"] + shell_opts + ["-c", cmd]

  def _GetSocatCommand(self):
    """Builds the socat(1) argument list for the transport.

    In import mode socat listens for an OpenSSL connection and writes the
    received data to stdout. In export mode it reads stdin and connects to
    the remote host.

    @rtype: list
    @return: socat argument list
    @raise errors.GenericError: For an invalid mode, or for an address option
      that is too long or contains a comma

    """
    opts = self._opts

    addr_opts = (SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS +
                 ["key=%s" % opts.key,
                  "cert=%s" % opts.cert,
                  "cafile=%s" % opts.ca])

    if opts.bind is not None:
      addr_opts.append("bind=%s" % opts.bind)

    assert not (opts.ipv4 and opts.ipv6)

    if opts.ipv4:
      addr_opts.append("pf=ipv4")
    elif opts.ipv6:
      addr_opts.append("pf=ipv6")

    if self._mode == constants.IEM_IMPORT:
      # Without an explicit port, 0 is passed to socat; the port actually
      # used is presumably learnt later from socat's "listening on" message
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      src = ["OPENSSL-LISTEN:%s" % listen_port,
             "reuseaddr",
             # Retry to listen if connection wasn't established successfully,
             # up to 100 times a second. Note that this still leaves room for
             # DoS attacks.
             "forever",
             "intervall=0.01"] + addr_opts
      dst = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      host = opts.host
      if host and netutils.IP6Address.IsValid(host):
        # IPv6 literals are put in brackets to separate them from the port
        host = "[%s]" % host

      src = ["stdin"]
      dst = ["OPENSSL:%s:%s" % (host, opts.port),
             # How long to wait per connection attempt
             "connect-timeout=%s" % opts.connect_timeout,
             # Retry a few times before giving up to connect (once per second)
             "retry=%s" % opts.connect_retries,
             "intervall=1"] + addr_opts

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined with commas below, so they must not contain any
    for value in src + dst:
      if len(value) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, value))
      if "," in value:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % value)

    return [
      constants.SOCAT_PATH,
      # Log to stderr
      "-ls",
      # Log level
      "-d", "-d",
      # Buffer size
      "-b%s" % BUFSIZE,
      # Unidirectional mode: the first address is only read from, the second
      # address is only written to
      "-u",
      ",".join(src),
      ",".join(dst),
      ]

  def _GetMagicCommand(self):
    """Returns the shell snippet handling the magic value.

    In export mode the snippet writes the magic value. In import mode it
    reads as many characters as the magic value has and exits with an error
    if they differ.

    @rtype: string or None
    @return: Shell snippet, or C{None} if no magic value is configured
    @raise errors.GenericError: For an invalid mode

    """
    if not self._opts.magic:
      return None

    # Prefix to ensure magic isn't interpreted as option to "echo"
    magic = "M=%s" % self._opts.magic

    buf = StringIO()

    if self._mode == constants.IEM_IMPORT:
      pieces = [
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(magic),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ]
    elif self._mode == constants.IEM_EXPORT:
      pieces = [utils.ShellQuoteArgs(["echo", "-E", "-n", magic])]
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    for piece in pieces:
      buf.write(piece)

    return buf.getvalue()

  def _GetDdCommand(self):
    """Returns the shell snippet running dd(1) for measuring throughput.

    dd runs in the background so that its PID can be written to the PID file
    descriptor before waiting for it. If a magic value is configured, the
    magic snippet runs first and must succeed.

    @rtype: string

    """
    magic_cmd = self._GetMagicCommand()

    # Setting LC_ALL since we want to parse the output and explicitly
    # redirecting stdin, as the background process (dd) would have
    # /dev/null as stdin otherwise
    dd_group = ("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};"
                " echo $pid >&%d;"
                " wait $pid;"
                " }") % (BUFSIZE, self._dd_stderr_fd, self._dd_pid_fd)

    if not magic_cmd:
      return dd_group

    return "{ %s && %s }" % (magic_cmd, dd_group)

  def _GetTransportCommand(self):
    """Returns the command for the transport part of the daemon.

    Import: socat | [gunzip -c |] dd. Export: dd | [gzip -c |] socat.

    @rtype: list
    @return: Bash argument list running the pipeline
    @raise errors.GenericError: For an invalid mode

    """
    socat_cmd = ("%s 2>&%d" %
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
                  self._socat_stderr_fd))
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      (first, codec, last) = (socat_cmd, "gunzip -c", dd_cmd)
    elif self._mode == constants.IEM_EXPORT:
      (first, codec, last) = (dd_cmd, "gzip -c", socat_cmd)
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    if compr == constants.IEC_GZIP:
      parts = [first, codec, last]
    else:
      parts = [first, last]

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(" | ".join(parts))

  def GetCommand(self):
    """Returns the complete child process command.

    The quoted transport command is surrounded by the optional command prefix
    and suffix from the options. These are inserted as-is, i.e. as shell code
    rather than as quoted arguments.

    @rtype: list
    @return: Bash argument list running the complete command

    """
    transport_cmd = self._GetTransportCommand()

    words = []
    if self._opts.cmd_prefix:
      words.append(self._opts.cmd_prefix)
    words.append(utils.ShellQuoteArgs(transport_cmd))
    if self._opts.cmd_suffix:
      words.append(self._opts.cmd_suffix)

    buf = StringIO()
    for (idx, word) in enumerate(words):
      if idx > 0:
        buf.write(" ")
      buf.write(word)

    return self.GetBashCommand(buf.getvalue())


def _VerifyListening(family, address, port):
  """Checks and normalizes the listening address reported by socat.

  @type family: int
  @param family: Address family number as reported by socat
  @type address: string
  @param address: Listening address; for IPv6 it may be enclosed in brackets
  @param port: Port number, returned unchanged
  @rtype: tuple
  @return: C{(address, port)} with the address in canonical textual form
  @raise errors.GenericError: For unsupported families or invalid addresses

  """
  if family != socket.AF_INET and family != socket.AF_INET6:
    raise errors.GenericError("Address family %r not supported" % family)

  if family == socket.AF_INET6:
    # socat may report IPv6 addresses in "[...]" notation
    if address.startswith("[") and address.endswith("]"):
      address = address.lstrip("[").rstrip("]")

  # Round-trip through the binary form to validate and canonicalize
  try:
    packed = socket.inet_pton(family, address)
  except socket.error:
    raise errors.GenericError("Invalid address %r for family %s" %
                              (address, family))

  return (socket.inet_ntop(family, packed), port)


class ChildIOProcessor(object):
  """Interprets the output of the child process' programs.

  Lines arrive through one line splitter per program (see L{GetLineSplitter}).
  Depending on their origin, they update the dd(1) progress, the listen port
  or connection state, and the status file.

  """
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Initializes this class.

    @param debug: If true, socat and dd lines that would normally be
      suppressed are forwarded as well
    @param status_file: Status file object receiving the listen port,
      connection state, progress and recent output
    @param logger: Logger to which forwarded lines are written
    @type throughput_samples: int
    @param throughput_samples: Number of most recent dd(1) progress samples
      used to calculate the throughput
    @param exp_size: Expected size of the transferred data, or C{None} if
      unknown (presumably in MiB, as it is compared against dd's MiB counts)

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # One line splitter per program, each with _ProcessOutput as callback
    self._splitter = {}
    for prog in PROG_ALL:
      self._splitter[prog] = utils.LineSplitter(self._ProcessOutput, prog)

    # State of the dd(1) process and its progress samples
    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Returns the line splitter for a program.

    @param prog: One of the C{PROG_*} constants

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes all line splitters.

    """
    for splitter in self._splitter.values():
      splitter.flush()

  def CloseAll(self):
    """Closes all line splitters and forgets about them.

    """
    for splitter in self._splitter.values():
      splitter.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Asks dd(1) to print its transfer statistics.

    @rtype: bool
    @return: C{False} if dd's PID is unknown or dd does not yet handle
      L{DD_INFO_SIGNAL}; C{True} otherwise, including when dd turned out to
      have exited already

    """
    pid = self._dd_pid
    if pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals. The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      if not utils.IsProcessHandlingSignal(pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, pid)
    try:
      os.kill(pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno != errno.ESRCH:
        raise

      # Process no longer exists
      logging.debug("dd exited")
      self._dd_pid = None

    return True

  def _ProcessOutput(self, line, prog):
    """Handles one line of child process output.

    Lines to be shown are logged and added to the status file's recent
    output. The status file is updated after every line.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates (a C{PROG_*} value)

    """
    force_update = False
    forward_line = line

    if prog == PROG_SOCAT:
      parts = line.split(None, 4)
      if len(parts) != 5:
        # Not in the expected log format, forward as-is
        forward_line = "socat: %s" % line
      else:
        (level, msg) = parts[3:]
        force_update = self._ProcessSocatOutput(self._status_file, level, msg)
        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
          forward_line = "socat: %s %s" % (level, msg)
        else:
          forward_line = None

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)
      forward_line = ("dd: %s" % line
                      if (should_forward or self._debug) else None)

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      exp_size = None
      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)
      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      self._status_file.AddRecentOutput(forward_line)

    self._status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets socat log output.

    Only notice-level messages are examined. While no listen port is known,
    a "listening on" message sets it. While not connected, a "starting data
    transfer loop" message marks the connection as established.

    @rtype: bool
    @return: Whether the status file should be updated immediately

    """
    if level != SOCAT_LOG_NOTICE:
      return False

    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      m = LISTENING_RE.match(msg)
      if m:
        (_, port) = _VerifyListening(int(m.group("family")),
                                     m.group("address"),
                                     int(m.group("port")))
        status_file.SetListenPort(port)
        return True

    if not status_file.GetConnected():
      m = TRANSFER_LOOP_RE.match(msg)
      if m:
        logging.debug("Connection established")
        status_file.SetConnected()
        return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @rtype: tuple
    @return: C{(should_forward, force_update)}

    """
    stats = DD_INFO_RE.match(line)
    if stats:
      self._UpdateDdProgress(float(stats.group("seconds")),
                             utils.BytesToMebibyte(int(stats.group("bytes"))))
      return (False, True)

    if DD_STDERR_IGNORE.match(line):
      # "N+N records in/out" lines are not interesting
      return (False, False)

    # Anything else is forwarded
    return (True, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    samples = self._dd_progress
    samples.append((seconds, mbytes))

    # Trim to the newest samples (assumes a positive sample count)
    del samples[:-self._dd_tp_samples]

    throughput = _CalcThroughput(samples)

    # Percentage and ETA are only known if the expected size is
    percent = None
    eta = None
    exp_size = self._exp_size

    if exp_size is not None:
      if exp_size != 0:
        percent = max(0, min(100, (100.0 * mbytes) / exp_size))

      if throughput:
        eta = max(0, float(exp_size - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)


def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  The throughput is computed between the oldest and the newest sample.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
    tuple
  @rtype: float or None
  @return: Throughput in MiB/second, or C{None} if it can't be calculated
    (fewer than two samples, or no time elapsed between the first and the
    last sample)

  """
  if len(samples) < 2:
    # Can't calculate throughput
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  duration = float(end_time) - start_time
  if duration <= 0:
    # Two samples can carry the same timestamp, e.g. because dd(1) prints
    # its elapsed time with limited precision; avoid dividing by zero (and
    # reporting a meaningless negative rate for non-increasing timestamps)
    return None

  return (float(end_mbytes) - start_mbytes) / duration