1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 """Logical units dealing with backup operations."""
32
33 import OpenSSL
34 import logging
35
36 from ganeti import compat
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import locking
40 from ganeti import masterd
41 from ganeti import qlang
42 from ganeti import query
43 from ganeti import utils
44
45 from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
46 from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
47 ExpandNodeUuidAndName
48 from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
49 ShutdownInstanceDisks
50 from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
51 BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance
52
53
55 FIELDS = query.EXPORT_FIELDS
56
57
58 SORT_FIELD = "node"
59
61 lu.needed_locks = {}
62
63
64 if self.names:
65 (self.wanted, _) = GetWantedNodes(lu, self.names)
66 else:
67 self.wanted = locking.ALL_SET
68
69 self.do_locking = self.use_locking
70
71 if self.do_locking:
72 lu.share_locks = ShareAll()
73 lu.needed_locks = {
74 locking.LEVEL_NODE: self.wanted,
75 }
76
77 if not self.names:
78 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
79
82
84 """Computes the list of nodes and their attributes.
85
86 """
87
88
89 assert not (compat.any(lu.glm.is_owned(level)
90 for level in locking.LEVELS
91 if level != locking.LEVEL_CLUSTER) or
92 self.do_locking or self.use_locking)
93
94 node_uuids = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
95
96 result = []
97 for (node_uuid, nres) in lu.rpc.call_export_list(node_uuids).items():
98 node = lu.cfg.GetNodeInfo(node_uuid)
99 if nres.fail_msg:
100 result.append((node.name, None))
101 else:
102 result.extend((node.name, expname) for expname in nres.payload)
103
104 return result
105
106
108 """Query the exports list
109
110 """
111 REQ_BGL = False
112
116
119
122
def Exec(self, feedback_fn):
  """Collect the queried exports, grouped by node.

  Every row yielded by C{self.expq.OldStyleQuery} is a C{(node, export name)}
  pair.

  @param feedback_fn: feedback callback; not used by this method
  @rtype: dict
  @return: maps each node to the list of export names seen for it, or to
      C{False} when the row for that node carries C{None} instead of an
      export name

  """
  exports_by_node = {}

  for (node_name, export_name) in self.expq.OldStyleQuery(self):
    if export_name is not None:
      # Accumulate all exports reported for the same node in one list
      exports_by_node.setdefault(node_name, []).append(export_name)
      continue

    # No export name for this row (presumably the node could not be
    # queried); flag the node instead of listing exports
    exports_by_node[node_name] = False

  return exports_by_node
133
134
136 """Prepares an instance for an export and returns useful information.
137
138 """
139 REQ_BGL = False
140
143
154
def Exec(self, feedback_fn):
  """Prepares an instance for an export.

  Only remote exports need preparation: an X509 key and certificate are
  created on the instance's primary node, and the data handed back to the
  caller is authenticated with C{self._cds}.

  @param feedback_fn: callable receiving progress messages
  @return: C{None} unless the export mode is remote; for remote exports a
      dict with the keys C{"handshake"}, C{"x509_key_name"} (a tuple of key
      name, HMAC over the key name and salt) and C{"x509_ca"} (the signed
      certificate)

  """
  if self.op.mode != constants.EXPORT_MODE_REMOTE:
    # Nothing to prepare for other export modes
    return None

  hmac_salt = utils.GenerateSecret(8)

  feedback_fn("Generating X509 certificate on %s" %
              self.cfg.GetNodeName(self.instance.primary_node))
  cert_result = self.rpc.call_x509_cert_create(self.instance.primary_node,
                                               constants.RIE_CERT_VALIDITY)
  cert_result.Raise("Can't create X509 key and certificate on %s" %
                    self.cfg.GetNodeName(cert_result.node))

  (key_name, cert_pem) = cert_result.payload

  x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                              cert_pem)

  # Build the reply pieces in the same order the dict lists them
  handshake = masterd.instance.ComputeRemoteExportHandshake(self._cds)
  key_name_hmac = utils.Sha1Hmac(self._cds, key_name, salt=hmac_salt)
  signed_ca = utils.SignX509Certificate(x509_cert, self._cds, hmac_salt)

  return {
    "handshake": handshake,
    "x509_key_name": (key_name, key_name_hmac, hmac_salt),
    "x509_ca": signed_ca,
    }
