1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Logical units dealing with backup operations."""
32
33 import OpenSSL
34 import logging
35
36 from ganeti import compat
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import locking
40 from ganeti import masterd
41 from ganeti import utils
42 from ganeti.utils import retry
43
44 from ganeti.cmdlib.base import NoHooksLU, LogicalUnit
45 from ganeti.cmdlib.common import CheckNodeOnline, ExpandNodeUuidAndName, \
46 IsInstanceRunning, DetermineImageSize
47 from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
48 ShutdownInstanceDisks, TemporaryDisk, ImageDisks
49 from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
50 BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance, \
51 CheckCompressionTool
52
53
55 """Prepares an instance for an export and returns useful information.
56
57 """
58 REQ_BGL = False
59
62
73
def Exec(self, feedback_fn):
  """Prepares an instance for an export.

  For remote exports, a fresh X509 key/certificate pair is created on the
  instance's primary node and the handshake information needed by the
  destination cluster is returned; local exports need no preparation.

  """
  # Local exports require no preparation at all
  if self.op.mode != constants.EXPORT_MODE_REMOTE:
    return None

  salt = utils.GenerateSecret(8)
  primary_uuid = self.instance.primary_node

  feedback_fn("Generating X509 certificate on %s" %
              self.cfg.GetNodeName(primary_uuid))
  result = self.rpc.call_x509_cert_create(primary_uuid,
                                          constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" %
               self.cfg.GetNodeName(result.node))

  (key_name, cert_pem) = result.payload
  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                         cert_pem)

  # The key name and CA are signed with the cluster domain secret so the
  # destination can verify they originate from this cluster
  return {
    "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
    "x509_key_name": (key_name,
                      utils.Sha1Hmac(self._cds, key_name, salt=salt),
                      salt),
    "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
    }
101
102
104 """Export an instance to an image in the cluster.
105
106 """
107 HPATH = "instance-export"
108 HTYPE = constants.HTYPE_INSTANCE
109 REQ_BGL = False
110
def CheckArguments(self):
  """Check the arguments.

  Validates the combination of export mode, encryption parameters and
  zeroing options before any locks are taken.

  @raise errors.OpPrereqError: if a remote export lacks encryption
    parameters, or the zeroing options are inconsistent

  """
  self.x509_key_name = self.op.x509_key_name
  self.dest_x509_ca_pem = self.op.destination_x509_ca

  if self.op.mode == constants.EXPORT_MODE_REMOTE:
    if not self.x509_key_name:
      raise errors.OpPrereqError("Missing X509 key name for encryption",
                                 errors.ECODE_INVAL)

    if not self.dest_x509_ca_pem:
      raise errors.OpPrereqError("Missing destination X509 CA",
                                 errors.ECODE_INVAL)

  # FIX: the two raises below were missing the error-code argument that
  # every other OpPrereqError in this LU passes
  if self.op.zero_free_space and not self.op.compress:
    # Zeroing only pays off if the zeroed space is compressed away later
    raise errors.OpPrereqError("Zeroing free space does not make sense "
                               "unless compression is used",
                               errors.ECODE_INVAL)

  if self.op.zero_free_space and not self.op.shutdown:
    # Zeroing boots a helper image in place of the instance
    raise errors.OpPrereqError("Unless the instance is shut down, zeroing "
                               "cannot be used.", errors.ECODE_INVAL)
164
166 """Last minute lock declaration."""
167
168
def BuildHooksEnv(self):
  """Build hooks env.

  This will run on the master, primary node and target node.

  """
  # Export-specific variables first ...
  hook_env = {
    "EXPORT_MODE": self.op.mode,
    "EXPORT_NODE": self.op.target_node,
    "EXPORT_DO_SHUTDOWN": self.op.shutdown,
    "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
    "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
    }

  # ... then the generic per-instance variables
  instance_env = BuildInstanceHookEnvByObject(
    self, self.instance,
    secondary_nodes=self.secondary_nodes, disks=self.inst_disks)
  hook_env.update(instance_env)

  return hook_env
189
200
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance and node names are valid.

  @raise errors.OpPrereqError: on invalid destination info, disk types
    unsuitable for export, or inconsistent zeroing parameters

  """
  self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
  assert self.instance is not None, \
        "Cannot retrieve locked instance %s" % self.op.instance_name
  CheckNodeOnline(self, self.instance.primary_node)

  if (self.op.remove_instance and
      self.instance.admin_state == constants.ADMINST_UP and
      not self.op.shutdown):
    raise errors.OpPrereqError("Can not remove instance without shutting it"
                               " down before", errors.ECODE_STATE)

  if self.op.mode == constants.EXPORT_MODE_LOCAL:
    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
    assert self.dst_node is not None

    CheckNodeOnline(self, self.dst_node.uuid)
    CheckNodeNotDrained(self, self.dst_node.uuid)

    self._cds = None
    self.dest_disk_info = None
    self.dest_x509_ca = None

  elif self.op.mode == constants.EXPORT_MODE_REMOTE:
    self.dst_node = None

    # Remote mode carries per-disk destination info in op.target_node
    if len(self.op.target_node) != len(self.instance.disks):
      raise errors.OpPrereqError(("Received destination information for %s"
                                  " disks, but instance %s has %s disks") %
                                 (len(self.op.target_node),
                                  self.op.instance_name,
                                  len(self.instance.disks)),
                                 errors.ECODE_INVAL)

    cds = GetClusterDomainSecret()

    # Check X509 key name; FIX: "except E, err" is Python-2-only syntax,
    # "as" is valid on all supported interpreters
    try:
      (key_name, hmac_digest, hmac_salt) = self.x509_key_name
    except (TypeError, ValueError) as err:
      raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                 errors.ECODE_INVAL)

    if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
      raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                 errors.ECODE_INVAL)

    # Load and verify the destination CA
    try:
      (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
    except OpenSSL.crypto.Error as err:
      raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                 (err, ), errors.ECODE_INVAL)

    (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
    if errcode is not None:
      raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                 (msg, ), errors.ECODE_INVAL)

    self.dest_x509_ca = cert

    # Verify the per-disk (host, port, magic) target information
    disk_info = []
    for idx, disk_data in enumerate(self.op.target_node):
      try:
        (host, port, magic) = \
          masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
      except errors.GenericError as err:
        raise errors.OpPrereqError("Target info for disk %s: %s" %
                                   (idx, err), errors.ECODE_INVAL)

      disk_info.append((host, port, magic))

    assert len(disk_info) == len(self.op.target_node)
    self.dest_disk_info = disk_info

  else:
    raise errors.ProgrammerError("Unhandled export mode %r" %
                                 self.op.mode)

  # Exports snapshot disks, which is not possible for file-based templates
  for disk in self.cfg.GetInstanceDisks(self.instance.uuid):
    if disk.dev_type in constants.DTS_FILEBASED:
      raise errors.OpPrereqError("Export not supported for instances with"
                                 " file-based disks", errors.ECODE_INVAL)

  # Zeroing prerequisites; FIX: the raises below were missing the
  # error-code argument used everywhere else in this method
  if self.op.zero_free_space:
    # Zeroing relies on noticing the helper VM shutting itself down
    hvparams = self.cfg.GetClusterInfo().FillHV(self.instance)
    if self.instance.hypervisor == constants.HT_KVM and \
       not hvparams.get(constants.HV_KVM_USER_SHUTDOWN, False):
      raise errors.OpPrereqError("Instance shutdown detection must be "
                                 "enabled for zeroing to work",
                                 errors.ECODE_INVAL)

    # The zeroing image is booted from disk in place of the instance OS
    if constants.HV_BOOT_ORDER in hvparams and \
       hvparams[constants.HV_BOOT_ORDER] != constants.HT_BO_DISK:
      raise errors.OpPrereqError("Booting from disk must be set for zeroing "
                                 "to work", errors.ECODE_INVAL)

    if not self.cfg.GetZeroingImage():
      raise errors.OpPrereqError("A zeroing image must be set for zeroing to"
                                 " work", errors.ECODE_INVAL)

    if self.op.zeroing_timeout_fixed is None:
      self.op.zeroing_timeout_fixed = constants.HELPER_VM_STARTUP

    if self.op.zeroing_timeout_per_mib is None:
      self.op.zeroing_timeout_per_mib = constants.ZEROING_TIMEOUT_PER_MIB

  else:
    if (self.op.zeroing_timeout_fixed is not None or
        self.op.zeroing_timeout_per_mib is not None):
      # FIX: message previously read "can only be used only with"
      raise errors.OpPrereqError("Zeroing timeout options can only be used"
                                 " with the --zero-free-space option",
                                 errors.ECODE_INVAL)

  self.secondary_nodes = \
    self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
  self.inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)

  # Check that the requested compression tool is whitelisted on the cluster
  CheckCompressionTool(self, self.op.compress)
