1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Logical units dealing with backup operations."""
23
24 import OpenSSL
25 import logging
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import locking
31 from ganeti import masterd
32 from ganeti import qlang
33 from ganeti import query
34 from ganeti import utils
35
36 from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
37 from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
38 ExpandNodeUuidAndName
39 from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
40 ShutdownInstanceDisks
41 from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
42 BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance
43
44
46 FIELDS = query.EXPORT_FIELDS
47
48
49 SORT_FIELD = "node"
50
52 lu.needed_locks = {}
53
54
55 if self.names:
56 (self.wanted, _) = GetWantedNodes(lu, self.names)
57 else:
58 self.wanted = locking.ALL_SET
59
60 self.do_locking = self.use_locking
61
62 if self.do_locking:
63 lu.share_locks = ShareAll()
64 lu.needed_locks = {
65 locking.LEVEL_NODE: self.wanted,
66 }
67
68 if not self.names:
69 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
70
73
75 """Computes the list of nodes and their attributes.
76
77 """
78
79
80 assert not (compat.any(lu.glm.is_owned(level)
81 for level in locking.LEVELS
82 if level != locking.LEVEL_CLUSTER) or
83 self.do_locking or self.use_locking)
84
85 node_uuids = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
86
87 result = []
88
89 for (node_uuid, nres) in lu.rpc.call_export_list(node_uuids).items():
90 if nres.fail_msg:
91 result.append((node_uuid, None))
92 else:
93 result.extend((node_uuid, expname) for expname in nres.payload)
94
95 return result
96
97
99 """Query the exports list
100
101 """
102 REQ_BGL = False
103
107
110
113
114 - def Exec(self, feedback_fn):
115 result = {}
116
117 for (node, expname) in self.expq.OldStyleQuery(self):
118 if expname is None:
119 result[node] = False
120 else:
121 result.setdefault(node, []).append(expname)
122
123 return result
124
125
127 """Prepares an instance for an export and returns useful information.
128
129 """
130 REQ_BGL = False
131
134
145
  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    For a remote export, a fresh X509 key/certificate pair is created on
    the instance's primary node and the data the destination cluster needs
    (handshake, HMAC-signed key name, signed CA) is returned; for any other
    mode nothing has to be prepared and None is returned.

    """
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      # Same salt is used for signing both the key name and the CA below
      salt = utils.GenerateSecret(8)

      feedback_fn("Generating X509 certificate on %s" %
                  self.cfg.GetNodeName(self.instance.primary_node))
      result = self.rpc.call_x509_cert_create(self.instance.primary_node,
                                              constants.RIE_CERT_VALIDITY)
      result.Raise("Can't create X509 key and certificate on %s" %
                   self.cfg.GetNodeName(result.node))

      (name, cert_pem) = result.payload

      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)

      return {
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
        # Key name is authenticated with an HMAC keyed on the cluster
        # domain secret (self._cds) so the receiving side can verify it
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
                          salt),
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
        }

    # Local export mode: nothing to prepare
    return None