182
183
185 """Export an instance to an image in the cluster.
186
187 """
188 HPATH = "instance-export"
189 HTYPE = constants.HTYPE_INSTANCE
190 REQ_BGL = False
191
207
230
232 """Last minute lock declaration."""
233
234
236 """Build hooks env.
237
238 This will run on the master, primary node and target node.
239
240 """
241 env = {
242 "EXPORT_MODE": self.op.mode,
243 "EXPORT_NODE": self.op.target_node,
244 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
245 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
246
247 "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
248 }
249
250 env.update(BuildInstanceHookEnvByObject(self, self.instance))
251
252 return env
253
264
266 """Check prerequisites.
267
268 This checks that the instance and node names are valid.
269
270 """
271 self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
272 assert self.instance is not None, \
273 "Cannot retrieve locked instance %s" % self.op.instance_name
274 CheckNodeOnline(self, self.instance.primary_node)
275
276 if (self.op.remove_instance and
277 self.instance.admin_state == constants.ADMINST_UP and
278 not self.op.shutdown):
279 raise errors.OpPrereqError("Can not remove instance without shutting it"
280 " down before", errors.ECODE_STATE)
281
282 if self.op.mode == constants.EXPORT_MODE_LOCAL:
283 self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
284 assert self.dst_node is not None
285
286 CheckNodeOnline(self, self.dst_node.uuid)
287 CheckNodeNotDrained(self, self.dst_node.uuid)
288
289 self._cds = None
290 self.dest_disk_info = None
291 self.dest_x509_ca = None
292
293 elif self.op.mode == constants.EXPORT_MODE_REMOTE:
294 self.dst_node = None
295
296 if len(self.op.target_node) != len(self.instance.disks):
297 raise errors.OpPrereqError(("Received destination information for %s"
298 " disks, but instance %s has %s disks") %
299 (len(self.op.target_node),
300 self.op.instance_name,
301 len(self.instance.disks)),
302 errors.ECODE_INVAL)
303
304 cds = GetClusterDomainSecret()
305
306
307 try:
308 (key_name, hmac_digest, hmac_salt) = self.x509_key_name
309 except (TypeError, ValueError), err:
310 raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
311 errors.ECODE_INVAL)
312
313 if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
314 raise errors.OpPrereqError("HMAC for X509 key name is wrong",
315 errors.ECODE_INVAL)
316
317
318 try:
319 (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
320 except OpenSSL.crypto.Error, err:
321 raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
322 (err, ), errors.ECODE_INVAL)
323
324 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
325 if errcode is not None:
326 raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
327 (msg, ), errors.ECODE_INVAL)
328
329 self.dest_x509_ca = cert
330
331
332 disk_info = []
333 for idx, disk_data in enumerate(self.op.target_node):
334 try:
335 (host, port, magic) = \
336 masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
337 except errors.GenericError, err:
338 raise errors.OpPrereqError("Target info for disk %s: %s" %
339 (idx, err), errors.ECODE_INVAL)
340
341 disk_info.append((host, port, magic))
342
343 assert len(disk_info) == len(self.op.target_node)
344 self.dest_disk_info = disk_info
345
346 else:
347 raise errors.ProgrammerError("Unhandled export mode %r" %
348 self.op.mode)
349
350
351
352 for disk in self.instance.disks:
353 if disk.dev_type in [constants.DT_FILE, constants.DT_SHARED_FILE]:
354 raise errors.OpPrereqError("Export not supported for instances with"
355 " file-based disks", errors.ECODE_INVAL)
356
358 """Removes exports of current instance from all other nodes.
359
360 If an instance in a cluster with nodes A..D was exported to node C, its
361 exports will be removed from the nodes A, B and D.
362
363 """
364 assert self.op.mode != constants.EXPORT_MODE_REMOTE
365
366 node_uuids = self.cfg.GetNodeList()
367 node_uuids.remove(self.dst_node.uuid)
368
369
370
371
372 iname = self.instance.name
373 if node_uuids:
374 feedback_fn("Removing old exports for instance %s" % iname)
375 exportlist = self.rpc.call_export_list(node_uuids)
376 for node_uuid in exportlist:
377 if exportlist[node_uuid].fail_msg:
378 continue
379 if iname in exportlist[node_uuid].payload:
380 msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
381 if msg:
382 self.LogWarning("Could not remove older export for instance %s"
383 " on node %s: %s", iname,
384 self.cfg.GetNodeName(node_uuid), msg)
385
def Exec(self, feedback_fn):
  """Export an instance to an image in the cluster.

  The steps are, in order: optionally shut the instance down, activate its
  disks if they are not active, snapshot the disks, optionally start the
  instance again, run the local or remote export, clean up the helper,
  deactivate the disks again if this method activated them, and finally
  (on success only) remove the instance if requested and, for local
  exports, drop older exports via C{self._CleanupExports}.

  @param feedback_fn: callable receiving progress messages
  @return: tuple of C{(fin_resu, dresults)}: the finalization status and the
      per-disk boolean results as returned by the export helper
  @raise errors.OpExecError: if the instance cannot be started again after
      the snapshot, or if finalization or any disk export failed

  """
  assert self.op.mode in constants.EXPORT_MODES

  primary_uuid = self.instance.primary_node

  if self.op.shutdown:
    # Stop the instance only; its disks are handled separately below
    feedback_fn("Shutting down instance %s" % self.instance.name)
    shutdown_res = self.rpc.call_instance_shutdown(primary_uuid,
                                                   self.instance,
                                                   self.op.shutdown_timeout,
                                                   self.op.reason)
    shutdown_res.Raise("Could not shutdown instance %s on"
                       " node %s" % (self.instance.name,
                                     self.cfg.GetNodeName(primary_uuid)))

  # Remember whether the disks were brought up here, so that only then they
  # are taken down again in the outer "finally"
  disks_started_here = not self.instance.disks_active

  if disks_started_here:
    feedback_fn("Activating disks for %s" % self.instance.name)
    StartInstanceDisks(self, self.instance, None)

  try:
    helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                   self.instance)

    helper.CreateSnapshots()
    try:
      restart_wanted = (self.op.shutdown and
                        self.instance.admin_state == constants.ADMINST_UP and
                        not self.op.remove_instance)
      if restart_wanted:
        assert not disks_started_here
        feedback_fn("Starting instance %s" % self.instance.name)
        start_res = self.rpc.call_instance_start(primary_uuid,
                                                 (self.instance, None, None),
                                                 False, self.op.reason)
        start_err = start_res.fail_msg
        if start_err:
          feedback_fn("Failed to start instance: %s" % start_err)
          ShutdownInstanceDisks(self, self.instance)
          raise errors.OpExecError("Could not start instance: %s" %
                                   start_err)

      if self.op.mode == constants.EXPORT_MODE_LOCAL:
        (fin_resu, dresults) = helper.LocalExport(self.dst_node)
      elif self.op.mode == constants.EXPORT_MODE_REMOTE:
        timeouts = \
          masterd.instance.ImportExportTimeouts(constants.RIE_CONNECT_TIMEOUT)

        (key_name, _, _) = self.x509_key_name

        dest_ca_pem = \
          OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                          self.dest_x509_ca)

        (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                   key_name, dest_ca_pem,
                                                   timeouts)
    finally:
      # Helper cleanup runs no matter how the export attempt ended
      helper.Cleanup()

    # Sanity-check the shape of the helper's per-disk results
    assert len(dresults) == len(self.instance.disks)
    assert compat.all(isinstance(i, bool) for i in dresults), \
      "Not all results are boolean: %r" % dresults

  finally:
    if disks_started_here:
      feedback_fn("Deactivating disks for %s" % self.instance.name)
      ShutdownInstanceDisks(self, self.instance)

  all_disks_ok = compat.all(dresults)
  if not all_disks_ok or not fin_resu:
    failures = []

    if not fin_resu:
      failures.append("export finalization")

    if not all_disks_ok:
      failed_disks = [idx for (idx, disk_ok) in enumerate(dresults)
                      if not disk_ok]
      failures.append("disk export: disk(s) %s" %
                      utils.CommaJoin(failed_disks))

    raise errors.OpExecError("Export failed, errors in %s" %
                             utils.CommaJoin(failures))

  # From here on the export is known to have succeeded

  if self.op.remove_instance:
    feedback_fn("Removing instance %s" % self.instance.name)
    RemoveInstance(self, feedback_fn, self.instance,
                   self.op.ignore_remove_failures)

  if self.op.mode == constants.EXPORT_MODE_LOCAL:
    self._CleanupExports(feedback_fn)

  return fin_resu, dresults
