1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Classes and functions for import/export daemon.
23
24 """
25
26 import os
27 import re
28 import socket
29 import logging
30 import signal
31 import errno
32 import time
33 from cStringIO import StringIO
34
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import utils
38
39
40
41
42
# Matches a "listening on AF=<family> <address>:<port>" line
# (case-insensitive) and captures the numeric address family, the address and
# the port.
# NOTE(review): presumably printed by socat(1) once it listens in import mode;
# confirm against the code consuming this pattern.
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)


# Matches a "starting data transfer loop with FDs ..." line (case-insensitive).
# NOTE(review): presumably socat(1)'s notice that data started flowing; verify
# against the code consuming this pattern.
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.I)

# Single-letter severity codes, compared against the level field of socat
# output lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

# Severities whose messages are only forwarded when debugging is enabled
SOCAT_LOG_IGNORE = frozenset([
    SOCAT_LOG_DEBUG,
    SOCAT_LOG_INFO,
    SOCAT_LOG_NOTICE,
    ])


# Parses a dd(1) statistics line; captures the number of bytes copied and the
# elapsed time in seconds
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)


# Matches the "N+N records in" and "N+N records out" lines on dd(1)'s stderr,
# which are dropped instead of being forwarded
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)


# Signal sent to dd(1) to make it print statistics: SIGINFO where the signal
# module provides it, SIGUSR1 otherwise
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)


# Size in bytes used both as dd's block size ("bs=") and as socat's buffer
# size ("-b")
BUFSIZE = 1024 * 1024


# Address options added to every OpenSSL socat address: TCP keepalive settings
# and OpenSSL verification, protocol method and cipher list
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
                      "cipher=%s" % constants.OPENSSL_CIPHERS]

if constants.SOCAT_USE_COMPRESS:
    # The option is only passed when the constant is set.
    # NOTE(review): presumably not every socat build understands "compress";
    # confirm where SOCAT_USE_COMPRESS is defined.
    SOCAT_OPENSSL_OPTS.append("compress=none")

# Longest single socat address option accepted when building the command line
SOCAT_OPTION_MAXLEN = 400

# Identifiers for the origin of a line of child output; they are the keys of
# the per-program line splitters
(PROG_OTHER,
 PROG_SOCAT,
 PROG_DD,
 PROG_DD_PID,
 PROG_EXP_SIZE) = range(1, 6)
PROG_ALL = frozenset([
    PROG_OTHER,
    PROG_SOCAT,
    PROG_DD,
    PROG_DD_PID,
    PROG_EXP_SIZE,
    ])
def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Remembers the settings needed to build the child commands.

    @param mode: Daemon mode (import or export)
    @param opts: Options object; its C{magic} attribute must be C{None} or
      match C{constants.IE_MAGIC_RE}
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts

    # File descriptors handed to the child processes
    (self._socat_stderr_fd, self._dd_stderr_fd, self._dd_pid_fd) = \
        (socat_stderr_fd, dd_stderr_fd, dd_pid_fd)

    # Sanity check only; not evaluated when assertions are disabled
    assert (opts.magic is None or
            constants.IE_MAGIC_RE.match(opts.magic))
126
@staticmethod
def GetBashCommand(cmd):
    """Wraps a command line so that it is executed by Bash.

    @param cmd: Command line handed to Bash through "-c"
    @rtype: list
    @return: Argument vector invoking bash with the "errexit" and "pipefail"
      shell options enabled

    """
    bash_argv = ["bash"]

    # Abort on the first failing command, also inside pipelines
    for shell_opt in ("errexit", "pipefail"):
        bash_argv.extend(["-o", shell_opt])

    bash_argv.extend(["-c", cmd])
    return bash_argv
133
"""Returns the socat command.

In import mode socat listens for an OpenSSL connection and writes to
stdout; in export mode it reads from stdin and connects to the remote host
via OpenSSL.

@rtype: list
@return: Argument vector, starting with the path of the socat binary
@raise errors.GenericError: If the mode is neither import nor export, or if
  a socat address option is too long or contains a comma