331
def _CleanupExports(self, feedback_fn):
  """Removes exports of current instance from all other nodes.

  If an instance in a cluster with nodes A..D was exported to node C, its
  exports will be removed from the nodes A, B and D.

  """
  assert self.op.mode != constants.EXPORT_MODE_REMOTE

  # Every node except the export destination may hold a stale export
  other_node_uuids = self.cfg.GetNodeList()
  other_node_uuids.remove(self.dst_node.uuid)

  if not other_node_uuids:
    return

  instance_name = self.instance.name
  feedback_fn("Removing old exports for instance %s" % instance_name)
  exportlist = self.rpc.call_export_list(other_node_uuids)
  for node_uuid in exportlist:
    node_result = exportlist[node_uuid]
    if node_result.fail_msg:
      # Best-effort cleanup: skip nodes that could not be queried
      continue
    if instance_name in node_result.payload:
      remove_msg = self.rpc.call_export_remove(node_uuid,
                                               instance_name).fail_msg
      if remove_msg:
        self.LogWarning("Could not remove older export for instance %s"
                        " on node %s: %s", instance_name,
                        self.cfg.GetNodeName(node_uuid), remove_msg)
360
def _InstanceDiskSizeSum(self):
  """Calculates the size of all the disks of the instance used in this LU.

  @rtype: int
  @return: Size of the disks in MiB

  """
  # Generator expression avoids materializing an intermediate list
  return sum(d.size for d in self.cfg.GetInstanceDisks(self.instance.uuid))
