1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Classes and functions for import/export daemon.
23
24 """
25
26 import os
27 import re
28 import socket
29 import logging
30 import signal
31 import errno
32 import time
33 from cStringIO import StringIO
34
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import netutils
39 from ganeti import compat
40
41
42
43
44
# Matches the line socat logs once it is listening; captures the numeric
# address family, the address and the port
LISTENING_RE = re.compile(
    r"^listening on\s+AF=(?P<family>\d+)\s+(?P<address>.+):(?P<port>\d+)$",
    re.IGNORECASE)

# Matches the line socat logs when it enters its data transfer loop
TRANSFER_LOOP_RE = re.compile(
    r"^starting data transfer loop with FDs\s+.*$", re.IGNORECASE)
52
# Single-letter severity markers compared against the level field of a
# socat log line
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

# Severities whose messages are only forwarded when debugging is enabled
SOCAT_LOG_IGNORE = compat.UniqueFrozenset(
    [SOCAT_LOG_DEBUG, SOCAT_LOG_INFO, SOCAT_LOG_NOTICE])
65
66
67 DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
68 r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
69
70
71 DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
72
73
74
75 DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
76
77
# Block size in bytes, handed to socat ("-b") and to dd ("bs=")
BUFSIZE = 1024 * 1024

# Address options added to every socat OpenSSL address built in this module
SOCAT_TCP_OPTS = [
    "keepalive",
    "keepidle=60",
    "keepintvl=10",
    "keepcnt=5",
]
SOCAT_OPENSSL_OPTS = [
    "verify=1",
    "method=TLSv1",
    "cipher=%s" % constants.OPENSSL_CIPHERS,
]

if constants.SOCAT_USE_COMPRESS:
    # Only passed when the build-time constant says socat knows the option
    SOCAT_OPENSSL_OPTS += ["compress=none"]

# Longest single socat address option accepted when building the command
SOCAT_OPTION_MAXLEN = 400
91
# Identifiers for the sources of child output handled by the line splitters
PROG_OTHER, PROG_SOCAT, PROG_DD, PROG_DD_PID, PROG_EXP_SIZE = range(1, 6)

# Every known output source
PROG_ALL = compat.UniqueFrozenset(
    [PROG_OTHER, PROG_SOCAT, PROG_DD, PROG_DD_PID, PROG_EXP_SIZE])
def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Initializes this class.

    Stores the mode, the options and the file descriptors used by the
    command-building methods.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._opts = opts
    self._mode = mode

    # File descriptors referenced by the generated shell redirections
    (self._socat_stderr_fd, self._dd_stderr_fd, self._dd_pid_fd) = \
        (socat_stderr_fd, dd_stderr_fd, dd_pid_fd)

    # A magic value, if given, must have the expected format
    magic = self._opts.magic
    assert magic is None or constants.IE_MAGIC_RE.match(magic)
129
@staticmethod
def GetBashCommand(cmd):
    """Prepares a command to be run in Bash.

    @param cmd: Shell command line handed to Bash via "-c"
    @rtype: list
    @return: Argument list invoking Bash with the "errexit" and "pipefail"
      options enabled

    """
    bash_argv = ["bash"]

    for shell_opt in ("errexit", "pipefail"):
        bash_argv.extend(["-o", shell_opt])

    bash_argv.extend(["-c", cmd])

    return bash_argv
136
def _GetSocatCommand(self):
    """Returns the socat command.

    In import mode socat listens on an OpenSSL socket and writes to stdout;
    in export mode it reads from stdin and connects to the remote OpenSSL
    socket.

    @rtype: list
    @return: socat executable followed by its arguments
    @raise errors.GenericError: If the mode is unknown, or an address option
      is longer than L{SOCAT_OPTION_MAXLEN} or contains a comma

    """
    opts = self._opts
    mode = self._mode

    # Options shared by the listening and the connecting address
    shared_opts = list(SOCAT_TCP_OPTS)
    shared_opts.extend(SOCAT_OPENSSL_OPTS)
    shared_opts.extend([
        "key=%s" % opts.key,
        "cert=%s" % opts.cert,
        "cafile=%s" % opts.ca,
    ])

    if opts.bind is not None:
        shared_opts.append("bind=%s" % opts.bind)

    assert not (opts.ipv4 and opts.ipv6)

    if opts.ipv4:
        shared_opts.append("pf=ipv4")
    elif opts.ipv6:
        shared_opts.append("pf=ipv6")

    if mode == constants.IEM_IMPORT:
        # Without an explicit port, 0 is passed as the listening port
        listen_port = opts.port
        if listen_port is None:
            listen_port = 0

        addr1 = [
            "OPENSSL-LISTEN:%s" % listen_port,
            "reuseaddr",
            "forever",
            "intervall=0.01",
        ]
        addr1.extend(shared_opts)
        addr2 = ["stdout"]

    elif mode == constants.IEM_EXPORT:
        # Valid IPv6 addresses are wrapped in brackets
        target = opts.host
        if target and netutils.IP6Address.IsValid(target):
            target = "[%s]" % target

        addr1 = ["stdin"]
        addr2 = [
            "OPENSSL:%s:%s" % (target, opts.port),
            "connect-timeout=%s" % opts.connect_timeout,
            "retry=%s" % opts.connect_retries,
            "intervall=1",
        ]
        addr2.extend(shared_opts)

    else:
        raise errors.GenericError("Invalid mode '%s'" % mode)

    # Options are joined with commas below, so none may contain one
    for option in addr1 + addr2:
        if len(option) > SOCAT_OPTION_MAXLEN:
            raise errors.GenericError("Socat option longer than %s"
                                      " characters: %r" %
                                      (SOCAT_OPTION_MAXLEN, option))
        if "," in option:
            raise errors.GenericError("Comma not allowed in socat option"
                                      " value: %r" % option)

    socat_argv = [constants.SOCAT_PATH]
    # NOTE(review): flag meanings per socat(1): "-ls" logs to stderr, "-d -d"
    # raises verbosity, "-b" sets the block size, "-u" is unidirectional mode
    socat_argv.extend(["-ls", "-d", "-d", "-b%s" % BUFSIZE, "-u"])
    socat_argv.extend([",".join(addr1), ",".join(addr2)])

    return socat_argv
223
def _GetMagicCommand(self):
    """Returns the command to read/write the magic value.

    @rtype: string or None
    @return: Shell fragment verifying (import) or emitting (export) the magic
      value; None if no magic value was configured
    @raise errors.GenericError: If the mode is neither import nor export

    """
    if not self._opts.magic:
        return None

    # The configured value is sent and expected with a "M=" prefix
    magic = "M=%s" % self._opts.magic

    if self._mode == constants.IEM_IMPORT:
        # Read exactly as many characters as the magic is long, then compare
        pieces = [
            "{ ",
            utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]),
            " && ",
            "if test \"$magic\" != %s; then" % utils.ShellQuote(magic),
            " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
            " exit 1;",
            "fi;",
            " }",
        ]
        return "".join(pieces)

    if self._mode == constants.IEM_EXPORT:
        return utils.ShellQuoteArgs(["echo", "-E", "-n", magic])

    raise errors.GenericError("Invalid mode '%s'" % self._mode)
253
def _GetDdCommand(self):
    """Returns the command for measuring throughput.

    dd is started in the background with its stderr on the dd stderr file
    descriptor, its PID is written to the PID file descriptor and the shell
    then waits for it. If a magic command exists, it is chained in front
    with "&&".

    @rtype: string
    @return: Shell fragment running dd(1)

    """
    magic_cmd = self._GetMagicCommand()

    # LC_ALL=C keeps dd's statistics in the format the parsing regexes expect.
    # NOTE(review): the explicit "<&0" presumably keeps the pipeline's stdin
    # attached to the backgrounded dd -- confirm against bash's job handling.
    dd_group = "".join([
        "{ ",
        "LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
        (BUFSIZE, self._dd_stderr_fd),
        " echo $pid >&%d;" % self._dd_pid_fd,
        " wait $pid;",
        " }",
    ])

    if not magic_cmd:
        return dd_group

    return "{ " + magic_cmd + " && " + dd_group + " }"
282
def _GetTransportCommand(self):
    """Returns the command for the transport part of the daemon.

    Builds a shell pipeline: socat, optional gunzip, dd for import; dd,
    optional gzip, socat for export.

    @rtype: list
    @return: Bash invocation running the pipeline
    @raise errors.GenericError: If the mode is neither import nor export

    """
    # socat's stderr goes to its dedicated file descriptor
    socat_cmd = "%s 2>&%d" % (utils.ShellQuoteArgs(self._GetSocatCommand()),
                              self._socat_stderr_fd)
    dd_cmd = self._GetDdCommand()

    compr = self._opts.compress

    assert compr in constants.IEC_ALL

    if self._mode == constants.IEM_IMPORT:
        pipeline = [socat_cmd]
        if compr == constants.IEC_GZIP:
            pipeline.append("gunzip -c")
        pipeline.append(dd_cmd)

    elif self._mode == constants.IEM_EXPORT:
        pipeline = [dd_cmd]
        if compr == constants.IEC_GZIP:
            pipeline.append("gzip -c")
        pipeline.append(socat_cmd)

    else:
        raise errors.GenericError("Invalid mode '%s'" % self._mode)

    return self.GetBashCommand(" | ".join(pipeline))
321
323 """Returns the complete child process command.
324
325 """
326 transport_cmd = self._GetTransportCommand()
327
328 buf = StringIO()
329
330 if self._opts.cmd_prefix:
331 buf.write(self._opts.cmd_prefix)
332 buf.write(" ")
333
334 buf.write(utils.ShellQuoteArgs(transport_cmd))
335
336 if self._opts.cmd_suffix:
337 buf.write(" ")
338 buf.write(self._opts.cmd_suffix)
339
340 return self.GetBashCommand(buf.getvalue())
341
344 """Verify address given as listening address by socat.
345
346 """
347 if family not in (socket.AF_INET, socket.AF_INET6):
348 raise errors.GenericError("Address family %r not supported" % family)
349
350 if (family == socket.AF_INET6 and address.startswith("[") and
351 address.endswith("]")):
352 address = address.lstrip("[").rstrip("]")
353
354 try:
355 packed_address = socket.inet_pton(family, address)
356 except socket.error:
357 raise errors.GenericError("Invalid address %r for family %s" %
358 (address, family))
359
360 return (socket.inet_ntop(family, packed_address), port)
361
def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Initializes this class.

    @param debug: Whether otherwise ignored child output is forwarded too
    @param status_file: Status file object receiving output and progress
    @param logger: Logger that forwarded child output is written to
    @param throughput_samples: Number of dd progress samples kept for the
      throughput calculation
    @param exp_size: Expected transfer size used for percent/ETA, or None

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # One line splitter per output source, each calling back with its source
    splitters = {}
    for prog in PROG_ALL:
        splitters[prog] = utils.LineSplitter(self._ProcessOutput, prog)
    self._splitter = splitters

    # State for tracking dd(1)
    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    self._exp_size = exp_size
382
384 """Returns the line splitter for a program.
385
386 """
387 return self._splitter[prog]
388
390 """Flushes all line splitters.
391
392 """
393 for ls in self._splitter.itervalues():
394 ls.flush()
395
397 """Closes all line splitters.
398
399 """
400 for ls in self._splitter.itervalues():
401 ls.close()
402 self._splitter.clear()
403
405 """Tells dd(1) to write statistics.
406
407 """
408 if self._dd_pid is None:
409
410 return False
411
412 if not self._dd_ready:
413
414
415
416
417 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
418 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
419 return False
420
421 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
422 self._dd_ready = True
423
424 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
425 try:
426 os.kill(self._dd_pid, DD_INFO_SIGNAL)
427 except EnvironmentError, err:
428 if err.errno != errno.ESRCH:
429 raise
430
431
432 logging.debug("dd exited")
433 self._dd_pid = None
434
435 return True
436
438 """Takes care of child process output.
439
440 @type line: string
441 @param line: Child output line
442 @type prog: number
443 @param prog: Program from which the line originates
444
445 """
446 force_update = False
447 forward_line = line
448
449 if prog == PROG_SOCAT:
450 level = None
451 parts = line.split(None, 4)
452
453 if len(parts) == 5:
454 (_, _, _, level, msg) = parts
455
456 force_update = self._ProcessSocatOutput(self._status_file, level, msg)
457
458 if self._debug or (level and level not in SOCAT_LOG_IGNORE):
459 forward_line = "socat: %s %s" % (level, msg)
460 else:
461 forward_line = None
462 else:
463 forward_line = "socat: %s" % line
464
465 elif prog == PROG_DD:
466 (should_forward, force_update) = self._ProcessDdOutput(line)
467
468 if should_forward or self._debug:
469 forward_line = "dd: %s" % line
470 else:
471 forward_line = None
472
473 elif prog == PROG_DD_PID:
474 if self._dd_pid:
475 raise RuntimeError("dd PID reported more than once")
476 logging.debug("Received dd PID %r", line)
477 self._dd_pid = int(line)
478 forward_line = None
479
480 elif prog == PROG_EXP_SIZE:
481 logging.debug("Received predicted size %r", line)
482 forward_line = None
483
484 if line:
485 try:
486 exp_size = utils.BytesToMebibyte(int(line))
487 except (ValueError, TypeError), err:
488 logging.error("Failed to convert predicted size %r to number: %s",
489 line, err)
490 exp_size = None
491 else:
492 exp_size = None
493
494 self._exp_size = exp_size
495
496 if forward_line:
497 self._logger.info(forward_line)
498 self._status_file.AddRecentOutput(forward_line)
499
500 self._status_file.Update(force_update)
501
502 @staticmethod
527
def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @type line: string
    @param line: Line written by dd(1) to its stderr
    @rtype: tuple
    @return: (whether the line should be forwarded, whether a status file
      update should be forced)

    """
    info = DD_INFO_RE.match(line)
    if info:
        # Statistics line: record the progress and force a status update
        elapsed = float(info.group("seconds"))
        transferred = utils.BytesToMebibyte(int(info.group("bytes")))
        self._UpdateDdProgress(elapsed, transferred)
        return (False, True)

    if DD_STDERR_IGNORE.match(line):
        # "records in/out" lines are dropped
        return (False, False)

    # Anything else is forwarded
    return (True, False)