"""
# Options appended to the OpenSSL address in either mode
common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
    "key=%s" % self._opts.key,
    "cert=%s" % self._opts.cert,
    "cafile=%s" % self._opts.ca,
    ]

if self._opts.bind is not None:
    common_addr_opts.append("bind=%s" % self._opts.bind)

if self._mode == constants.IEM_IMPORT:
    # Port 0 is used when no port was requested
    # NOTE(review): presumably this lets the system pick a free port
    if self._opts.port is None:
        port = 0
    else:
        port = self._opts.port

    addr1 = [
        "OPENSSL-LISTEN:%s" % port,
        "reuseaddr",

        # Retry options: keep listening again after an unsuccessful
        # connection, with 0.01 seconds between attempts
        "forever",
        "intervall=0.01",
        ] + common_addr_opts
    addr2 = ["stdout"]

elif self._mode == constants.IEM_EXPORT:
    addr1 = ["stdin"]
    addr2 = [
        "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),

        # Time limit for a single connection attempt
        "connect-timeout=%s" % self._opts.connect_timeout,

        # Number of additional connection attempts, one second apart
        "retry=%s" % self._opts.connect_retries,
        "intervall=1",
        ] + common_addr_opts

else:
    raise errors.GenericError("Invalid mode '%s'" % self._mode)

# The options of an address are joined with commas below, hence a comma
# inside a value would be taken for an option separator
for i in [addr1, addr2]:
    for value in i:
        if len(value) > SOCAT_OPTION_MAXLEN:
            raise errors.GenericError("Socat option longer than %s"
                                      " characters: %r" %
                                      (SOCAT_OPTION_MAXLEN, value))
        if "," in value:
            raise errors.GenericError("Comma not allowed in socat option"
                                      " value: %r" % value)

return [
    constants.SOCAT_PATH,

    # Log to stderr
    "-ls",

    # Log level (each "-d" makes socat more verbose)
    "-d", "-d",

    # Buffer size
    "-b%s" % BUFSIZE,

    # Unidirectional mode: the first address is only read from, the second
    # one only written to
    "-u",

    ",".join(addr1), ",".join(addr2)
    ]
208
238
"""Returns the command for measuring throughput.

The result is a shell snippet which starts dd(1) in the background, writes
dd's PID to the PID file descriptor and waits for dd to finish. If
C{self._GetMagicCommand()} returns a command, it is run first and dd is only
started if it succeeds.

@rtype: string
@return: Shell command text

"""
dd_cmd = StringIO()

magic_cmd = self._GetMagicCommand()
if magic_cmd:
    # Group the magic command and the dd part: "{ magic && { dd ... } }"
    dd_cmd.write("{ ")
    dd_cmd.write(magic_cmd)
    dd_cmd.write(" && ")

dd_cmd.write("{ ")
# LC_ALL=C keeps dd's messages in the form the parser expects; dd's stderr
# goes to the dedicated file descriptor and stdin is redirected explicitly
# for the background process
# NOTE(review): presumably a background job would otherwise not read the
# shell's stdin; confirm against the Bash manual
dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
             (BUFSIZE, self._dd_stderr_fd))
# Report dd's PID on the PID file descriptor
dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
# Wait for dd to finish
dd_cmd.write(" wait $pid;")
dd_cmd.write(" }")

if magic_cmd:
    dd_cmd.write(" }")

return dd_cmd.getvalue()
267
306
"""Returns the complete child process command.

The command line consists of the optional command prefix, the shell-quoted
transport command and the optional command suffix, separated by spaces, and
is wrapped for execution by Bash.

@return: Value returned by C{self.GetBashCommand} for the assembled command
  line

"""
transport_cmd = self._GetTransportCommand()

buf = StringIO()

# Prefix and suffix are inserted verbatim, only the transport command is
# quoted
if self._opts.cmd_prefix:
    buf.write(self._opts.cmd_prefix)
    buf.write(" ")

buf.write(utils.ShellQuoteArgs(transport_cmd))

if self._opts.cmd_suffix:
    buf.write(" ")
    buf.write(self._opts.cmd_suffix)

return self.GetBashCommand(buf.getvalue())
326
"""Verify address given as listening address by socat.

Uses the names C{family}, C{address} and C{port}.

@return: Tuple of (normalized address, port); the port is passed through
  unchanged
@raise errors.GenericError: If the address family is not C{socket.AF_INET}
  or the address can not be parsed for that family

"""
# Only IPv4 is accepted
if family != socket.AF_INET:
    raise errors.GenericError("Address family %r not supported" % family)

try:
    packed_address = socket.inet_pton(family, address)
except socket.error:
    raise errors.GenericError("Invalid address %r for family %s" %
                              (address, family))

# Converting back from the packed form yields the address in canonical
# notation
return (socket.inet_ntop(family, packed_address), port)
343
def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Sets up the state used for processing child output.

    @param debug: Whether child output is forwarded regardless of its
      importance
    @param status_file: Object receiving recent output, progress and update
      requests
    @param logger: Logger whose C{info} method receives forwarded lines
    @param throughput_samples: Number of dd progress samples kept for the
      throughput calculation
    @param exp_size: Expected amount of data, compared against the MiB
      counts reported by dd; may be C{None}

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # One line splitter per output source, each calling back into
    # _ProcessOutput with the source's identifier
    splitters = {}
    for prog in PROG_ALL:
        splitters[prog] = utils.LineSplitter(self._ProcessOutput, prog)
    self._splitter = splitters

    # State of the dd(1) child: PID (once reported), whether it is known to
    # handle the statistics signal, and the recent progress samples
    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size
364
"""Returns the line splitter for a program.

@param prog: Program identifier used as key of the splitter dictionary
  (the dictionary is filled from C{PROG_ALL})
@return: Line splitter stored for C{prog}
@raise KeyError: If no splitter is stored for C{prog}

"""
return self._splitter[prog]
370
"""Flushes all line splitters.

Calls C{flush()} on every splitter currently stored.

"""
for ls in self._splitter.itervalues():
    ls.flush()
377
"""Closes all line splitters.

