Package ganeti :: Package impexpd
[hide private]
[frames | no frames]

Source Code for Package ganeti.impexpd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Classes and functions for import/export daemon. 
 23   
 24  """ 
 25   
 26  import os 
 27  import re 
 28  import socket 
 29  import logging 
 30  import signal 
 31  import errno 
 32  import time 
 33  from cStringIO import StringIO 
 34   
 35  from ganeti import constants 
 36  from ganeti import errors 
 37  from ganeti import utils 
 38  from ganeti import netutils 
 39  from ganeti import compat 
 40   
 41   
 42  #: Used to recognize point at which socat(1) starts to listen on its socket. 
 43  #: The local address is required for the remote peer to connect (in particular 
 44  #: the port number). 
 45  LISTENING_RE = re.compile(r"^listening on\s+" 
 46                            r"AF=(?P<family>\d+)\s+" 
 47                            r"(?P<address>.+):(?P<port>\d+)$", re.I) 
 48   
 49  #: Used to recognize point at which socat(1) is sending data over the wire 
 50  TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", 
 51                                re.I) 
 52   
 53  SOCAT_LOG_DEBUG = "D" 
 54  SOCAT_LOG_INFO = "I" 
 55  SOCAT_LOG_NOTICE = "N" 
 56  SOCAT_LOG_WARNING = "W" 
 57  SOCAT_LOG_ERROR = "E" 
 58  SOCAT_LOG_FATAL = "F" 
 59   
 60  SOCAT_LOG_IGNORE = compat.UniqueFrozenset([ 
 61    SOCAT_LOG_DEBUG, 
 62    SOCAT_LOG_INFO, 
 63    SOCAT_LOG_NOTICE, 
 64    ]) 
 65   
 66  #: Used to parse GNU dd(1) statistics 
 67  DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" 
 68                          r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) 
 69   
 70  #: Used to ignore "N+N records in/out" on dd(1)'s stderr 
 71  DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) 
 72   
 73  #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is 
 74  #: unavailable and SIGUSR1 is used instead) 
 75  DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) 
 76   
 77  #: Buffer size: at most this many bytes are transferred at once 
 78  BUFSIZE = 1024 * 1024 
 79   
 80  # Common options for socat 
 81  SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] 
 82  SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", 
 83                        "cipher=%s" % constants.OPENSSL_CIPHERS] 
 84   
 85  if constants.SOCAT_USE_COMPRESS: 
 86    # Disables all compression in by OpenSSL. Only supported in patched versions 
 87    # of socat (as of November 2010). See INSTALL for more information. 
 88    SOCAT_OPENSSL_OPTS.append("compress=none") 
 89   
 90  SOCAT_OPTION_MAXLEN = 400 
 91   
 92  (PROG_OTHER, 
 93   PROG_SOCAT, 
 94   PROG_DD, 
 95   PROG_DD_PID, 
 96   PROG_EXP_SIZE) = range(1, 6) 
 97   
 98  PROG_ALL = compat.UniqueFrozenset([ 
 99    PROG_OTHER, 
100    PROG_SOCAT, 
101    PROG_DD, 
102    PROG_DD_PID, 
103    PROG_EXP_SIZE, 
104    ]) 
class CommandBuilder(object):
  """Builds the shell command line executed by the import/export daemon.

  The resulting command is a Bash pipeline made of socat (network
  transport), an optional gzip/gunzip stage and dd (throughput
  measurement), optionally preceded by a check or emission of a magic value.

  """

  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts
    self._socat_stderr_fd = socat_stderr_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._dd_pid_fd = dd_pid_fd

    # A magic value is optional, but if given it must be well-formed
    magic = self._opts.magic
    assert magic is None or constants.IE_MAGIC_RE.match(magic)

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a shell snippet into a Bash invocation.

    Bash is told to abort on errors ("errexit") and to report failures of any
    pipeline member ("pipefail").

    @type cmd: string
    @param cmd: Shell code to run
    @rtype: list
    @return: Argument vector

    """
    bash_prefix = ["bash", "-o", "errexit", "-o", "pipefail", "-c"]
    return bash_prefix + [cmd]

  def _GetSocatCommand(self):
    """Returns the socat command.

    @rtype: list
    @return: Argument vector for socat
    @raise errors.GenericError: If the mode is unknown or an address option
      is too long or contains a comma

    """
    opts = self._opts

    # Options shared by the listening and the connecting side
    shared_opts = list(SOCAT_TCP_OPTS)
    shared_opts.extend(SOCAT_OPENSSL_OPTS)
    shared_opts.extend([
      "key=%s" % opts.key,
      "cert=%s" % opts.cert,
      "cafile=%s" % opts.ca,
      ])

    if opts.bind is not None:
      shared_opts.append("bind=%s" % opts.bind)

    assert not (opts.ipv4 and opts.ipv6)

    if opts.ipv4:
      shared_opts.append("pf=ipv4")
    elif opts.ipv6:
      shared_opts.append("pf=ipv6")

    if self._mode == constants.IEM_IMPORT:
      # Port 0 is used when no port was requested
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      addr1 = [
        "OPENSSL-LISTEN:%s" % listen_port,
        "reuseaddr",

        # Keep listening if a connection wasn't established successfully,
        # retrying up to 100 times a second. Note that this still leaves room
        # for DoS attacks.
        "forever",
        "intervall=0.01",
        ] + shared_opts
      addr2 = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      # IPv6 addresses are wrapped in brackets
      host = opts.host
      if host and netutils.IP6Address.IsValid(host):
        host = "[%s]" % host

      addr1 = ["stdin"]
      addr2 = [
        "OPENSSL:%s:%s" % (host, opts.port),

        # How long to wait per connection attempt
        "connect-timeout=%s" % opts.connect_timeout,

        # Retry a few times before giving up to connect (once per second)
        "retry=%s" % opts.connect_retries,
        "intervall=1",
        ] + shared_opts

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined with commas below, hence none may contain one
    for value in addr1 + addr2:
      if len(value) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, value))
      if "," in value:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % value)

    return [
      constants.SOCAT_PATH,

      # Log to stderr
      "-ls",

      # Log level
      "-d", "-d",

      # Buffer size
      "-b%s" % BUFSIZE,

      # Unidirectional mode, the first address is only used for reading, and
      # the second address is only used for writing
      "-u",

      ",".join(addr1), ",".join(addr2),
      ]

  def _GetMagicCommand(self):
    """Returns the command to read/write the magic value.

    @rtype: string or None
    @return: Shell snippet, or None if no magic value is configured
    @raise errors.GenericError: If the mode is unknown

    """
    if not self._opts.magic:
      return None

    # Prefix to ensure magic isn't interpreted as option to "echo"
    magic = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
      # Read as many characters as the magic is long and compare them
      return "".join([
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(magic),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ])

    if self._mode == constants.IEM_EXPORT:
      return utils.ShellQuoteArgs(["echo", "-E", "-n", magic])

    raise errors.GenericError("Invalid mode '%s'" % self._mode)

  def _GetDdCommand(self):
    """Returns the command for measuring throughput.

    @rtype: string
    @return: Shell snippet running dd, preceded by the magic command if any

    """
    magic_cmd = self._GetMagicCommand()

    dd_part = "".join([
      "{ ",
      # Setting LC_ALL since we want to parse the output and explicitly
      # redirecting stdin, as the background process (dd) would have
      # /dev/null as stdin otherwise
      "LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
      (BUFSIZE, self._dd_stderr_fd),
      # Send PID to daemon
      " echo $pid >&%d;" % self._dd_pid_fd,
      # And wait for dd
      " wait $pid;",
      " }",
      ])

    if magic_cmd:
      return "{ " + magic_cmd + " && " + dd_part + " }"

    return dd_part

  def _GetTransportCommand(self):
    """Returns the command for the transport part of the daemon.

    @rtype: list
    @return: Bash argument vector running the transport pipeline
    @raise errors.GenericError: If the mode is unknown

    """
    socat_cmd = ("%s 2>&%d" %
                 (utils.ShellQuoteArgs(self._GetSocatCommand()),
                  self._socat_stderr_fd))
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      # Network -> (decompression) -> dd
      pipeline = [socat_cmd]
      if compr == constants.IEC_GZIP:
        pipeline.append("gunzip -c")
      pipeline.append(dd_cmd)

    elif self._mode == constants.IEM_EXPORT:
      # dd -> (compression) -> network
      pipeline = [dd_cmd]
      if compr == constants.IEC_GZIP:
        pipeline.append("gzip -c")
      pipeline.append(socat_cmd)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate
    # user in the future.
    return self.GetBashCommand(" | ".join(pipeline))

  def GetCommand(self):
    """Returns the complete child process command.

    The transport command is surrounded by the optional command prefix and
    suffix taken from the options.

    @rtype: list
    @return: Bash argument vector

    """
    transport_cmd = self._GetTransportCommand()

    pieces = []

    if self._opts.cmd_prefix:
      pieces.append(self._opts.cmd_prefix)

    pieces.append(utils.ShellQuoteArgs(transport_cmd))

    if self._opts.cmd_suffix:
      pieces.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(pieces))
