1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Classes and functions for import/export daemon.
23
24 """
25
26 import os
27 import re
28 import socket
29 import logging
30 import signal
31 import errno
32 import time
33 from cStringIO import StringIO
34
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import netutils
39
40
41
42
43
44 LISTENING_RE = re.compile(r"^listening on\s+"
45 r"AF=(?P<family>\d+)\s+"
46 r"(?P<address>.+):(?P<port>\d+)$", re.I)
47
48
49 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
50 re.I)
51
52 SOCAT_LOG_DEBUG = "D"
53 SOCAT_LOG_INFO = "I"
54 SOCAT_LOG_NOTICE = "N"
55 SOCAT_LOG_WARNING = "W"
56 SOCAT_LOG_ERROR = "E"
57 SOCAT_LOG_FATAL = "F"
58
59 SOCAT_LOG_IGNORE = frozenset([
60 SOCAT_LOG_DEBUG,
61 SOCAT_LOG_INFO,
62 SOCAT_LOG_NOTICE,
63 ])
64
65
66 DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
67 r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
68
69
70 DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
71
72
73
74 DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
75
76
77 BUFSIZE = 1024 * 1024
78
79
80 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
81 SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
82 "cipher=%s" % constants.OPENSSL_CIPHERS]
83
84 if constants.SOCAT_USE_COMPRESS:
85
86
87 SOCAT_OPENSSL_OPTS.append("compress=none")
88
89 SOCAT_OPTION_MAXLEN = 400
90
91 (PROG_OTHER,
92 PROG_SOCAT,
93 PROG_DD,
94 PROG_DD_PID,
95 PROG_EXP_SIZE) = range(1, 6)
96 PROG_ALL = frozenset([
97 PROG_OTHER,
98 PROG_SOCAT,
99 PROG_DD,
100 PROG_DD_PID,
101 PROG_EXP_SIZE,
102 ])
106 - def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
107 """Initializes this class.
108
109 @param mode: Daemon mode (import or export)
110 @param opts: Options object
111 @type socat_stderr_fd: int
112 @param socat_stderr_fd: File descriptor socat should write its stderr to
113 @type dd_stderr_fd: int
114 @param dd_stderr_fd: File descriptor dd should write its stderr to
115 @type dd_pid_fd: int
116 @param dd_pid_fd: File descriptor the child should write dd's PID to
117
118 """
119 self._opts = opts
120 self._mode = mode
121 self._socat_stderr_fd = socat_stderr_fd
122 self._dd_stderr_fd = dd_stderr_fd
123 self._dd_pid_fd = dd_pid_fd
124
125 assert (self._opts.magic is None or
126 constants.IE_MAGIC_RE.match(self._opts.magic))
127
128 @staticmethod
130 """Prepares a command to be run in Bash.
131
132 """
133 return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
134
136 """Returns the socat command.
137
138 """
139 common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
140 "key=%s" % self._opts.key,
141 "cert=%s" % self._opts.cert,
142 "cafile=%s" % self._opts.ca,
143 ]
144
145 if self._opts.bind is not None:
146 common_addr_opts.append("bind=%s" % self._opts.bind)
147
148 assert not (self._opts.ipv4 and self._opts.ipv6)
149
150 if self._opts.ipv4:
151 common_addr_opts.append("pf=ipv4")
152 elif self._opts.ipv6:
153 common_addr_opts.append("pf=ipv6")
154
155 if self._mode == constants.IEM_IMPORT:
156 if self._opts.port is None:
157 port = 0
158 else:
159 port = self._opts.port
160
161 addr1 = [
162 "OPENSSL-LISTEN:%s" % port,
163 "reuseaddr",
164
165
166
167 "forever",
168 "intervall=0.01",
169 ] + common_addr_opts
170 addr2 = ["stdout"]
171
172 elif self._mode == constants.IEM_EXPORT:
173 if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
174 host = "[%s]" % self._opts.host
175 else:
176 host = self._opts.host
177
178 addr1 = ["stdin"]
179 addr2 = [
180 "OPENSSL:%s:%s" % (host, self._opts.port),
181
182
183 "connect-timeout=%s" % self._opts.connect_timeout,
184
185
186 "retry=%s" % self._opts.connect_retries,
187 "intervall=1",
188 ] + common_addr_opts
189
190 else:
191 raise errors.GenericError("Invalid mode '%s'" % self._mode)
192
193 for i in [addr1, addr2]:
194 for value in i:
195 if len(value) > SOCAT_OPTION_MAXLEN:
196 raise errors.GenericError("Socat option longer than %s"
197 " characters: %r" %
198 (SOCAT_OPTION_MAXLEN, value))
199 if "," in value:
200 raise errors.GenericError("Comma not allowed in socat option"
201 " value: %r" % value)
202
203 return [
204 constants.SOCAT_PATH,
205
206
207 "-ls",
208
209
210 "-d", "-d",
211
212
213 "-b%s" % BUFSIZE,
214
215
216
217 "-u",
218
219 ",".join(addr1), ",".join(addr2)
220 ]
221
251
253 """Returns the command for measuring throughput.
254
255 """
256 dd_cmd = StringIO()
257
258 magic_cmd = self._GetMagicCommand()
259 if magic_cmd:
260 dd_cmd.write("{ ")
261 dd_cmd.write(magic_cmd)
262 dd_cmd.write(" && ")
263
264 dd_cmd.write("{ ")
265
266
267
268 dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
269 (BUFSIZE, self._dd_stderr_fd))
270
271 dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
272
273 dd_cmd.write(" wait $pid;")
274 dd_cmd.write(" }")
275
276 if magic_cmd:
277 dd_cmd.write(" }")
278
279 return dd_cmd.getvalue()
280
319
321 """Returns the complete child process command.
322
323 """
324 transport_cmd = self._GetTransportCommand()
325
326 buf = StringIO()
327
328 if self._opts.cmd_prefix:
329 buf.write(self._opts.cmd_prefix)
330 buf.write(" ")
331
332 buf.write(utils.ShellQuoteArgs(transport_cmd))
333
334 if self._opts.cmd_suffix:
335 buf.write(" ")
336 buf.write(self._opts.cmd_suffix)
337
338 return self.GetBashCommand(buf.getvalue())
339
342 """Verify address given as listening address by socat.
343
344 """
345 if family not in (socket.AF_INET, socket.AF_INET6):
346 raise errors.GenericError("Address family %r not supported" % family)
347
348 if (family == socket.AF_INET6 and address.startswith("[") and
349 address.endswith("]")):
350 address = address.lstrip("[").rstrip("]")
351
352 try:
353 packed_address = socket.inet_pton(family, address)
354 except socket.error:
355 raise errors.GenericError("Invalid address %r for family %s" %
356 (address, family))
357
358 return (socket.inet_ntop(family, packed_address), port)
359
362 - def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
363 """Initializes this class.
364
365 """
366 self._debug = debug
367 self._status_file = status_file
368 self._logger = logger
369
370 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
371 for prog in PROG_ALL])
372
373 self._dd_pid = None
374 self._dd_ready = False
375 self._dd_tp_samples = throughput_samples
376 self._dd_progress = []
377
378
379 self._exp_size = exp_size
380
382 """Returns the line splitter for a program.
383
384 """
385 return self._splitter[prog]
386
388 """Flushes all line splitters.
389
390 """
391 for ls in self._splitter.itervalues():
392 ls.flush()
393
395 """Closes all line splitters.
396
397 """
398 for ls in self._splitter.itervalues():
399 ls.close()
400 self._splitter.clear()
401
403 """Tells dd(1) to write statistics.
404
405 """
406 if self._dd_pid is None:
407
408 return False
409
410 if not self._dd_ready:
411
412
413
414
415 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
416 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
417 return False
418
419 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
420 self._dd_ready = True
421
422 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
423 try:
424 os.kill(self._dd_pid, DD_INFO_SIGNAL)
425 except EnvironmentError, err:
426 if err.errno != errno.ESRCH:
427 raise
428
429
430 logging.debug("dd exited")
431 self._dd_pid = None
432
433 return True
434
436 """Takes care of child process output.
437
438 @type line: string
439 @param line: Child output line
440 @type prog: number
441 @param prog: Program from which the line originates
442
443 """
444 force_update = False
445 forward_line = line
446
447 if prog == PROG_SOCAT:
448 level = None
449 parts = line.split(None, 4)
450
451 if len(parts) == 5:
452 (_, _, _, level, msg) = parts
453
454 force_update = self._ProcessSocatOutput(self._status_file, level, msg)
455
456 if self._debug or (level and level not in SOCAT_LOG_IGNORE):
457 forward_line = "socat: %s %s" % (level, msg)
458 else:
459 forward_line = None
460 else:
461 forward_line = "socat: %s" % line
462
463 elif prog == PROG_DD:
464 (should_forward, force_update) = self._ProcessDdOutput(line)
465
466 if should_forward or self._debug:
467 forward_line = "dd: %s" % line
468 else:
469 forward_line = None
470
471 elif prog == PROG_DD_PID:
472 if self._dd_pid:
473 raise RuntimeError("dd PID reported more than once")
474 logging.debug("Received dd PID %r", line)
475 self._dd_pid = int(line)
476 forward_line = None
477
478 elif prog == PROG_EXP_SIZE:
479 logging.debug("Received predicted size %r", line)
480 forward_line = None
481
482 if line:
483 try:
484 exp_size = utils.BytesToMebibyte(int(line))
485 except (ValueError, TypeError), err:
486 logging.error("Failed to convert predicted size %r to number: %s",
487 line, err)
488 exp_size = None
489 else:
490 exp_size = None
491
492 self._exp_size = exp_size
493
494 if forward_line:
495 self._logger.info(forward_line)
496 self._status_file.AddRecentOutput(forward_line)
497
498 self._status_file.Update(force_update)
499
500 @staticmethod
525
527 """Interprets a line of dd(1)'s output.
528
529 """
530 m = DD_INFO_RE.match(line)
531 if m:
532 seconds = float(m.group("seconds"))
533 mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
534 self._UpdateDdProgress(seconds, mbytes)
535 return (False, True)
536
537 m = DD_STDERR_IGNORE.match(line)
538 if m:
539
540 return (False, False)
541
542
543 return (True, False)
544
546 """Updates the internal status variables for dd(1) progress.
547
548 @type seconds: float
549 @param seconds: Timestamp of this update
550 @type mbytes: float
551 @param mbytes: Total number of MiB transferred so far
552
553 """
554
555 self._dd_progress.append((seconds, mbytes))
556
557
558 del self._dd_progress[:-self._dd_tp_samples]
559
560
561 throughput = _CalcThroughput(self._dd_progress)
562
563
564 percent = None
565 eta = None
566
567 if self._exp_size is not None:
568 if self._exp_size != 0:
569 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
570
571 if throughput:
572 eta = max(0, float(self._exp_size - mbytes) / throughput)
573
574 self._status_file.SetProgress(mbytes, throughput, percent, eta)
575
578 """Calculates the throughput in MiB/second.
579
580 @type samples: sequence
581 @param samples: List of samples, each consisting of a (timestamp, mbytes)
582 tuple
583 @rtype: float or None
584 @return: Throughput in MiB/second
585
586 """
587 if len(samples) < 2:
588
589 return None
590
591 (start_time, start_mbytes) = samples[0]
592 (end_time, end_mbytes) = samples[-1]
593
594 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
595