Calls C{close()} on every splitter and then empties the splitter
dictionary, so no splitter can be looked up afterwards.

"""
for ls in self._splitter.itervalues():
    ls.close()
self._splitter.clear()
385
"""Tells dd(1) to write statistics.

@rtype: bool
@return: False if dd's PID is not known or dd does not handle the signal
  yet; True once the signal was sent or dd was found to have exited
@raise EnvironmentError: If sending the signal fails for a reason other
  than the process being gone

"""
if self._dd_pid is None:
    # dd's PID has not been reported (or dd already exited)
    return False

if not self._dd_ready:
    # Only send the signal once dd is known to handle it; the result is
    # remembered so the check is done only until it succeeds
    # NOTE(review): presumably an unhandled signal would trigger the default
    # action and terminate dd; confirm against dd's documentation
    if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

    logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
    self._dd_ready = True

logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
try:
    os.kill(self._dd_pid, DD_INFO_SIGNAL)
except EnvironmentError, err:
    if err.errno != errno.ESRCH:
        raise

    # ESRCH: the process no longer exists
    logging.debug("dd exited")
    self._dd_pid = None

return True
418
420 """Takes care of child process output.
421
422 @type line: string
423 @param line: Child output line
424 @type prog: number
425 @param prog: Program from which the line originates
426
427 """
428 force_update = False
429 forward_line = line
430
431 if prog == PROG_SOCAT:
432 level = None
433 parts = line.split(None, 4)
434
435 if len(parts) == 5:
436 (_, _, _, level, msg) = parts
437
438 force_update = self._ProcessSocatOutput(self._status_file, level, msg)
439
440 if self._debug or (level and level not in SOCAT_LOG_IGNORE):
441 forward_line = "socat: %s %s" % (level, msg)
442 else:
443 forward_line = None
444 else:
445 forward_line = "socat: %s" % line
446
447 elif prog == PROG_DD:
448 (should_forward, force_update) = self._ProcessDdOutput(line)
449
450 if should_forward or self._debug:
451 forward_line = "dd: %s" % line
452 else:
453 forward_line = None
454
455 elif prog == PROG_DD_PID:
456 if self._dd_pid:
457 raise RuntimeError("dd PID reported more than once")
458 logging.debug("Received dd PID %r", line)
459 self._dd_pid = int(line)
460 forward_line = None
461
462 elif prog == PROG_EXP_SIZE:
463 logging.debug("Received predicted size %r", line)
464 forward_line = None
465
466 if line:
467 try:
468 exp_size = utils.BytesToMebibyte(int(line))
469 except (ValueError, TypeError), err:
470 logging.error("Failed to convert predicted size %r to number: %s",
471 line, err)
472 exp_size = None
473 else:
474 exp_size = None
475
476 self._exp_size = exp_size
477
478 if forward_line:
479 self._logger.info(forward_line)
480 self._status_file.AddRecentOutput(forward_line)
481
482 self._status_file.Update(force_update)
483
484 @staticmethod
509
def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @type line: string
    @param line: Line received from dd
    @rtype: tuple
    @return: Tuple of two booleans: whether the line should be forwarded and
      whether the status file should be updated right away

    """
    stats = DD_INFO_RE.match(line)
    if stats is not None:
        # Statistics line: record the progress, nothing to forward
        self._UpdateDdProgress(float(stats.group("seconds")),
                               utils.BytesToMebibyte(int(stats.group("bytes"))))
        return (False, True)

    # Record counters are dropped silently, everything else is forwarded
    is_record_count = DD_STDERR_IGNORE.match(line) is not None
    return (not is_record_count, False)
528
def _UpdateDdProgress(self, seconds, mbytes):
    """Records a dd(1) progress sample and publishes the derived figures.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    samples = self._dd_progress
    samples.append((seconds, mbytes))

    # Keep only the most recent samples (modified in place)
    del samples[:-self._dd_tp_samples]

    throughput = _CalcThroughput(samples)

    # Percentage and ETA can only be given if the expected size is known
    percent = eta = None
    expected = self._exp_size

    if expected is not None:
        if expected != 0:
            ratio = (100.0 * mbytes) / expected
            percent = max(0, min(100, ratio))

        if throughput:
            remaining = float(expected - mbytes)
            eta = max(0, remaining / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
559
def _CalcThroughput(samples):
    """Calculates the throughput in MiB/second.

    Only the first and the last sample are taken into account.

    @type samples: sequence
    @param samples: List of samples, each consisting of a (timestamp, mbytes)
      tuple
    @rtype: float or None
    @return: Throughput in MiB/second; None if there are fewer than two
      samples or no time passed between the first and the last sample

    """
    if len(samples) < 2:
        # A single sample doesn't span any time
        return None

    (start_time, start_mbytes) = samples[0]
    (end_time, end_mbytes) = samples[-1]

    elapsed = float(end_time) - start_time
    if elapsed <= 0:
        # Two reports with the same timestamp would otherwise cause a
        # division by zero
        return None

    return (float(end_mbytes) - start_mbytes) / elapsed
579