1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Classes and functions for import/export daemon.
23
24 """
25
26 import os
27 import re
28 import socket
29 import logging
30 import signal
31 import errno
32 import time
33 from cStringIO import StringIO
34
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import utils
38
39
40
41
42
43 LISTENING_RE = re.compile(r"^listening on\s+"
44 r"AF=(?P<family>\d+)\s+"
45 r"(?P<address>.+):(?P<port>\d+)$", re.I)
46
47
48 TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
49 re.I)
50
51 SOCAT_LOG_DEBUG = "D"
52 SOCAT_LOG_INFO = "I"
53 SOCAT_LOG_NOTICE = "N"
54 SOCAT_LOG_WARNING = "W"
55 SOCAT_LOG_ERROR = "E"
56 SOCAT_LOG_FATAL = "F"
57
58 SOCAT_LOG_IGNORE = frozenset([
59 SOCAT_LOG_DEBUG,
60 SOCAT_LOG_INFO,
61 SOCAT_LOG_NOTICE,
62 ])
63
64
65 DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
66 r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
67
68
69 DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
70
71
72
73 DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)
74
75
76 BUFSIZE = 1024 * 1024
77
78
79 SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
80 SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
81 "cipher=%s" % constants.OPENSSL_CIPHERS]
82
83 SOCAT_OPTION_MAXLEN = 400
84
85 (PROG_OTHER,
86 PROG_SOCAT,
87 PROG_DD,
88 PROG_DD_PID,
89 PROG_EXP_SIZE) = range(1, 6)
90 PROG_ALL = frozenset([
91 PROG_OTHER,
92 PROG_SOCAT,
93 PROG_DD,
94 PROG_DD_PID,
95 PROG_EXP_SIZE,
96 ])
100 - def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
101 """Initializes this class.
102
103 @param mode: Daemon mode (import or export)
104 @param opts: Options object
105 @type socat_stderr_fd: int
106 @param socat_stderr_fd: File descriptor socat should write its stderr to
107 @type dd_stderr_fd: int
108 @param dd_stderr_fd: File descriptor dd should write its stderr to
109 @type dd_pid_fd: int
110 @param dd_pid_fd: File descriptor the child should write dd's PID to
111
112 """
113 self._opts = opts
114 self._mode = mode
115 self._socat_stderr_fd = socat_stderr_fd
116 self._dd_stderr_fd = dd_stderr_fd
117 self._dd_pid_fd = dd_pid_fd
118
119 assert (self._opts.magic is None or
120 constants.IE_MAGIC_RE.match(self._opts.magic))
121
122 @staticmethod
124 """Prepares a command to be run in Bash.
125
126 """
127 return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd]
128
130 """Returns the socat command.
131
132 """
133 common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [
134 "key=%s" % self._opts.key,
135 "cert=%s" % self._opts.cert,
136 "cafile=%s" % self._opts.ca,
137 ]
138
139 if self._opts.bind is not None:
140 common_addr_opts.append("bind=%s" % self._opts.bind)
141
142 if self._mode == constants.IEM_IMPORT:
143 if self._opts.port is None:
144 port = 0
145 else:
146 port = self._opts.port
147
148 addr1 = [
149 "OPENSSL-LISTEN:%s" % port,
150 "reuseaddr",
151
152
153
154 "forever",
155 "intervall=0.01",
156 ] + common_addr_opts
157 addr2 = ["stdout"]
158
159 elif self._mode == constants.IEM_EXPORT:
160 addr1 = ["stdin"]
161 addr2 = [
162 "OPENSSL:%s:%s" % (self._opts.host, self._opts.port),
163
164
165 "connect-timeout=%s" % self._opts.connect_timeout,
166
167
168 "retry=%s" % self._opts.connect_retries,
169 "intervall=1",
170 ] + common_addr_opts
171
172 else:
173 raise errors.GenericError("Invalid mode '%s'" % self._mode)
174
175 for i in [addr1, addr2]:
176 for value in i:
177 if len(value) > SOCAT_OPTION_MAXLEN:
178 raise errors.GenericError("Socat option longer than %s"
179 " characters: %r" %
180 (SOCAT_OPTION_MAXLEN, value))
181 if "," in value:
182 raise errors.GenericError("Comma not allowed in socat option"
183 " value: %r" % value)
184
185 return [
186 constants.SOCAT_PATH,
187
188
189 "-ls",
190
191
192 "-d", "-d",
193
194
195 "-b%s" % BUFSIZE,
196
197
198
199 "-u",
200
201 ",".join(addr1), ",".join(addr2)
202 ]
203
233
235 """Returns the command for measuring throughput.
236
237 """
238 dd_cmd = StringIO()
239
240 magic_cmd = self._GetMagicCommand()
241 if magic_cmd:
242 dd_cmd.write("{ ")
243 dd_cmd.write(magic_cmd)
244 dd_cmd.write(" && ")
245
246 dd_cmd.write("{ ")
247
248
249
250 dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
251 (BUFSIZE, self._dd_stderr_fd))
252
253 dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd)
254
255 dd_cmd.write(" wait $pid;")
256 dd_cmd.write(" }")
257
258 if magic_cmd:
259 dd_cmd.write(" }")
260
261 return dd_cmd.getvalue()
262
301
303 """Returns the complete child process command.
304
305 """
306 transport_cmd = self._GetTransportCommand()
307
308 buf = StringIO()
309
310 if self._opts.cmd_prefix:
311 buf.write(self._opts.cmd_prefix)
312 buf.write(" ")
313
314 buf.write(utils.ShellQuoteArgs(transport_cmd))
315
316 if self._opts.cmd_suffix:
317 buf.write(" ")
318 buf.write(self._opts.cmd_suffix)
319
320 return self.GetBashCommand(buf.getvalue())
321
324 """Verify address given as listening address by socat.
325
326 """
327
328 if family != socket.AF_INET:
329 raise errors.GenericError("Address family %r not supported" % family)
330
331 try:
332 packed_address = socket.inet_pton(family, address)
333 except socket.error:
334 raise errors.GenericError("Invalid address %r for family %s" %
335 (address, family))
336
337 return (socket.inet_ntop(family, packed_address), port)
338
341 - def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
342 """Initializes this class.
343
344 """
345 self._debug = debug
346 self._status_file = status_file
347 self._logger = logger
348
349 self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog))
350 for prog in PROG_ALL])
351
352 self._dd_pid = None
353 self._dd_ready = False
354 self._dd_tp_samples = throughput_samples
355 self._dd_progress = []
356
357
358 self._exp_size = exp_size
359
361 """Returns the line splitter for a program.
362
363 """
364 return self._splitter[prog]
365
367 """Flushes all line splitters.
368
369 """
370 for ls in self._splitter.itervalues():
371 ls.flush()
372
374 """Closes all line splitters.
375
376 """
377 for ls in self._splitter.itervalues():
378 ls.close()
379 self._splitter.clear()
380
382 """Tells dd(1) to write statistics.
383
384 """
385 if self._dd_pid is None:
386
387 return False
388
389 if not self._dd_ready:
390
391
392
393
394 if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL):
395 logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
396 return False
397
398 logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
399 self._dd_ready = True
400
401 logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid)
402 try:
403 os.kill(self._dd_pid, DD_INFO_SIGNAL)
404 except EnvironmentError, err:
405 if err.errno != errno.ESRCH:
406 raise
407
408
409 logging.debug("dd exited")
410 self._dd_pid = None
411
412 return True
413
415 """Takes care of child process output.
416
417 @type line: string
418 @param line: Child output line
419 @type prog: number
420 @param prog: Program from which the line originates
421
422 """
423 force_update = False
424 forward_line = line
425
426 if prog == PROG_SOCAT:
427 level = None
428 parts = line.split(None, 4)
429
430 if len(parts) == 5:
431 (_, _, _, level, msg) = parts
432
433 force_update = self._ProcessSocatOutput(self._status_file, level, msg)
434
435 if self._debug or (level and level not in SOCAT_LOG_IGNORE):
436 forward_line = "socat: %s %s" % (level, msg)
437 else:
438 forward_line = None
439 else:
440 forward_line = "socat: %s" % line
441
442 elif prog == PROG_DD:
443 (should_forward, force_update) = self._ProcessDdOutput(line)
444
445 if should_forward or self._debug:
446 forward_line = "dd: %s" % line
447 else:
448 forward_line = None
449
450 elif prog == PROG_DD_PID:
451 if self._dd_pid:
452 raise RuntimeError("dd PID reported more than once")
453 logging.debug("Received dd PID %r", line)
454 self._dd_pid = int(line)
455 forward_line = None
456
457 elif prog == PROG_EXP_SIZE:
458 logging.debug("Received predicted size %r", line)
459 forward_line = None
460
461 if line:
462 try:
463 exp_size = utils.BytesToMebibyte(int(line))
464 except (ValueError, TypeError), err:
465 logging.error("Failed to convert predicted size %r to number: %s",
466 line, err)
467 exp_size = None
468 else:
469 exp_size = None
470
471 self._exp_size = exp_size
472
473 if forward_line:
474 self._logger.info(forward_line)
475 self._status_file.AddRecentOutput(forward_line)
476
477 self._status_file.Update(force_update)
478
479 @staticmethod
504
506 """Interprets a line of dd(1)'s output.
507
508 """
509 m = DD_INFO_RE.match(line)
510 if m:
511 seconds = float(m.group("seconds"))
512 mbytes = utils.BytesToMebibyte(int(m.group("bytes")))
513 self._UpdateDdProgress(seconds, mbytes)
514 return (False, True)
515
516 m = DD_STDERR_IGNORE.match(line)
517 if m:
518
519 return (False, False)
520
521
522 return (True, False)
523
525 """Updates the internal status variables for dd(1) progress.
526
527 @type seconds: float
528 @param seconds: Timestamp of this update
529 @type mbytes: float
530 @param mbytes: Total number of MiB transferred so far
531
532 """
533
534 self._dd_progress.append((seconds, mbytes))
535
536
537 del self._dd_progress[:-self._dd_tp_samples]
538
539
540 throughput = _CalcThroughput(self._dd_progress)
541
542
543 percent = None
544 eta = None
545
546 if self._exp_size is not None:
547 if self._exp_size != 0:
548 percent = max(0, min(100, (100.0 * mbytes) / self._exp_size))
549
550 if throughput:
551 eta = max(0, float(self._exp_size - mbytes) / throughput)
552
553 self._status_file.SetProgress(mbytes, throughput, percent, eta)
554
557 """Calculates the throughput in MiB/second.
558
559 @type samples: sequence
560 @param samples: List of samples, each consisting of a (timestamp, mbytes)
561 tuple
562 @rtype: float or None
563 @return: Throughput in MiB/second
564
565 """
566 if len(samples) < 2:
567
568 return None
569
570 (start_time, start_mbytes) = samples[0]
571 (end_time, end_mbytes) = samples[-1]
572
573 return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time)
574