1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Classes and functions for import/export daemon.
32
33 """
34
35 import os
36 import re
37 import socket
38 import logging
39 import signal
40 import errno
41 import time
42 from cStringIO import StringIO
43
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import utils
47 from ganeti import netutils
48 from ganeti import compat
49
50
51
52
53
54 LISTENING_RE = re.compile(r"^listening on\s+"
55 r"AF=(?P<family>\d+)\s+"
56 r"(?P<address>.+):(?P<port>\d+)$", re.I)
57
58
59 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
60 re.I)
61
62 SOCAT_LOG_DEBUG = "D"
63 SOCAT_LOG_INFO = "I"
64 SOCAT_LOG_NOTICE = "N"
65 SOCAT_LOG_WARNING = "W"
66 SOCAT_LOG_ERROR = "E"
67 SOCAT_LOG_FATAL = "F"
68
69 SOCAT_LOG_IGNORE = compat.UniqueFrozenset([
70 SOCAT_LOG_DEBUG,
71 SOCAT_LOG_INFO,
72 SOCAT_LOG_NOTICE,
73 ])
74
75
76 DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
77 r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
78
79
80 DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
81
82
83
84 DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
85
86
87 BUFSIZE = 1024 * 1024
88
89
90 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
91 SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
92 "cipher=%s" % constants.OPENSSL_CIPHERS]
93
94 if constants.SOCAT_USE_COMPRESS:
95
96
97 SOCAT_OPENSSL_OPTS.append("compress=none")
98
99 SOCAT_OPTION_MAXLEN = 400
100
101 (PROG_OTHER,
102 PROG_SOCAT,
103 PROG_DD,
104 PROG_DD_PID,
105 PROG_EXP_SIZE) = range(1, 6)
106
107 PROG_ALL = compat.UniqueFrozenset([
108 PROG_OTHER,
109 PROG_SOCAT,
110 PROG_DD,
111 PROG_DD_PID,
112 PROG_EXP_SIZE,
113 ])
117 - def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
118 """Initializes this class.
119
120 @param mode: Daemon mode (import or export)
121 @param opts: Options object
122 @type socat_stderr_fd: int
123 @param socat_stderr_fd: File descriptor socat should write its stderr to
124 @type dd_stderr_fd: int
125 @param dd_stderr_fd: File descriptor dd should write its stderr to
126 @type dd_pid_fd: int
127 @param dd_pid_fd: File descriptor the child should write dd's PID to
128
129 """
130 self._opts = opts
131 self._mode = mode
132 self._socat_stderr_fd = socat_stderr_fd
133 self._dd_stderr_fd = dd_stderr_fd
134 self._dd_pid_fd = dd_pid_fd
135
136 assert (self._opts.magic is None or
137 constants.IE_MAGIC_RE.match(self._opts.magic))
138
139 @staticmethod
141 """Prepares a command to be run in Bash.
142
143 """
144 return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
145
147 """Returns the socat command.
148
149 """
150 common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
151 "key=%s" % self._opts.key,
152 "cert=%s" % self._opts.cert,
153 "cafile=%s" % self._opts.ca,
154 ]
155
156 if self._opts.bind is not None:
157 common_addr_opts.append("bind=%s" % self._opts.bind)
158
159 assert not (self._opts.ipv4 and self._opts.ipv6)
160
161 if self._opts.ipv4:
162 common_addr_opts.append("pf=ipv4")
163 elif self._opts.ipv6:
164 common_addr_opts.append("pf=ipv6")
165
166 if self._mode == constants.IEM_IMPORT:
167 if self._opts.port is None:
168 port = 0
169 else:
170 port = self._opts.port
171
172 addr1 = [
173 "OPENSSL-LISTEN:%s" % port,
174 "reuseaddr",
175
176
177
178 "forever",
179 "intervall=0.01",
180 ] + common_addr_opts
181 addr2 = ["stdout"]
182
183 elif self._mode == constants.IEM_EXPORT:
184 if self._opts.host and netutils.IP6Address.IsValid(self._opts.host):
185 host = "[%s]" % self._opts.host
186 else:
187 host = self._opts.host
188
189 addr1 = ["stdin"]
190 addr2 = [
191 "OPENSSL:%s:%s" % (host, self._opts.port),
192
193
194 "connect-timeout=%s" % self._opts.connect_timeout,
195
196
197 "retry=%s" % self._opts.connect_retries,
198 "intervall=1",
199 ] + common_addr_opts
200
201 else:
202 raise errors.GenericError("Invalid mode '%s'" % self._mode)
203
204 for i in [addr1, addr2]:
205 for value in i:
206 if len(value) > SOCAT_OPTION_MAXLEN:
207 raise errors.GenericError("Socat option longer than %s"
208 " characters: %r" %
209 (SOCAT_OPTION_MAXLEN, value))
210 if "," in value:
211 raise errors.GenericError("Comma not allowed in socat option"
212 " value: %r" % value)
213
214 return [
215 constants.SOCAT_PATH,
216
217
218 "-ls",
219
220
221 "-d", "-d",
222
223
224 "-b%s" % BUFSIZE,
225
226
227
228 "-u",
229
230 ",".join(addr1), ",".join(addr2),
231 ]
232
234 """Returns the command to read/write the magic value.
235
236 """
237 if not self._opts.magic:
238 return None
239
240
241 magic = "M=%s" % self._opts.magic
242
243 cmd = StringIO()
244
245 if self._mode == constants.IEM_IMPORT:
246 cmd.write("{ ")
247 cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]))
248 cmd.write(" && ")
249 cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
250 cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch"))
251 cmd.write(" exit 1;")
252 cmd.write("fi;")
253 cmd.write(" }")
254
255 elif self._mode == constants.IEM_EXPORT:
256 cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic]))
257
258 else:
259 raise errors.GenericError("Invalid mode '%s'" % self._mode)
260
261 return cmd.getvalue()
262
264 """Returns the command for measuring throughput.
265
266 """
267 dd_cmd = StringIO()
268
269 magic_cmd = self._GetMagicCommand()
270 if magic_cmd:
271 dd_cmd.write("{ ")
272 dd_cmd.write(magic_cmd)
273 dd_cmd.write(" && ")
274
275 dd_cmd.write("{ ")
276
277
278
279 dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
280 (BUFSIZE, self._dd_stderr_fd))
281
282 dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
283
284 dd_cmd.write(" wait $pid;")
285 dd_cmd.write(" }")
286
287 if magic_cmd:
288 dd_cmd.write(" }")
289
290 return dd_cmd.getvalue()
291
343
345 """Returns the complete child process command.
346
347 """
348 transport_cmd = self._GetTransportCommand()
349
350 buf = StringIO()
351
352 if self._opts.cmd_prefix:
353 buf.write(self._opts.cmd_prefix)
354 buf.write(" ")
355
356 buf.write(utils.ShellQuoteArgs(transport_cmd))
357
358 if self._opts.cmd_suffix:
359 buf.write(" ")
360 buf.write(self._opts.cmd_suffix)
361
362 return self.GetBashCommand(buf.getvalue())
363
366 """Verify address given as listening address by socat.
367
368 """
369 if family not in (socket.AF_INET, socket.AF_INET6):
370 raise errors.GenericError("Address family %r not supported" % family)
371
372 if (family == socket.AF_INET6 and address.startswith("[") and
373 address.endswith("]")):
374 address = address.lstrip("[").rstrip("]")
375
376 try:
377 packed_address = socket.inet_pton(family, address)
378 except socket.error:
379 raise errors.GenericError("Invalid address %r for family %s" %
380 (address, family))
381
382 return (socket.inet_ntop(family, packed_address), port)
383
386 - def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
387 """Initializes this class.
388
389 """
390 self._debug = debug
391 self._status_file = status_file
392 self._logger = logger
393
394 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
395 for prog in PROG_ALL])
396
397 self._dd_pid = None
398 self._dd_ready = False
399 self._dd_tp_samples = throughput_samples
400 self._dd_progress = []
401
402
403 self._exp_size = exp_size
404
406 """Returns the line splitter for a program.
407
408 """
409 return self._splitter[prog]
410
412 """Flushes all line splitters.
413
414 """
415 for ls in self._splitter.itervalues():
416 ls.flush()
417
419 """Closes all line splitters.
420
421 """
422 for ls in self._splitter.itervalues():
423 ls.close()
424 self._splitter.clear()
425
427 """Tells dd(1) to write statistics.
428
429 """
430 if self._dd_pid is None:
431
432 return False
433
434 if not self._dd_ready:
435
436
437
438
439 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
440 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
441 return False
442
443 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
444 self._dd_ready = True
445
446 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
447 try:
448 os.kill(self._dd_pid, DD_INFO_SIGNAL)
449 except EnvironmentError, err:
450 if err.errno != errno.ESRCH:
451 raise
452
453
454 logging.debug("dd exited")
455 self._dd_pid = None
456
457 return True
458
460 """Takes care of child process output.
461
462 @type line: string
463 @param line: Child output line
464 @type prog: number
465 @param prog: Program from which the line originates
466
467 """
468 force_update = False
469 forward_line = line
470
471 if prog == PROG_SOCAT:
472 level = None
473 parts = line.split(None, 4)
474
475 if len(parts) == 5:
476 (_, _, _, level, msg) = parts
477
478 force_update = self._ProcessSocatOutput(self._status_file, level, msg)
479
480 if self._debug or (level and level not in SOCAT_LOG_IGNORE):
481 forward_line = "socat: %s %s" % (level, msg)
482 else:
483 forward_line = None
484 else:
485 forward_line = "socat: %s" % line
486
487 elif prog == PROG_DD:
488 (should_forward, force_update) = self._ProcessDdOutput(line)
489
490 if should_forward or self._debug:
491 forward_line = "dd: %s" % line
492 else:
493 forward_line = None
494
495 elif prog == PROG_DD_PID:
496 if self._dd_pid:
497 raise RuntimeError("dd PID reported more than once")
498 logging.debug("Received dd PID %r", line)
499 self._dd_pid = int(line)
500 forward_line = None
501
502 elif prog == PROG_EXP_SIZE:
503 logging.debug("Received predicted size %r", line)
504 forward_line = None
505
506 if line:
507 try:
508 exp_size = utils.BytesToMebibyte(int(line))
509 except (ValueError, TypeError), err:
510 logging.error("Failed to convert predicted size %r to number: %s",
511 line, err)
512 exp_size = None
513 else:
514 exp_size = None
515
516 self._exp_size = exp_size
517
518 if forward_line:
519 self._logger.info(forward_line)
520 self._status_file.AddRecentOutput(forward_line)
521
522 self._status_file.Update(force_update)
523
524 @staticmethod
549
551 """Interprets a line of dd(1)'s output.
552
553 """
554 m = DD_INFO_RE.match(line)
555 if m:
556 seconds = float(m.group("seconds"))
557 mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
558 self._UpdateDdProgress(seconds, mbytes)
559 return (False, True)
560
561 m = DD_STDERR_IGNORE.match(line)
562 if m:
563
564 return (False, False)
565
566
567 return (True, False)
568
570 """Updates the internal status variables for dd(1) progress.
571
572 @type seconds: float
573 @param seconds: Timestamp of this update
574 @type mbytes: float
575 @param mbytes: Total number of MiB transferred so far
576
577 """
578
579 self._dd_progress.append((seconds, mbytes))
580
581
582 del self._dd_progress[:-self._dd_tp_samples]
583
584
585 throughput = _CalcThroughput(self._dd_progress)
586
587
588 percent = None
589 eta = None
590
591 if self._exp_size is not None:
592 if self._exp_size != 0:
593 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
594
595 if throughput:
596 eta = max(0, float(self._exp_size - mbytes) / throughput)
597
598 self._status_file.SetProgress(mbytes, throughput, percent, eta)
599
602 """Calculates the throughput in MiB/second.
603
604 @type samples: sequence
605 @param samples: List of samples, each consisting of a (timestamp, mbytes)
606 tuple
607 @rtype: float or None
608 @return: Throughput in MiB/second
609
610 """
611 if len(samples) < 2:
612
613 return None
614
615 (start_time, start_mbytes) = samples[0]
616 (end_time, end_mbytes) = samples[-1]
617
618 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
619