546
def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    Records the sample, trims the history, and reports the transferred
    amount, throughput, percentage and ETA to the status file.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    history = self._dd_progress

    # Record the new sample and keep only the most recent ones.
    # NOTE(review): with a sample count of 0 the slice below is "[:0]" and
    # nothing is trimmed.
    history.append((seconds, mbytes))
    del history[:-self._dd_tp_samples]

    throughput = _CalcThroughput(history)

    percent = None
    eta = None

    # Percentage and ETA need an expected size
    total = self._exp_size
    if total is not None:
        if total != 0:
            percent = max(0, min(100, (100.0 * mbytes) / total))

        if throughput:
            eta = max(0, float(total - mbytes) / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
577
def _CalcThroughput(samples):
    """Calculates the throughput in MiB/second.

    The throughput is computed between the first and the last sample.

    @type samples: sequence
    @param samples: List of samples, each consisting of a (timestamp, mbytes)
      tuple
    @rtype: float or None
    @return: Throughput in MiB/second; None if there are fewer than two
      samples or no time elapsed between the first and the last one

    """
    if len(samples) < 2:
        # A single sample gives no rate
        return None

    (start_time, start_mbytes) = samples[0]
    (end_time, end_mbytes) = samples[-1]

    elapsed = float(end_time) - start_time
    if elapsed <= 0:
        # Identical (or out-of-order) timestamps would divide by zero or give
        # a meaningless rate
        return None

    return (float(end_mbytes) - start_mbytes) / elapsed
597