341
342 343 -def _VerifyListening(family, address, port):
344 """Verify address given as listening address by socat. 345 346 """ 347 if family not in (socket.AF_INET, socket.AF_INET6): 348 raise errors.GenericError("Address family %r not supported" % family) 349 350 if (family == socket.AF_INET6 and address.startswith("[") and 351 address.endswith("]")): 352 address = address.lstrip("[").rstrip("]") 353 354 try: 355 packed_address = socket.inet_pton(family, address) 356 except socket.error: 357 raise errors.GenericError("Invalid address %r for family %s" % 358 (address, family)) 359 360 return (socket.inet_ntop(family, packed_address), port)
361
class ChildIOProcessor(object):
  """Processes the output of the daemon's child programs.

  Output is fed line by line through one line splitter per program. Lines are
  interpreted (socat log messages, dd statistics, dd's PID, expected size),
  recorded in the status file and forwarded to the logger where appropriate.

  """

  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Initializes this class.

    @param debug: Whether all child output should be forwarded
    @param status_file: Status file object receiving updates
    @param logger: Logger receiving forwarded child output
    @param throughput_samples: Number of dd samples kept for calculating
      throughput
    @param exp_size: Expected size of transferred data, or None if unknown

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # One line splitter per program, all ending up in L{_ProcessOutput}
    self._splitter = {}
    for prog in PROG_ALL:
      self._splitter[prog] = utils.LineSplitter(self._ProcessOutput, prog)

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Returns the line splitter for a program.

    @param prog: Program identifier
    @return: Line splitter registered for C{prog}

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes all line splitters.

    """
    for splitter in self._splitter.values():
      splitter.flush()

  def CloseAll(self):
    """Closes all line splitters and forgets about them.

    """
    for splitter in self._splitter.values():
      splitter.close()
    self._splitter.clear()

  def NotifyDd(self):
    """Tells dd(1) to write statistics.

    @rtype: bool
    @return: False if dd's PID is unknown or dd isn't handling the signal
      yet, True otherwise

    """
    pid = self._dd_pid

    if pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals. The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      if not utils.IsProcessHandlingSignal(pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, pid)
    try:
      os.kill(pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno != errno.ESRCH:
        raise

      # Process no longer exists
      logging.debug("dd exited")
      self._dd_pid = None

    return True

  def _ProcessOutput(self, line, prog):
    """Takes care of child process output.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates

    """
    force_update = False
    # Output of programs not handled below is forwarded unmodified
    forward_line = line

    if prog == PROG_SOCAT:
      fields = line.split(None, 4)

      if len(fields) != 5:
        # Not in the expected log format, forward as is
        forward_line = "socat: %s" % line
      else:
        (level, msg) = fields[3:]

        force_update = self._ProcessSocatOutput(self._status_file, level, msg)

        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
          forward_line = "socat: %s %s" % (level, msg)
        else:
          forward_line = None

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)

      if should_forward or self._debug:
        forward_line = "dd: %s" % line
      else:
        forward_line = None

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")
      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      # An empty or unparseable line means the size is unknown
      exp_size = None
      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)

      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      self._status_file.AddRecentOutput(forward_line)

    self._status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets socat log output.

    @param status_file: Status file object
    @param level: Log level of the message
    @param msg: Log message
    @rtype: bool
    @return: Whether the status file was changed (listening port recorded or
      connection marked as established)

    """
    if level != SOCAT_LOG_NOTICE:
      return False

    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      listening = LISTENING_RE.match(msg)
      if listening:
        (_, port) = _VerifyListening(int(listening.group("family")),
                                     listening.group("address"),
                                     int(listening.group("port")))

        status_file.SetListenPort(port)
        return True

    if not status_file.GetConnected():
      if TRANSFER_LOOP_RE.match(msg):
        logging.debug("Connection established")
        status_file.SetConnected()
        return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @type line: string
    @param line: Line written by dd
    @rtype: tuple
    @return: Tuple of two booleans: whether the line should be forwarded and
      whether the status file should be force-updated

    """
    stats = DD_INFO_RE.match(line)
    if stats:
      seconds = float(stats.group("seconds"))
      mbytes = utils.BytesToMebibyte(int(stats.group("bytes")))
      self._UpdateDdProgress(seconds, mbytes)
      return (False, True)

    if DD_STDERR_IGNORE.match(line):
      # Ignore
      return (False, False)

    # Forward line
    return (True, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    history = self._dd_progress

    # Add latest sample and drop the ones which are too old
    history.append((seconds, mbytes))
    del history[:-self._dd_tp_samples]

    # Calculate throughput
    throughput = _CalcThroughput(history)

    # Percent and ETA can only be calculated if the expected size is known
    percent = None
    eta = None

    exp_size = self._exp_size
    if exp_size is not None:
      if exp_size != 0:
        percent = max(0, min(100, (100.0 * mbytes) / exp_size))

      if throughput:
        eta = max(0, float(exp_size - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
578 579 -def _CalcThroughput(samples):
580 """Calculates the throughput in MiB/second. 581 582 @type samples: sequence 583 @param samples: List of samples, each consisting of a (timestamp, mbytes) 584 tuple 585 @rtype: float or None 586 @return: Throughput in MiB/second 587 588 """ 589 if len(samples) < 2: 590 # Can't calculate throughput 591 return None 592 593 (start_time, start_mbytes) = samples[0] 594 (end_time, end_mbytes) = samples[-1] 595 596 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
597