370
def ZeroFreeSpace(self, feedback_fn):
  """Zeroes the free space on a shutdown instance.

  Boots a helper "zeroing" image on a temporary disk attached to the
  instance, waits for it to come up, and then waits for it to shut itself
  down once the instance disks have been zeroed.

  @type feedback_fn: function
  @param feedback_fn: Function used to log progress

  @raise errors.OpExecError: if the temporary disk cannot be sized or the
    helper image fails to boot

  """
  assert self.op.zeroing_timeout_fixed is not None
  assert self.op.zeroing_timeout_per_mib is not None

  zeroing_image = self.cfg.GetZeroingImage()
  src_node_uuid = self.instance.primary_node

  try:
    disk_size = DetermineImageSize(self, zeroing_image, src_node_uuid)
  except errors.OpExecError as err:
    # FIX: the original passed ("... %s", err) to the constructor, so the
    # message was never interpolated; format it explicitly
    raise errors.OpExecError("Could not create temporary disk for zeroing:"
                             " %s" % err)

  # Sum computed before the temporary disk is added, so it only covers the
  # disks that are actually zeroed
  instance_disks_size_sum = self._InstanceDiskSizeSum()

  with TemporaryDisk(self,
                     self.instance,
                     [(constants.DT_PLAIN, constants.DISK_RDWR, disk_size)],
                     feedback_fn):
    feedback_fn("Activating instance disks")
    StartInstanceDisks(self, self.instance, False)

    feedback_fn("Imaging disk with zeroing image")
    ImageDisks(self, self.instance, zeroing_image)

    feedback_fn("Starting instance with zeroing image")
    result = self.rpc.call_instance_start(src_node_uuid,
                                          (self.instance, [], []),
                                          False, self.op.reason)
    result.Raise("Could not start instance %s when using the zeroing image "
                 "%s" % (self.instance.name, zeroing_image))

    # First wait for the helper image to come up at all
    running_check = lambda: IsInstanceRunning(self, self.instance,
                                              prereq=False)
    instance_up = retry.SimpleRetry(True, running_check, 5.0,
                                    self.op.shutdown_timeout)
    if not instance_up:
      raise errors.OpExecError("Could not boot instance when using the "
                               "zeroing image %s" % zeroing_image)

    feedback_fn("Instance is up, now awaiting shutdown")

    # Then wait for zeroing to complete, signalled by the helper shutting
    # itself down; the timeout scales with the total disk size
    timeout = self.op.zeroing_timeout_fixed + \
              self.op.zeroing_timeout_per_mib * instance_disks_size_sum
    instance_up = retry.SimpleRetry(False, running_check, 20.0, timeout)
    if instance_up:
      # FIX: message previously concatenated to "...instance willbe..."
      self.LogWarning("Zeroing not completed prior to timeout; instance"
                      " will be shut down forcibly")

  feedback_fn("Zeroing completed!")
