Package ganeti :: Package impexpd
[hide private]
[frames] | no frames]

Source Code for Package ganeti.impexpd

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2010 Google Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or 
  9  # (at your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
 19  # 02110-1301, USA. 
 20   
 21   
 22  """Classes and functions for import/export daemon. 
 23   
 24  """ 
 25   
 26  import os 
 27  import re 
 28  import socket 
 29  import logging 
 30  import signal 
 31  import errno 
 32  import time 
 33  from cStringIO import StringIO 
 34   
 35  from ganeti import constants 
 36  from ganeti import errors 
 37  from ganeti import utils 
 38   
 39   
 40  #: Used to recognize point at which socat(1) starts to listen on its socket. 
 41  #: The local address is required for the remote peer to connect (in particular 
 42  #: the port number). 
 43  LISTENING_RE = re.compile(r"^listening on\s+" 
 44                            r"AF=(?P<family>\d+)\s+" 
 45                            r"(?P<address>.+):(?P<port>\d+)$", re.I) 
 46   
 47  #: Used to recognize point at which socat(1) is sending data over the wire 
 48  TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$", 
 49                                re.I) 
 50   
 51  SOCAT_LOG_DEBUG = "D" 
 52  SOCAT_LOG_INFO = "I" 
 53  SOCAT_LOG_NOTICE = "N" 
 54  SOCAT_LOG_WARNING = "W" 
 55  SOCAT_LOG_ERROR = "E" 
 56  SOCAT_LOG_FATAL = "F" 
 57   
 58  SOCAT_LOG_IGNORE = frozenset([ 
 59    SOCAT_LOG_DEBUG, 
 60    SOCAT_LOG_INFO, 
 61    SOCAT_LOG_NOTICE, 
 62    ]) 
 63   
 64  #: Used to parse GNU dd(1) statistics 
 65  DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*" 
 66                          r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I) 
 67   
 68  #: Used to ignore "N+N records in/out" on dd(1)'s stderr 
 69  DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I) 
 70   
 71  #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is 
 72  #: unavailable and SIGUSR1 is used instead) 
 73  DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) 
 74   
 75  #: Buffer size: at most this many bytes are transferred at once 
 76  BUFSIZE = 1024 * 1024 
 77   
 78  # Common options for socat 
 79  SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] 
 80  SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", 
 81                        "cipher=%s" % constants.OPENSSL_CIPHERS] 
 82   
 83  SOCAT_OPTION_MAXLEN = 400 
 84   
 85  (PROG_OTHER, 
 86   PROG_SOCAT, 
 87   PROG_DD, 
 88   PROG_DD_PID, 
 89   PROG_EXP_SIZE) = range(1, 6) 
 90  PROG_ALL = frozenset([ 
 91    PROG_OTHER, 
 92    PROG_SOCAT, 
 93    PROG_DD, 
 94    PROG_DD_PID, 
 95    PROG_EXP_SIZE, 
 96    ]) 