484
485
487 """Remove exports related to the named instance.
488
489 """
490 REQ_BGL = False
491
506
def Exec(self, feedback_fn):
  """Remove any export.

  The export list of every node locked at the node level is fetched, and
  the export matching the instance name is removed wherever it is present.
  Nodes that cannot be queried are skipped with a warning; a failed removal
  is logged and does not abort the loop.

  @param feedback_fn: callable receiving messages for the user

  """
  (_, expanded_name) = self.cfg.ExpandInstanceName(self.op.instance_name)

  # If the name could not be expanded, fall back to the name exactly as
  # given and remember to hint at using the FQDN should nothing be found
  name_unresolved = not expanded_name
  if name_unresolved:
    export_name = self.op.instance_name
  else:
    export_name = expanded_name

  locked_nodes = self.owned_locks(locking.LEVEL_NODE)
  exports_per_node = self.rpc.call_export_list(locked_nodes)

  found_any = False
  for (node_uuid, node_result) in exports_per_node.items():
    query_err = node_result.fail_msg
    if query_err:
      self.LogWarning("Failed to query node %s (continuing): %s",
                      self.cfg.GetNodeName(node_uuid), query_err)
      continue

    if export_name not in node_result.payload:
      continue

    # Counts as found even if the removal below fails
    found_any = True
    remove_err = self.rpc.call_export_remove(node_uuid, export_name).fail_msg
    if remove_err:
      logging.error("Could not remove export for instance %s"
                    " on node %s: %s", export_name,
                    self.cfg.GetNodeName(node_uuid), remove_err)

  if name_unresolved and not found_any:
    feedback_fn("Export not found. If trying to remove an export belonging"
                " to a deleted instance please use its Fully Qualified"
                " Domain Name.")
541