173
174
176 """Export an instance to an image in the cluster.
177
178 """
179 HPATH = "instance-export"
180 HTYPE = constants.HTYPE_INSTANCE
181 REQ_BGL = False
182
198
221
223 """Last minute lock declaration."""
224
225
227 """Build hooks env.
228
229 This will run on the master, primary node and target node.
230
231 """
232 env = {
233 "EXPORT_MODE": self.op.mode,
234 "EXPORT_NODE": self.op.target_node,
235 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
236 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
237
238 "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
239 }
240
241 env.update(BuildInstanceHookEnvByObject(self, self.instance))
242
243 return env
244
255
257 """Check prerequisites.
258
259 This checks that the instance and node names are valid.
260
261 """
262 self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
263 assert self.instance is not None, \
264 "Cannot retrieve locked instance %s" % self.op.instance_name
265 CheckNodeOnline(self, self.instance.primary_node)
266
267 if (self.op.remove_instance and
268 self.instance.admin_state == constants.ADMINST_UP and
269 not self.op.shutdown):
270 raise errors.OpPrereqError("Can not remove instance without shutting it"
271 " down before", errors.ECODE_STATE)
272
273 if self.op.mode == constants.EXPORT_MODE_LOCAL:
274 self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
275 assert self.dst_node is not None
276
277 CheckNodeOnline(self, self.dst_node.uuid)
278 CheckNodeNotDrained(self, self.dst_node.uuid)
279
280 self._cds = None
281 self.dest_disk_info = None
282 self.dest_x509_ca = None
283
284 elif self.op.mode == constants.EXPORT_MODE_REMOTE:
285 self.dst_node = None
286
287 if len(self.op.target_node) != len(self.instance.disks):
288 raise errors.OpPrereqError(("Received destination information for %s"
289 " disks, but instance %s has %s disks") %
290 (len(self.op.target_node),
291 self.op.instance_name,
292 len(self.instance.disks)),
293 errors.ECODE_INVAL)
294
295 cds = GetClusterDomainSecret()
296
297
298 try:
299 (key_name, hmac_digest, hmac_salt) = self.x509_key_name
300 except (TypeError, ValueError), err:
301 raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
302 errors.ECODE_INVAL)
303
304 if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
305 raise errors.OpPrereqError("HMAC for X509 key name is wrong",
306 errors.ECODE_INVAL)
307
308
309 try:
310 (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
311 except OpenSSL.crypto.Error, err:
312 raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
313 (err, ), errors.ECODE_INVAL)
314
315 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
316 if errcode is not None:
317 raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
318 (msg, ), errors.ECODE_INVAL)
319
320 self.dest_x509_ca = cert
321
322
323 disk_info = []
324 for idx, disk_data in enumerate(self.op.target_node):
325 try:
326 (host, port, magic) = \
327 masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
328 except errors.GenericError, err:
329 raise errors.OpPrereqError("Target info for disk %s: %s" %
330 (idx, err), errors.ECODE_INVAL)
331
332 disk_info.append((host, port, magic))
333
334 assert len(disk_info) == len(self.op.target_node)
335 self.dest_disk_info = disk_info
336
337 else:
338 raise errors.ProgrammerError("Unhandled export mode %r" %
339 self.op.mode)
340
341
342
343 for disk in self.instance.disks:
344 if disk.dev_type in [constants.DT_FILE, constants.DT_SHARED_FILE]:
345 raise errors.OpPrereqError("Export not supported for instances with"
346 " file-based disks", errors.ECODE_INVAL)
347
349 """Removes exports of current instance from all other nodes.
350
351 If an instance in a cluster with nodes A..D was exported to node C, its
352 exports will be removed from the nodes A, B and D.
353
354 """
355 assert self.op.mode != constants.EXPORT_MODE_REMOTE
356
357 node_uuids = self.cfg.GetNodeList()
358 node_uuids.remove(self.dst_node.uuid)
359
360
361
362
363 iname = self.instance.name
364 if node_uuids:
365 feedback_fn("Removing old exports for instance %s" % iname)
366 exportlist = self.rpc.call_export_list(node_uuids)
367 for node_uuid in exportlist:
368 if exportlist[node_uuid].fail_msg:
369 continue
370 if iname in exportlist[node_uuid].payload:
371 msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
372 if msg:
373 self.LogWarning("Could not remove older export for instance %s"
374 " on node %s: %s", iname,
375 self.cfg.GetNodeName(node_uuid), msg)
376
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    Orchestration: optionally shut the instance down, activate its disks if
    needed, snapshot them, optionally restart the instance, then run the
    local or remote export and finally clean up (deactivate disks, remove
    the instance and old exports if requested).

    """
    assert self.op.mode in constants.EXPORT_MODES

    src_node_uuid = self.instance.primary_node

    if self.op.shutdown:
      # Shut down the instance, but keep its disks so they can be snapshotted
      feedback_fn("Shutting down instance %s" % self.instance.name)
      result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)

      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (self.instance.name,
                                 self.cfg.GetNodeName(src_node_uuid)))

    # Set the disk IDs for the primary node before any disk/RPC operation
    # below uses them
    for disk in self.instance.disks:
      self.cfg.SetDiskID(disk, src_node_uuid)

    activate_disks = not self.instance.disks_active

    if activate_disks:
      # Activate the instance disks if we are exporting a stopped instance
      feedback_fn("Activating disks for %s" % self.instance.name)
      StartInstanceDisks(self, self.instance, None)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     self.instance)

      helper.CreateSnapshots()
      try:
        if (self.op.shutdown and
            self.instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          # Instance was running before and will not be removed: restart it
          # now that the snapshots exist
          assert not activate_disks
          feedback_fn("Starting instance %s" % self.instance.name)
          result = self.rpc.call_instance_start(src_node_uuid,
                                                (self.instance, None, None),
                                                False, self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            ShutdownInstanceDisks(self, self.instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # One boolean result is expected per instance disk
      assert len(dresults) == len(self.instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        # Deactivate disks that were only activated for the export
        feedback_fn("Deactivating disks for %s" % self.instance.name)
        ShutdownInstanceDisks(self, self.instance)

    if not (compat.all(dresults) and fin_resu):
      # Collect a human-readable description of everything that failed
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # Export succeeded; run the post-export actions

    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % self.instance.name)
      RemoveInstance(self, feedback_fn, self.instance,
                     self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)

    return fin_resu, dresults
480
481
483 """Remove exports related to the named instance.
484
485 """
486 REQ_BGL = False
487
502
503 - def Exec(self, feedback_fn):
504 """Remove any export.
505
506 """
507 (_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)
508
509
510 fqdn_warn = False
511 if not inst_name:
512 fqdn_warn = True
513 inst_name = self.op.instance_name
514
515 locked_nodes = self.owned_locks(locking.LEVEL_NODE)
516 exportlist = self.rpc.call_export_list(locked_nodes)
517 found = False
518 for node_uuid in exportlist:
519 msg = exportlist[node_uuid].fail_msg
520 if msg:
521 self.LogWarning("Failed to query node %s (continuing): %s",
522 self.cfg.GetNodeName(node_uuid), msg)
523 continue
524 if inst_name in exportlist[node_uuid].payload:
525 found = True
526 result = self.rpc.call_export_remove(node_uuid, inst_name)
527 msg = result.fail_msg
528 if msg:
529 logging.error("Could not remove export for instance %s"
530 " on node %s: %s", inst_name,
531 self.cfg.GetNodeName(node_uuid), msg)
532
533 if fqdn_warn and not found:
534 feedback_fn("Export not found. If trying to remove an export belonging"
535 " to a deleted instance please use its Fully Qualified"
536 " Domain Name.")
537