1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Classes and functions for import/export daemon.
32
33 """
34
35 import os
36 import re
37 import socket
38 import logging
39 import signal
40 import errno
41 import time
42 from cStringIO import StringIO
43
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import utils
47 from ganeti import netutils
48 from ganeti import compat
49
50
51
52
53
# Pattern for a "listening on AF=<family> <address>:<port>" message; the
# address family, address and port are available as named groups
LISTENING_RE = re.compile(
  r"^listening on\s+"
  r"AF=(?P<family>\d+)\s+"
  r"(?P<address>.+):(?P<port>\d+)$",
  re.IGNORECASE)

# Pattern for a "starting data transfer loop with FDs ..." message
TRANSFER_LOOP_RE = re.compile(
  r"^starting data transfer loop with FDs\s+.*$",
  re.IGNORECASE)
61
# Single-letter severity codes compared against the level field of socat log
# lines
(SOCAT_LOG_DEBUG,
 SOCAT_LOG_INFO,
 SOCAT_LOG_NOTICE,
 SOCAT_LOG_WARNING,
 SOCAT_LOG_ERROR,
 SOCAT_LOG_FATAL) = ("D", "I", "N", "W", "E", "F")

# Severities whose messages are only forwarded when debugging is enabled
SOCAT_LOG_IGNORE = compat.UniqueFrozenset([
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ])
74
75
# Pattern for the transfer summary printed by dd(1), for example
# "1048576 bytes (1.0 MB) copied, 0.5 s, 2.1 MB/s". The "bytes" group holds the
# byte count and the "seconds" group the elapsed time. dd may print very short
# or very long durations in exponent notation ("4.5e-05 s", "1e+06 s"), so an
# optional exponent is accepted; the captured text can be passed to float()
# unchanged in either notation.
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
                        r"(?P<seconds>[\d.]+(?:e[+-]?\d+)?)"
                        r"\s*s(?:|econds),.*$", re.I)

# Pattern for dd(1)'s "N+M records in" / "N+M records out" counter lines,
# which are recognised only so that they can be dropped
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
81
82
83
# Signal sent to dd(1) to make it write its statistics: SIGINFO where the
# signal module provides it, SIGUSR1 otherwise
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

# One MiB; used as dd's block size and as the value of socat's "-b" option
BUFSIZE = 1 << 20
88
89
# TCP keepalive options added to every socat network address
SOCAT_TCP_OPTS = [
  "keepalive",
  "keepidle=60",
  "keepintvl=10",
  "keepcnt=5",
  ]

# OpenSSL options added to every socat network address
SOCAT_OPENSSL_OPTS = [
  "verify=1",
  "method=TLSv1",
  "cipher=%s" % constants.OPENSSL_CIPHERS,
  ]

# "compress=none" is only passed when the build-time constant allows it
# NOTE(review): presumably because not every socat build understands the
# "compress" option - confirm against the installation notes
if constants.SOCAT_USE_COMPRESS:
  SOCAT_OPENSSL_OPTS.append("compress=none")

# Longest single option accepted when a socat address is assembled
SOCAT_OPTION_MAXLEN = 400

# Identifiers for the sources of child output; each one gets its own line
# splitter
PROG_OTHER, PROG_SOCAT, PROG_DD, PROG_DD_PID, PROG_EXP_SIZE = range(1, 6)

PROG_ALL = compat.UniqueFrozenset([
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  PROG_EXP_SIZE,
  ])
def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
  """Stores the settings needed to build the child command line.

  @param mode: Daemon mode (import or export)
  @param opts: Options object
  @type socat_stderr_fd: int
  @param socat_stderr_fd: File descriptor socat should write its stderr to
  @type dd_stderr_fd: int
  @param dd_stderr_fd: File descriptor dd should write its stderr to
  @type dd_pid_fd: int
  @param dd_pid_fd: File descriptor the child should write dd's PID to

  """
  (self._opts, self._mode) = (opts, mode)
  (self._socat_stderr_fd,
   self._dd_stderr_fd,
   self._dd_pid_fd) = (socat_stderr_fd, dd_stderr_fd, dd_pid_fd)

  # The magic value is optional; if one is given it must match the pattern
  magic = self._opts.magic
  assert magic is None or constants.IE_MAGIC_RE.match(magic)
138
@staticmethod
def GetBashCommand(cmd):
  """Wraps a shell command line in a Bash invocation.

  @type cmd: string
  @param cmd: Command line handed to Bash through C{-c}
  @rtype: list
  @return: Argument list running C{cmd} with the C{errexit} and C{pipefail}
    shell options switched on

  """
  shell_options = ["-o", "errexit", "-o", "pipefail"]
  return ["bash"] + shell_options + ["-c", cmd]
145
def _GetSocatCommand(self):
  """Builds the socat invocation for the configured mode.

  In import mode the first address is an C{OPENSSL-LISTEN} endpoint and the
  second one is stdout; in export mode the first address is stdin and the
  second one an C{OPENSSL} connection to the configured host and port.

  @rtype: list
  @return: Argument list starting with the socat binary
  @raise errors.GenericError: If the mode is unknown, or if an address option
    is longer than L{SOCAT_OPTION_MAXLEN} characters or contains a comma

  """
  opts = self._opts

  # Options shared by the listening and the connecting address; the sum is a
  # new list, hence the module-level option lists are left untouched
  shared_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS
  shared_opts.extend([
    "key=%s" % opts.key,
    "cert=%s" % opts.cert,
    "cafile=%s" % opts.ca,
    ])

  if opts.bind is not None:
    shared_opts.append("bind=%s" % opts.bind)

  # At most one address family may be forced
  assert not opts.ipv4 or not opts.ipv6

  if opts.ipv4:
    shared_opts.append("pf=ipv4")
  elif opts.ipv6:
    shared_opts.append("pf=ipv6")

  mode = self._mode

  if mode == constants.IEM_IMPORT:
    # Without a configured port, port 0 is passed to socat
    port = opts.port
    if port is None:
      port = 0

    first_addr = [
      "OPENSSL-LISTEN:%s" % port,
      "reuseaddr",
      "forever",
      "intervall=0.01",
      ] + shared_opts
    second_addr = ["stdout"]

  elif mode == constants.IEM_EXPORT:
    # IPv6 addresses are wrapped in brackets inside the socat address
    host = opts.host
    if host and netutils.IP6Address.IsValid(host):
      host = "[%s]" % host

    first_addr = ["stdin"]
    second_addr = [
      "OPENSSL:%s:%s" % (host, opts.port),
      "connect-timeout=%s" % opts.connect_timeout,
      "retry=%s" % opts.connect_retries,
      "intervall=1",
      ] + shared_opts

  else:
    raise errors.GenericError("Invalid mode '%s'" % mode)

  # Options are joined with commas below, so none of them may contain one
  for option in first_addr + second_addr:
    if len(option) > SOCAT_OPTION_MAXLEN:
      raise errors.GenericError("Socat option longer than %s"
                                " characters: %r" %
                                (SOCAT_OPTION_MAXLEN, option))
    if "," in option:
      raise errors.GenericError("Comma not allowed in socat option"
                                " value: %r" % option)

  # See socat(1) for the meaning of the "-ls", "-d", "-b" and "-u" switches
  socat_switches = ["-ls", "-d", "-d", "-b%s" % BUFSIZE, "-u"]

  return ([constants.SOCAT_PATH] + socat_switches +
          [",".join(first_addr), ",".join(second_addr)])
232
def _GetMagicCommand(self):
  """Builds the shell snippet that writes or checks the magic value.

  @rtype: string or None
  @return: None if no magic value is configured; in export mode a command
    echoing the value, in import mode a command group reading as many
    characters as the value is long and exiting with status 1 on a mismatch
  @raise errors.GenericError: If the mode is unknown

  """
  if not self._opts.magic:
    return None

  # The value is always sent and compared with an "M=" prefix
  # NOTE(review): presumably so that it can never be taken for an option of
  # "echo" - confirm
  magic = "M=%s" % self._opts.magic
  mode = self._mode

  if mode == constants.IEM_EXPORT:
    return utils.ShellQuoteArgs(["echo", "-E", "-n", magic])

  if mode != constants.IEM_IMPORT:
    raise errors.GenericError("Invalid mode '%s'" % mode)

  pieces = [
    "{ ",
    utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"]),
    " && ",
    "if test \"$magic\" != %s; then" % utils.ShellQuote(magic),
    " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
    " exit 1;",
    "fi;",
    " }",
    ]

  return "".join(pieces)
262
def _GetDdCommand(self):
  """Builds the shell snippet running dd(1) for measuring throughput.

  dd is started in the background with its stderr redirected to the dd
  stderr file descriptor; its PID is written to the dd PID file descriptor
  and the shell then waits for it. If a magic command exists, it runs first
  and dd is only started when it succeeded.

  @rtype: string
  @return: Shell command

  """
  magic_cmd = self._GetMagicCommand()

  pieces = []

  if magic_cmd:
    pieces.extend(["{ ", magic_cmd, " && "])

  pieces.append("{ ")
  # NOTE(review): LC_ALL=C presumably keeps dd's statistics in the format the
  # output parser expects, and "<&0" hands the pipeline's stdin to the
  # background job explicitly - confirm
  pieces.append("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
                (BUFSIZE, self._dd_stderr_fd))
  # Report the PID of the background job
  pieces.append(" echo $pid >&%d;" % self._dd_pid_fd)
  # Wait for dd to finish
  pieces.append(" wait $pid;")
  pieces.append(" }")

  if magic_cmd:
    pieces.append(" }")

  return "".join(pieces)
291
def _GetTransportCommand(self):
  """Builds the pipeline moving data between socat and dd.

  Import mode pipes socat into dd, export mode pipes dd into socat. With
  gzip compression, C{gunzip -c} (import) or C{gzip -c} (export) is placed
  between the two.

  @rtype: list
  @return: Bash invocation (see L{GetBashCommand}) running the pipeline
  @raise errors.GenericError: If the mode is unknown

  """
  socat_cmd = ("%s 2>&%d" %
               (utils.ShellQuoteArgs(self._GetSocatCommand()),
                self._socat_stderr_fd))
  dd_cmd = self._GetDdCommand()

  compr = self._opts.compress

  assert compr in constants.IEC_ALL

  if self._mode == constants.IEM_IMPORT:
    pipeline = [socat_cmd, dd_cmd]
    gzip_filter = "gunzip -c"
  elif self._mode == constants.IEM_EXPORT:
    pipeline = [dd_cmd, socat_cmd]
    gzip_filter = "gzip -c"
  else:
    raise errors.GenericError("Invalid mode '%s'" % self._mode)

  if compr == constants.IEC_GZIP:
    # The (de)compressor sits between the two programs
    pipeline.insert(1, gzip_filter)

  return self.GetBashCommand(" | ".join(pipeline))
330
# NOTE(review): the "def" line is missing from the reviewed excerpt; the name
# "GetCommand" is reconstructed - confirm against the callers
def GetCommand(self):
  """Builds the complete command line of the child process.

  The quoted transport command is surrounded by the optional command prefix
  and suffix taken from the options, separated by single spaces, and the
  result is wrapped in a Bash invocation.

  @rtype: list
  @return: Bash invocation (see L{GetBashCommand})

  """
  transport_cmd = self._GetTransportCommand()

  pieces = []

  prefix = self._opts.cmd_prefix
  if prefix:
    pieces.extend([prefix, " "])

  pieces.append(utils.ShellQuoteArgs(transport_cmd))

  suffix = self._opts.cmd_suffix
  if suffix:
    pieces.extend([" ", suffix])

  return self.GetBashCommand("".join(pieces))
350
# NOTE(review): the "def" line is missing from the reviewed excerpt; name and
# parameter order are reconstructed from the body - confirm against callers
def _VerifyListening(family, address, port):
  """Verify address given as listening address by socat.

  @type family: int
  @param family: Address family, C{socket.AF_INET} or C{socket.AF_INET6}
  @type address: string
  @param address: Address in text form; an IPv6 address may be enclosed in
    one pair of square brackets
  @param port: Port; returned unchanged
  @rtype: tuple
  @return: Tuple of the address as formatted by C{socket.inet_ntop} and the
    port
  @raise errors.GenericError: If the address family is not supported or the
    address isn't valid for the family

  """
  if family not in (socket.AF_INET, socket.AF_INET6):
    raise errors.GenericError("Address family %r not supported" % family)

  if (family == socket.AF_INET6 and address.startswith("[") and
      address.endswith("]")):
    # Remove exactly one pair of brackets; stripping every leading "[" and
    # trailing "]" would let malformed input such as "[[::1]]" pass
    address = address[1:-1]

  try:
    packed_address = socket.inet_pton(family, address)
  except socket.error:
    raise errors.GenericError("Invalid address %r for family %s" %
                              (address, family))

  return (socket.inet_ntop(family, packed_address), port)
370
def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
  """Sets up the state used while processing child output.

  @param debug: Whether ignorable child output is forwarded as well
  @param status_file: Status file object receiving output and progress
  @param logger: Logger the forwarded lines are written to
  @param throughput_samples: Number of dd progress samples that are kept
  @param exp_size: Expected size of the transferred data, None if unknown

  """
  self._debug = debug
  self._status_file = status_file
  self._logger = logger

  # One line splitter per program; each passes its complete lines together
  # with the program identifier to _ProcessOutput
  splitters = {}
  for prog in PROG_ALL:
    splitters[prog] = utils.LineSplitter(self._ProcessOutput, prog)
  self._splitter = splitters

  # State of dd(1): its PID once reported, whether it was found ready for
  # the statistics signal, and the most recent progress samples
  self._dd_pid = None
  self._dd_ready = False
  self._dd_tp_samples = throughput_samples
  self._dd_progress = []

  # Expected size of the transferred data
  self._exp_size = exp_size
391
# NOTE(review): the "def" line is missing from the reviewed excerpt; name and
# signature are reconstructed from the body - confirm against callers
def GetLineSplitter(self, prog):
  """Looks up the line splitter belonging to a program.

  @param prog: Program identifier (one of the C{PROG_*} constants)
  @return: Line splitter registered for C{prog}
  @raise KeyError: If no splitter is registered for C{prog}

  """
  splitters = self._splitter
  return splitters[prog]
397
# NOTE(review): the "def" line is missing from the reviewed excerpt; the name
# "FlushAll" is reconstructed - confirm against callers
def FlushAll(self):
  """Calls C{flush} on every registered line splitter.

  """
  for splitter in self._splitter.values():
    splitter.flush()
404
# NOTE(review): the "def" line is missing from the reviewed excerpt; the name
# "CloseAll" is reconstructed - confirm against callers
def CloseAll(self):
  """Closes every registered line splitter and forgets all of them.

  """
  splitters = self._splitter
  for splitter in splitters.values():
    splitter.close()
  splitters.clear()
412
# NOTE(review): the "def" line is missing from the reviewed excerpt; the name
# "NotifyDd" is reconstructed - confirm against callers
def NotifyDd(self):
  """Tells dd(1) to write statistics.

  @rtype: bool
  @return: False if dd's PID is not known yet or dd is not yet reported as
    handling L{DD_INFO_SIGNAL}; True once sending the signal was attempted,
    including the case where dd turned out to have exited
  @raise EnvironmentError: If sending the signal fails for a reason other
    than the process no longer existing

  """
  if self._dd_pid is None:
    # No PID known, nothing to notify
    return False

  if not self._dd_ready:
    # Only signal dd once it is reported as handling the signal
    # NOTE(review): presumably because a signal arriving before dd installed
    # its handler would trigger the default action - confirm
    if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
      logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
      return False

    logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
    self._dd_ready = True

  logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
  try:
    os.kill(self._dd_pid, DD_INFO_SIGNAL)
  except EnvironmentError as err:
    if err.errno != errno.ESRCH:
      raise

    # ESRCH: the process no longer exists, forget its PID
    logging.debug("dd exited")
    self._dd_pid = None

  return True
445
def _ProcessOutput(self, line, prog):
  """Takes care of child process output.

  Depending on the originating program the line is interpreted (socat log
  message, dd statistics, dd's PID or the predicted transfer size), forwarded
  to the logger and the status file if appropriate, and the status file is
  updated afterwards.

  @type line: string
  @param line: Child output line
  @type prog: number
  @param prog: Program from which the line originates
  @raise RuntimeError: If dd's PID is reported a second time

  """
  force_update = False
  # Lines of programs not handled below are forwarded unchanged
  forward_line = line

  if prog == PROG_SOCAT:
    parts = line.split(None, 4)

    if len(parts) == 5:
      # The fourth field is the severity, the remainder the message
      (_, _, _, level, msg) = parts

      force_update = self._ProcessSocatOutput(self._status_file, level, msg)

      if self._debug or (level and level not in SOCAT_LOG_IGNORE):
        forward_line = "socat: %s %s" % (level, msg)
      else:
        forward_line = None
    else:
      # Not in the expected log format, forward as is
      forward_line = "socat: %s" % line

  elif prog == PROG_DD:
    (should_forward, force_update) = self._ProcessDdOutput(line)

    if should_forward or self._debug:
      forward_line = "dd: %s" % line
    else:
      forward_line = None

  elif prog == PROG_DD_PID:
    if self._dd_pid:
      raise RuntimeError("dd PID reported more than once")
    logging.debug("Received dd PID %r", line)
    self._dd_pid = int(line)
    forward_line = None

  elif prog == PROG_EXP_SIZE:
    logging.debug("Received predicted size %r", line)
    forward_line = None

    if line:
      try:
        exp_size = utils.BytesToMebibyte(int(line))
      except (ValueError, TypeError) as err:
        logging.error("Failed to convert predicted size %r to number: %s",
                      line, err)
        exp_size = None
    else:
      exp_size = None

    self._exp_size = exp_size

  if forward_line:
    self._logger.info(forward_line)
    self._status_file.AddRecentOutput(forward_line)

  self._status_file.Update(force_update)
510
511 @staticmethod
536
def _ProcessDdOutput(self, line):
  """Interprets a line of dd(1)'s output.

  @type line: string
  @param line: Line written by dd
  @rtype: tuple
  @return: Tuple of two booleans: whether the line should be forwarded and
    whether the status file update should be forced

  """
  info = DD_INFO_RE.match(line)
  if info is not None:
    # Statistics line: record the progress, don't forward the line itself
    seconds = float(info.group("seconds"))
    mbytes = utils.BytesToMebibyte(int(info.group("bytes")))
    self._UpdateDdProgress(seconds, mbytes)
    return (False, True)

  # Record counters are dropped, everything else is forwarded
  is_unknown = DD_STDERR_IGNORE.match(line) is None
  return (is_unknown, False)
555
def _UpdateDdProgress(self, seconds, mbytes):
  """Records a dd(1) progress sample and publishes the derived figures.

  @type seconds: float
  @param seconds: Timestamp of this update
  @type mbytes: float
  @param mbytes: Total number of MiB transferred so far

  """
  history = self._dd_progress

  # Add the new sample and drop all but the most recent ones
  history.append((seconds, mbytes))
  del history[:-self._dd_tp_samples]

  throughput = _CalcThroughput(history)

  # Percentage and ETA can only be given if the expected size is known
  percent = eta = None
  expected = self._exp_size

  if expected is not None:
    if expected != 0:
      done = (100.0 * mbytes) / expected
      percent = max(0, min(100, done))

    if throughput:
      remaining = float(expected - mbytes)
      eta = max(0, remaining / throughput)

  self._status_file.SetProgress(mbytes, throughput, percent, eta)
586
def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  Only the first and the last sample are used.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
    tuple
  @rtype: float or None
  @return: Throughput in MiB/second; None if there are fewer than two samples
    or if no time passed between the first and the last sample

  """
  if len(samples) < 2:
    # Can't calculate throughput from less than two samples
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  elapsed = float(end_time) - start_time
  if elapsed == 0:
    # Identical timestamps would cause a division by zero
    return None

  return (float(end_mbytes) - start_mbytes) / elapsed
606