430
def Exec(self, feedback_fn):
  """Export an instance to an image in the cluster.

  Optionally shuts down and zeroes the instance, snapshots its disks,
  exports them locally or remotely, and finally removes the instance
  and/or old exports if requested.

  """
  assert self.op.mode in constants.EXPORT_MODES

  src_node_uuid = self.instance.primary_node

  if self.op.shutdown:
    # Shut the instance down so the snapshots are consistent
    feedback_fn("Shutting down instance %s" % self.instance.name)
    result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
                                             self.op.shutdown_timeout,
                                             self.op.reason)

    result.Raise("Could not shutdown instance %s on"
                 " node %s" % (self.instance.name,
                               self.cfg.GetNodeName(src_node_uuid)))

  if self.op.zero_free_space:
    self.ZeroFreeSpace(feedback_fn)

  # Remember whether we activated the disks, so we deactivate them again
  activate_disks = not self.instance.disks_active

  if activate_disks:
    # Activate the instance disks if we're exporting a stopped instance
    feedback_fn("Activating disks for %s" % self.instance.name)
    StartInstanceDisks(self, self.instance, None)
    self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)

  try:
    helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                   self.instance)

    helper.CreateSnapshots()
    try:
      if (self.op.shutdown and
          self.instance.admin_state == constants.ADMINST_UP and
          not self.op.remove_instance):
        # Snapshots taken; the instance can be restarted while the export
        # proceeds from the snapshots
        assert self.instance.disks_active
        feedback_fn("Starting instance %s" % self.instance.name)
        result = self.rpc.call_instance_start(src_node_uuid,
                                              (self.instance, None, None),
                                              False, self.op.reason)
        msg = result.fail_msg
        if msg:
          feedback_fn("Failed to start instance: %s" % msg)
          ShutdownInstanceDisks(self, self.instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

      if self.op.mode == constants.EXPORT_MODE_LOCAL:
        (fin_resu, dresults) = helper.LocalExport(self.dst_node,
                                                  self.op.compress)
      elif self.op.mode == constants.EXPORT_MODE_REMOTE:
        connect_timeout = constants.RIE_CONNECT_TIMEOUT
        timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

        (key_name, _, _) = self.x509_key_name

        dest_ca_pem = \
          OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                          self.dest_x509_ca)

        (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                   key_name, dest_ca_pem,
                                                   self.op.compress,
                                                   timeouts)
    finally:
      helper.Cleanup()

    # Sanity-check the per-disk results before interpreting them below
    assert len(dresults) == len(self.instance.disks)
    assert compat.all(isinstance(i, bool) for i in dresults), \
           "Not all results are boolean: %r" % dresults

  finally:
    if activate_disks:
      feedback_fn("Deactivating disks for %s" % self.instance.name)
      ShutdownInstanceDisks(self, self.instance)

  if not (compat.all(dresults) and fin_resu):
    failures = []
    if not fin_resu:
      failures.append("export finalization")
    if not compat.all(dresults):
      fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                             if not dsk)
      failures.append("disk export: disk(s) %s" % fdsk)

    raise errors.OpExecError("Export failed, errors in %s" %
                             utils.CommaJoin(failures))

  # At this point, the export was successful; optionally remove the
  # instance and clean up stale exports on other nodes
  if self.op.remove_instance:
    feedback_fn("Removing instance %s" % self.instance.name)
    RemoveInstance(self, feedback_fn, self.instance,
                   self.op.ignore_remove_failures)

  if self.op.mode == constants.EXPORT_MODE_LOCAL:
    self._CleanupExports(feedback_fn)

  return fin_resu, dresults
535
536
538 """Remove exports related to the named instance.
539
540 """
541 REQ_BGL = False
542
557
def Exec(self, feedback_fn):
  """Remove any export.

  """
  (_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)

  # If the instance name cannot be expanded (e.g. the instance was already
  # deleted), fall back to the name as given and warn about FQDNs later
  fqdn_warn = not inst_name
  if fqdn_warn:
    inst_name = self.op.instance_name

  locked_nodes = self.owned_locks(locking.LEVEL_NODE)
  exportlist = self.rpc.call_export_list(locked_nodes)

  export_found = False
  for node_uuid in exportlist:
    node_result = exportlist[node_uuid]
    query_msg = node_result.fail_msg
    if query_msg:
      self.LogWarning("Failed to query node %s (continuing): %s",
                      self.cfg.GetNodeName(node_uuid), query_msg)
      continue

    if inst_name not in node_result.payload:
      continue

    export_found = True
    remove_msg = self.rpc.call_export_remove(node_uuid, inst_name).fail_msg
    if remove_msg:
      logging.error("Could not remove export for instance %s"
                    " on node %s: %s", inst_name,
                    self.cfg.GetNodeName(node_uuid), remove_msg)

  if fqdn_warn and not export_found:
    feedback_fn("Export not found. If trying to remove an export belonging"
                " to a deleted instance please use its Fully Qualified"
                " Domain Name.")
592