class CommandBuilder(object):
  """Builds the command line run as the import/export daemon's child process.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts
    self._socat_stderr_fd = socat_stderr_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._dd_pid_fd = dd_pid_fd

    # A magic value, when given, is expected to match IE_MAGIC_RE (only
    # checked when assertions are enabled)
    assert (opts.magic is None or
            constants.IE_MAGIC_RE.match(opts.magic))

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a shell command line so that Bash executes it.

    Bash runs with C{errexit} and C{pipefail}, so a failing command,
    including any stage of a pipeline, makes the whole command fail.

    @type cmd: string
    @param cmd: Shell command line
    @rtype: list
    @return: Argument vector running C{cmd} in Bash

    """
    strict_opts = ["-o", "errexit", "-o", "pipefail"]
    return ["bash"] + strict_opts + ["-c", cmd]

  def _GetSocatCommand(self):
    """Builds the socat(1) argument vector for the current mode.

    In import mode socat listens for an OpenSSL connection and writes to
    stdout; in export mode it reads stdin and connects to the remote host.

    @rtype: list
    @raise errors.GenericError: for an unknown mode, an overlong option or an
      option containing a comma

    """
    opts = self._opts

    shared_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS
    shared_opts = shared_opts + ["key=%s" % opts.key,
                                 "cert=%s" % opts.cert,
                                 "cafile=%s" % opts.ca]
    if opts.bind is not None:
      shared_opts.append("bind=%s" % opts.bind)

    if self._mode == constants.IEM_IMPORT:
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      # Retry to listen if connection wasn't established successfully, up to
      # 100 times a second. Note that this still leaves room for DoS attacks.
      reader = ["OPENSSL-LISTEN:%s" % listen_port, "reuseaddr",
                "forever", "intervall=0.01"] + shared_opts
      writer = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      reader = ["stdin"]
      writer = ["OPENSSL:%s:%s" % (opts.host, opts.port),
                # How long to wait per connection attempt
                "connect-timeout=%s" % opts.connect_timeout,
                # Retry a few times before giving up to connect (once per
                # second)
                "retry=%s" % opts.connect_retries,
                "intervall=1"] + shared_opts

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined with commas below, so none of them may contain one
    for value in reader + writer:
      if len(value) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, value))
      if "," in value:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % value)

    return [constants.SOCAT_PATH,
            "-ls",              # log to stderr
            "-d", "-d",         # log level
            "-b%s" % BUFSIZE,   # buffer size
            # Unidirectional mode: the first address is only read from, the
            # second one is only written to
            "-u",
            ",".join(reader),
            ",".join(writer)]

  def _GetMagicCommand(self):
    """Builds the shell snippet reading (import) or writing (export) the magic.

    @rtype: string or None
    @return: Shell snippet, or C{None} if no magic value is configured
    @raise errors.GenericError: for an unknown mode

    """
    if not self._opts.magic:
      return None

    # The "M=" prefix keeps the magic from being taken as an option to "echo"
    tagged = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
      pieces = [
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(tagged)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(tagged),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ]
    elif self._mode == constants.IEM_EXPORT:
      pieces = [utils.ShellQuoteArgs(["echo", "-E", "-n", tagged])]
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    return "".join(pieces)

  def _GetDdCommand(self):
    """Builds the shell snippet running dd(1) to measure throughput.

    dd runs in the background; its PID is written to the PID descriptor and
    then waited for. If a magic value is configured, the magic snippet runs
    first and must succeed.

    @rtype: string

    """
    magic_cmd = self._GetMagicCommand()

    # LC_ALL is set since dd's output is parsed; stdin is redirected
    # explicitly because the backgrounded dd would otherwise get /dev/null
    dd_part = ("{ "
               "LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};"
               " echo $pid >&%d;"
               " wait $pid;"
               " }") % (BUFSIZE, self._dd_stderr_fd, self._dd_pid_fd)

    if magic_cmd:
      return "{ %s && %s }" % (magic_cmd, dd_part)
    return dd_part

  def _GetTransportCommand(self):
    """Builds the Bash argument vector for the transport pipeline.

    Import: socat | [gunzip -c |] dd; export: dd | [gzip -c |] socat.

    @rtype: list
    @raise errors.GenericError: for an unknown mode

    """
    socat_cmd = "%s 2>&%d" % (utils.ShellQuoteArgs(self._GetSocatCommand()),
                              self._socat_stderr_fd)
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress
    assert compr in constants.IEC_ALL
    use_gzip = (compr == constants.IEC_GZIP)

    if self._mode == constants.IEM_IMPORT:
      pipeline = [socat_cmd]
      if use_gzip:
        pipeline.append("gunzip -c")
      pipeline.append(dd_cmd)
    elif self._mode == constants.IEM_EXPORT:
      pipeline = [dd_cmd]
      if use_gzip:
        pipeline.append("gzip -c")
      pipeline.append(socat_cmd)
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate
    # user in the future.
    return self.GetBashCommand(" | ".join(pipeline))

  def GetCommand(self):
    """Returns the complete child process command.

    The quoted transport command is surrounded by the optional, unquoted
    C{cmd_prefix} and C{cmd_suffix} options and run in Bash.

    @rtype: list

    """
    quoted_transport = utils.ShellQuoteArgs(self._GetTransportCommand())
    prefix = self._opts.cmd_prefix
    suffix = self._opts.cmd_suffix

    words = []
    if prefix:
      words.append(prefix)
    words.append(quoted_transport)
    if suffix:
      words.append(suffix)

    return self.GetBashCommand(" ".join(words))

