Package ganeti :: Package impexpd
[hide private]
[frames] | no frames]

Source Code for Package ganeti.impexpd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Classes and functions for import/export daemon. 
 23   
 24  """ 
 25   
 26  import os 
 27  import re 
 28  import socket 
 29  import logging 
 30  import signal 
 31  import errno 
 32  import time 
 33  from cStringIO import StringIO 
 34   
 35  from ganeti import constants 
 36  from ganeti import errors 
 37  from ganeti import utils 
 38   
 39   
 40  #: Used to recognize point at which socat(1) starts to listen on its socket. 
 41  #: The local address is required for the remote peer to connect (in particular 
 42  #: the port number). 
 43  LISTENING_RE = re.compile(r"^listening on\s+" 
 44                            r"AF=(?P<family>\d+)\s+" 
 45                            r"(?P<address>.+):(?P<port>\d+)$", re.I) 
 46   
 47  #: Used to recognize point at which socat(1) is sending data over the wire 
 48  TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", 
 49                                re.I) 
 50   
 51  SOCAT_LOG_DEBUG = "D" 
 52  SOCAT_LOG_INFO = "I" 
 53  SOCAT_LOG_NOTICE = "N" 
 54  SOCAT_LOG_WARNING = "W" 
 55  SOCAT_LOG_ERROR = "E" 
 56  SOCAT_LOG_FATAL = "F" 
 57   
 58  SOCAT_LOG_IGNORE = frozenset([ 
 59    SOCAT_LOG_DEBUG, 
 60    SOCAT_LOG_INFO, 
 61    SOCAT_LOG_NOTICE, 
 62    ]) 
 63   
 64  #: Used to parse GNU dd(1) statistics 
 65  DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" 
 66                          r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) 
 67   
 68  #: Used to ignore "N+N records in/out" on dd(1)'s stderr 
 69  DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) 
 70   
 71  #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is 
 72  #: unavailable and SIGUSR1 is used instead) 
 73  DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) 
 74   
 75  #: Buffer size: at most this many bytes are transferred at once 
 76  BUFSIZE = 1024 * 1024 
 77   
 78  # Common options for socat 
 79  SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] 
 80  SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", 
 81                        "cipher=%s" % constants.OPENSSL_CIPHERS] 
 82   
 83  if constants.SOCAT_USE_COMPRESS: 
 84    # Disables all compression in by OpenSSL. Only supported in patched versions 
 85    # of socat (as of November 2010). See INSTALL for more information. 
 86    SOCAT_OPENSSL_OPTS.append("compress=none") 
 87   
 88  SOCAT_OPTION_MAXLEN = 400 
 89   
 90  (PROG_OTHER, 
 91   PROG_SOCAT, 
 92   PROG_DD, 
 93   PROG_DD_PID, 
 94   PROG_EXP_SIZE) = range(1, 6) 
 95  PROG_ALL = frozenset([ 
 96    PROG_OTHER, 
 97    PROG_SOCAT, 
 98    PROG_DD, 
 99    PROG_DD_PID, 
100    PROG_EXP_SIZE, 
101    ]) 
class CommandBuilder(object):
  """Builds the command line run by the import/export daemon's child.

  The command pipes data between socat(1) (network transport), an optional
  gzip/gunzip stage and dd(1) (throughput measurement, preceded by an
  optional magic value check), all executed by bash with errexit and
  pipefail enabled.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Stores the daemon mode, the options and the file descriptors to use.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts
    (self._socat_stderr_fd, self._dd_stderr_fd, self._dd_pid_fd) = \
      (socat_stderr_fd, dd_stderr_fd, dd_pid_fd)

    magic = opts.magic
    assert magic is None or constants.IE_MAGIC_RE.match(magic)

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a shell snippet so that bash runs it and aborts on any failure.

    @type cmd: string
    @param cmd: Shell code to run
    @rtype: list
    @return: Argument vector running C{cmd} in bash with errexit and pipefail

    """
    shell_options = ["-o", "errexit", "-o", "pipefail"]
    return ["bash"] + shell_options + ["-c", cmd]

  def _GetSocatCommand(self):
    """Builds the socat(1) argument vector for the current mode.

    In import mode socat listens for an OpenSSL connection and writes the
    received data to stdout; in export mode it reads stdin and connects to
    the remote host.

    @rtype: list
    @return: socat command and its arguments
    @raise errors.GenericError: for an invalid mode, or if an address option
      is longer than SOCAT_OPTION_MAXLEN or contains a comma

    """
    opts = self._opts

    shared = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
      "key=%s" % opts.key,
      "cert=%s" % opts.cert,
      "cafile=%s" % opts.ca,
      ]
    if opts.bind is not None:
      shared.append("bind=%s" % opts.bind)

    if self._mode == constants.IEM_IMPORT:
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      source = [
        "OPENSSL-LISTEN:%s" % listen_port,
        "reuseaddr",
        # Keep listening again when a connection wasn't established
        # successfully, at most 100 times per second. This still leaves room
        # for DoS attacks.
        "forever",
        "intervall=0.01",
        ] + shared
      sink = ["stdout"]
    elif self._mode == constants.IEM_EXPORT:
      source = ["stdin"]
      sink = [
        "OPENSSL:%s:%s" % (opts.host, opts.port),
        # Timeout for each individual connection attempt
        "connect-timeout=%s" % opts.connect_timeout,
        # Number of further attempts (one per second) before giving up
        "retry=%s" % opts.connect_retries,
        "intervall=1",
        ] + shared
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined with commas below, so neither overly long values nor
    # embedded commas can be passed through safely
    for option in source + sink:
      if len(option) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, option))
      if "," in option:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % option)

    return [
      constants.SOCAT_PATH,
      "-ls",             # log to stderr
      "-d", "-d",        # log level
      "-b%s" % BUFSIZE,  # buffer size
      # Unidirectional mode: the first address is only read from, the second
      # one is only written to
      "-u",
      ",".join(source),
      ",".join(sink),
      ]

  def _GetMagicCommand(self):
    """Returns the shell code reading (import) or writing (export) the magic.

    In import mode exactly as many characters as the prefixed magic value has
    are read from stdin and compared; on mismatch an error is printed to
    stderr and the code exits with status 1. In export mode the prefixed
    magic value is echoed without a trailing newline.

    @rtype: string or None
    @return: Shell code, or None if no magic value is configured
    @raise errors.GenericError: for an invalid mode

    """
    if not self._opts.magic:
      return None

    # The prefix ensures the magic value is never taken as an option to "echo"
    magic = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
      read_magic = utils.ShellQuoteArgs(["read", "-n", str(len(magic)),
                                         "magic"])
      check_magic = ("if test \"$magic\" != %s; then"
                     " echo %s >&2;"
                     " exit 1;"
                     "fi;" %
                     (utils.ShellQuote(magic),
                      utils.ShellQuote("Magic value mismatch")))
      return "{ %s && %s }" % (read_magic, check_magic)

    if self._mode == constants.IEM_EXPORT:
      return utils.ShellQuoteArgs(["echo", "-E", "-n", magic])

    raise errors.GenericError("Invalid mode '%s'" % self._mode)

  def _GetDdCommand(self):
    """Returns the command for measuring throughput.

    dd(1) is started in the background so its PID can be written to the PID
    file descriptor; the shell then waits for it. If a magic value is
    configured, the magic command runs first and must succeed.

    @rtype: string

    """
    magic_cmd = self._GetMagicCommand()

    # LC_ALL is set because dd's output is parsed; stdin is redirected
    # explicitly because a background process would otherwise get /dev/null
    dd_cmd = ("{ "
              "LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};"
              " echo $pid >&%d;"
              " wait $pid;"
              " }" % (BUFSIZE, self._dd_stderr_fd, self._dd_pid_fd))

    if magic_cmd:
      return "{ %s && %s }" % (magic_cmd, dd_cmd)
    return dd_cmd

  def _GetTransportCommand(self):
    """Returns the command for the transport part of the daemon.

    Import: socat | [gunzip -c |] dd. Export: dd | [gzip -c |] socat.

    @rtype: list
    @return: bash argument vector running the pipeline
    @raise errors.GenericError: for an invalid mode

    """
    socat_cmd = "%s 2>&%d" % (utils.ShellQuoteArgs(self._GetSocatCommand()),
                              self._socat_stderr_fd)
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress
    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
      (head, compr_cmd, tail) = (socat_cmd, "gunzip -c", dd_cmd)
    elif self._mode == constants.IEM_EXPORT:
      (head, compr_cmd, tail) = (dd_cmd, "gzip -c", socat_cmd)
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    stages = [head]
    if compr == constants.IEC_GZIP:
      stages.append(compr_cmd)
    stages.append(tail)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    return self.GetBashCommand(" | ".join(stages))

  def GetCommand(self):
    """Returns the complete child process command.

    The quoted transport command is placed between the optional
    C{cmd_prefix} and C{cmd_suffix} options and run through bash.

    @rtype: list

    """
    transport_cmd = self._GetTransportCommand()

    words = []
    if self._opts.cmd_prefix:
      words.append(self._opts.cmd_prefix)
    words.append(utils.ShellQuoteArgs(transport_cmd))
    if self._opts.cmd_suffix:
      words.append(self._opts.cmd_suffix)

    return self.GetBashCommand(" ".join(words))


327 328 -def _VerifyListening(family, address, port):
329 """Verify address given as listening address by socat. 330 331 """ 332 # TODO: Implement IPv6 support 333 if family != socket.AF_INET: 334 raise errors.GenericError("Address family %r not supported" % family) 335 336 try: 337 packed_address = socket.inet_pton(family, address) 338 except socket.error: 339 raise errors.GenericError("Invalid address %r for family %s" % 340 (address, family)) 341 342 return (socket.inet_ntop(family, packed_address), port)
343
344 345 -class ChildIOProcessor(object):
346 - def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
347 """Initializes this class. 348 349 """ 350 self._debug = debug 351 self._status_file = status_file 352 self._logger = logger 353 354 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) 355 for prog in PROG_ALL]) 356 357 self._dd_pid = None 358 self._dd_ready = False 359 self._dd_tp_samples = throughput_samples 360 self._dd_progress = [] 361 362 # Expected size of transferred data 363 self._exp_size = exp_size
364
365 - def GetLineSplitter(self, prog):
366 """Returns the line splitter for a program. 367 368 """ 369 return self._splitter[prog]
370
371 - def FlushAll(self):
372 """Flushes all line splitters. 373 374 """ 375 for ls in self._splitter.itervalues(): 376 ls.flush()
377
378 - def CloseAll(self):
379 """Closes all line splitters. 380 381 """ 382 for ls in self._splitter.itervalues(): 383 ls.close() 384 self._splitter.clear()
385
386 - def NotifyDd(self):
387 """Tells dd(1) to write statistics. 388 389 """ 390 if self._dd_pid is None: 391 # Can't notify 392 return False 393 394 if not self._dd_ready: 395 # There's a race condition between starting the program and sending 396 # signals. The signal handler is only registered after some time, so we 397 # have to check whether the program is ready. If it isn't, sending a 398 # signal will invoke the default handler (and usually abort the program). 399 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): 400 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) 401 return False 402 403 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) 404 self._dd_ready = True 405 406 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) 407 try: 408 os.kill(self._dd_pid, DD_INFO_SIGNAL) 409 except EnvironmentError, err: 410 if err.errno != errno.ESRCH: 411 raise 412 413 # Process no longer exists 414 logging.debug("dd exited") 415 self._dd_pid = None 416 417 return True
418
419 - def _ProcessOutput(self, line, prog):
420 """Takes care of child process output. 421 422 @type line: string 423 @param line: Child output line 424 @type prog: number 425 @param prog: Program from which the line originates 426 427 """ 428 force_update = False 429 forward_line = line 430 431 if prog == PROG_SOCAT: 432 level = None 433 parts = line.split(None, 4) 434 435 if len(parts) == 5: 436 (_, _, _, level, msg) = parts 437 438 force_update = self._ProcessSocatOutput(self._status_file, level, msg) 439 440 if self._debug or (level and level not in SOCAT_LOG_IGNORE): 441 forward_line = "socat: %s %s" % (level, msg) 442 else: 443 forward_line = None 444 else: 445 forward_line = "socat: %s" % line 446 447 elif prog == PROG_DD: 448 (should_forward, force_update) = self._ProcessDdOutput(line) 449 450 if should_forward or self._debug: 451 forward_line = "dd: %s" % line 452 else: 453 forward_line = None 454 455 elif prog == PROG_DD_PID: 456 if self._dd_pid: 457 raise RuntimeError("dd PID reported more than once") 458 logging.debug("Received dd PID %r", line) 459 self._dd_pid = int(line) 460 forward_line = None 461 462 elif prog == PROG_EXP_SIZE: 463 logging.debug("Received predicted size %r", line) 464 forward_line = None 465 466 if line: 467 try: 468 exp_size = utils.BytesToMebibyte(int(line)) 469 except (ValueError, TypeError), err: 470 logging.error("Failed to convert predicted size %r to number: %s", 471 line, err) 472 exp_size = None 473 else: 474 exp_size = None 475 476 self._exp_size = exp_size 477 478 if forward_line: 479 self._logger.info(forward_line) 480 self._status_file.AddRecentOutput(forward_line) 481 482 self._status_file.Update(force_update)
483 484 @staticmethod
485 - def _ProcessSocatOutput(status_file, level, msg):
486 """Interprets socat log output. 487 488 """ 489 if level == SOCAT_LOG_NOTICE: 490 if status_file.GetListenPort() is None: 491 # TODO: Maybe implement timeout to not listen forever 492 m = LISTENING_RE.match(msg) 493 if m: 494 (_, port) = _VerifyListening(int(m.group("family")), 495 m.group("address"), 496 int(m.group("port"))) 497 498 status_file.SetListenPort(port) 499 return True 500 501 if not status_file.GetConnected(): 502 m = TRANSFER_LOOP_RE.match(msg) 503 if m: 504 logging.debug("Connection established") 505 status_file.SetConnected() 506 return True 507 508 return False
509
510 - def _ProcessDdOutput(self, line):
511 """Interprets a line of dd(1)'s output. 512 513 """ 514 m = DD_INFO_RE.match(line) 515 if m: 516 seconds = float(m.group("seconds")) 517 mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) 518 self._UpdateDdProgress(seconds, mbytes) 519 return (False, True) 520 521 m = DD_STDERR_IGNORE.match(line) 522 if m: 523 # Ignore 524 return (False, False) 525 526 # Forward line 527 return (True, False)
528
529 - def _UpdateDdProgress(self, seconds, mbytes):
530 """Updates the internal status variables for dd(1) progress. 531 532 @type seconds: float 533 @param seconds: Timestamp of this update 534 @type mbytes: float 535 @param mbytes: Total number of MiB transferred so far 536 537 """ 538 # Add latest sample 539 self._dd_progress.append((seconds, mbytes)) 540 541 # Remove old samples 542 del self._dd_progress[:-self._dd_tp_samples] 543 544 # Calculate throughput 545 throughput = _CalcThroughput(self._dd_progress) 546 547 # Calculate percent and ETA 548 percent = None 549 eta = None 550 551 if self._exp_size is not None: 552 if self._exp_size != 0: 553 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) 554 555 if throughput: 556 eta = max(0, float(self._exp_size - mbytes) / throughput) 557 558 self._status_file.SetProgress(mbytes, throughput, percent, eta)
559
560 561 -def _CalcThroughput(samples):
562 """Calculates the throughput in MiB/second. 563 564 @type samples: sequence 565 @param samples: List of samples, each consisting of a (timestamp, mbytes) 566 tuple 567 @rtype: float or None 568 @return: Throughput in MiB/second 569 570 """ 571 if len(samples) < 2: 572 # Can't calculate throughput 573 return None 574 575 (start_time, start_mbytes) = samples[0] 576 (end_time, end_mbytes) = samples[-1] 577 578 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
579