322 323 -def _VerifyListening(family, address, port):
324 """Verify address given as listening address by socat. 325 326 """ 327 # TODO: Implement IPv6 support 328 if family != socket.AF_INET: 329 raise errors.GenericError("Address family %r not supported" % family) 330 331 try: 332 packed_address = socket.inet_pton(family, address) 333 except socket.error: 334 raise errors.GenericError("Invalid address %r for family %s" % 335 (address, family)) 336 337 return (socket.inet_ntop(family, packed_address), port)
338
339 340 -class ChildIOProcessor(object):
341 - def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
342 """Initializes this class. 343 344 """ 345 self._debug = debug 346 self._status_file = status_file 347 self._logger = logger 348 349 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) 350 for prog in PROG_ALL]) 351 352 self._dd_pid = None 353 self._dd_ready = False 354 self._dd_tp_samples = throughput_samples 355 self._dd_progress = [] 356 357 # Expected size of transferred data 358 self._exp_size = exp_size
359
360 - def GetLineSplitter(self, prog):
361 """Returns the line splitter for a program. 362 363 """ 364 return self._splitter[prog]
365
366 - def FlushAll(self):
367 """Flushes all line splitters. 368 369 """ 370 for ls in self._splitter.itervalues(): 371 ls.flush()
372
373 - def CloseAll(self):
374 """Closes all line splitters. 375 376 """ 377 for ls in self._splitter.itervalues(): 378 ls.close() 379 self._splitter.clear()
380
381 - def NotifyDd(self):
382 """Tells dd(1) to write statistics. 383 384 """ 385 if self._dd_pid is None: 386 # Can't notify 387 return False 388 389 if not self._dd_ready: 390 # There's a race condition between starting the program and sending 391 # signals. The signal handler is only registered after some time, so we 392 # have to check whether the program is ready. If it isn't, sending a 393 # signal will invoke the default handler (and usually abort the program). 394 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): 395 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL) 396 return False 397 398 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL) 399 self._dd_ready = True 400 401 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) 402 try: 403 os.kill(self._dd_pid, DD_INFO_SIGNAL) 404 except EnvironmentError, err: 405 if err.errno != errno.ESRCH: 406 raise 407 408 # Process no longer exists 409 logging.debug("dd exited") 410 self._dd_pid = None 411 412 return True
413
414 - def _ProcessOutput(self, line, prog):
415 """Takes care of child process output. 416 417 @type line: string 418 @param line: Child output line 419 @type prog: number 420 @param prog: Program from which the line originates 421 422 """ 423 force_update = False 424 forward_line = line 425 426 if prog == PROG_SOCAT: 427 level = None 428 parts = line.split(None, 4) 429 430 if len(parts) == 5: 431 (_, _, _, level, msg) = parts 432 433 force_update = self._ProcessSocatOutput(self._status_file, level, msg) 434 435 if self._debug or (level and level not in SOCAT_LOG_IGNORE): 436 forward_line = "socat: %s %s" % (level, msg) 437 else: 438 forward_line = None 439 else: 440 forward_line = "socat: %s" % line 441 442 elif prog == PROG_DD: 443 (should_forward, force_update) = self._ProcessDdOutput(line) 444 445 if should_forward or self._debug: 446 forward_line = "dd: %s" % line 447 else: 448 forward_line = None 449 450 elif prog == PROG_DD_PID: 451 if self._dd_pid: 452 raise RuntimeError("dd PID reported more than once") 453 logging.debug("Received dd PID %r", line) 454 self._dd_pid = int(line) 455 forward_line = None 456 457 elif prog == PROG_EXP_SIZE: 458 logging.debug("Received predicted size %r", line) 459 forward_line = None 460 461 if line: 462 try: 463 exp_size = utils.BytesToMebibyte(int(line)) 464 except (ValueError, TypeError), err: 465 logging.error("Failed to convert predicted size %r to number: %s", 466 line, err) 467 exp_size = None 468 else: 469 exp_size = None 470 471 self._exp_size = exp_size 472 473 if forward_line: 474 self._logger.info(forward_line) 475 self._status_file.AddRecentOutput(forward_line) 476 477 self._status_file.Update(force_update)
478 479 @staticmethod
480 - def _ProcessSocatOutput(status_file, level, msg):
481 """Interprets socat log output. 482 483 """ 484 if level == SOCAT_LOG_NOTICE: 485 if status_file.GetListenPort() is None: 486 # TODO: Maybe implement timeout to not listen forever 487 m = LISTENING_RE.match(msg) 488 if m: 489 (_, port) = _VerifyListening(int(m.group("family")), 490 m.group("address"), 491 int(m.group("port"))) 492 493 status_file.SetListenPort(port) 494 return True 495 496 if not status_file.GetConnected(): 497 m = TRANSFER_LOOP_RE.match(msg) 498 if m: 499 logging.debug("Connection established") 500 status_file.SetConnected() 501 return True 502 503 return False
504
505 - def _ProcessDdOutput(self, line):
506 """Interprets a line of dd(1)'s output. 507 508 """ 509 m = DD_INFO_RE.match(line) 510 if m: 511 seconds = float(m.group("seconds")) 512 mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) 513 self._UpdateDdProgress(seconds, mbytes) 514 return (False, True) 515 516 m = DD_STDERR_IGNORE.match(line) 517 if m: 518 # Ignore 519 return (False, False) 520 521 # Forward line 522 return (True, False)
523
524 - def _UpdateDdProgress(self, seconds, mbytes):
525 """Updates the internal status variables for dd(1) progress. 526 527 @type seconds: float 528 @param seconds: Timestamp of this update 529 @type mbytes: float 530 @param mbytes: Total number of MiB transferred so far 531 532 """ 533 # Add latest sample 534 self._dd_progress.append((seconds, mbytes)) 535 536 # Remove old samples 537 del self._dd_progress[:-self._dd_tp_samples] 538 539 # Calculate throughput 540 throughput = _CalcThroughput(self._dd_progress) 541 542 # Calculate percent and ETA 543 percent = None 544 eta = None 545 546 if self._exp_size is not None: 547 if self._exp_size != 0: 548 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) 549 550 if throughput: 551 eta = max(0, float(self._exp_size - mbytes) / throughput) 552 553 self._status_file.SetProgress(mbytes, throughput, percent, eta)
554
555 556 -def _CalcThroughput(samples):
557 """Calculates the throughput in MiB/second. 558 559 @type samples: sequence 560 @param samples: List of samples, each consisting of a (timestamp, mbytes) 561 tuple 562 @rtype: float or None 563 @return: Throughput in MiB/second 564 565 """ 566 if len(samples) < 2: 567 # Can't calculate throughput 568 return None 569 570 (start_time, start_mbytes) = samples[0] 571 (end_time, end_mbytes) = samples[-1] 572 573 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
574