Package ganeti :: Package cmdlib :: Module test
[hide private]
[frames] | no frames]

Source Code for Module ganeti.cmdlib.test

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Test logical units.""" 
 32   
 33  import logging 
 34  import shutil 
 35  import socket 
 36  import tempfile 
 37  import time 
 38   
 39  from ganeti import compat 
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti import locking 
 43  from ganeti import utils 
 44  from ganeti.masterd import iallocator 
 45  from ganeti.cmdlib.base import NoHooksLU 
 46  from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \ 
 47    GetWantedInstances 
class TestSocketWrapper(object):
  """Utility class that opens a domain socket and cleans up as needed.

  The wrapper owns a temporary directory and a listening Unix socket bound
  inside it. Both attributes are C{None} whenever no socket is held.

  """
  def __init__(self):
    """Constructor cleaning up variables to be used.

    """
    self.tmpdir = None
    self.sock = None

  def Create(self, max_connections=1):
    """Creates a bound and ready socket, cleaning up in case of failure.

    @type max_connections: int
    @param max_connections: The number of max connections allowed for the
                            socket.

    @rtype: tuple of socket, string
    @return: The socket object and the path to reach it with.

    """
    # Using a temporary directory as there's no easy way to create temporary
    # sockets without writing a custom loop around tempfile.mktemp and
    # socket.bind
    self.tmpdir = tempfile.mkdtemp()
    try:
      tmpsock = utils.PathJoin(self.tmpdir, "sock")
      logging.debug("Creating temporary socket at %s", tmpsock)
      self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      try:
        self.sock.bind(tmpsock)
        self.sock.listen(max_connections)
      except:
        # Cleanup-and-reraise: must run for any exception type
        self.sock.close()
        raise
    except:
      # Forget the half-created resources before removing them, so that a
      # later Destroy() hits its "not created" guard instead of operating on
      # a closed socket and a directory that no longer exists
      failed_dir = self.tmpdir
      self.tmpdir = None
      self.sock = None
      shutil.rmtree(failed_dir)
      raise

    return self.sock, tmpsock

  def Destroy(self):
    """Destroys the socket and performs all necessary cleanup.

    After this call the wrapper holds no resources; calling it again without
    a new successful L{Create} raises the same error as calling it on a
    fresh instance.

    @raise Exception: if no socket has been created successfully

    """
    if self.tmpdir is None or self.sock is None:
      raise Exception("A socket must be created successfully before attempting "
                      "its destruction")

    # Detach the resources first so the object is back in its initial state
    # even if closing or removing fails
    sock = self.sock
    tmpdir = self.tmpdir
    self.sock = None
    self.tmpdir = None

    try:
      sock.close()
    finally:
      shutil.rmtree(tmpdir)
104
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any, and stores the resulting UUIDs in
    C{self.op.on_node_uuids} (adding the master node when C{on_master} is
    set). Node locks are only requested when C{no_locks} is not set.

    @raise errors.OpPrereqError: if the requested duration is not positive

    """
    self.needed_locks = {}

    if self.op.duration <= 0:
      raise errors.OpPrereqError("Duration must be greater than zero",
                                 errors.ECODE_INVAL)

    self.op.on_node_uuids = []
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      (self.op.on_node_uuids, self.op.on_nodes) = \
        GetWantedNodes(self, self.op.on_nodes)

    master_uuid = self.cfg.GetMasterNode()
    if self.op.on_master and master_uuid not in self.op.on_node_uuids:
      self.op.on_node_uuids.append(master_uuid)

    # Only declare node locks when the opcode did not ask to run without
    # them; previously the lock dict was rebuilt unconditionally here, which
    # made the no_locks flag a no-op
    if not self.op.no_locks:
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_node_uuids

  def _InterruptibleDelay(self):
    """Delays but provides the mechanisms necessary to interrupt the delay as
    needed.

    A temporary Unix socket is created and its path is sent to the client
    through the log. The delay counts as interrupted when a client connects
    and sends data before C{self.op.duration} seconds have elapsed.

    @rtype: bool
    @return: True if the delay was interrupted, False if it timed out

    """
    socket_wrapper = TestSocketWrapper()
    sock, path = socket_wrapper.Create()

    try:
      # Logging happens inside the try block so that the temporary socket is
      # cleaned up even if the notification itself fails
      self.Log(constants.ELOG_DELAY_TEST, (path,))

      sock.settimeout(self.op.duration)
      start = time.time()
      (conn, _) = sock.accept()
    except socket.timeout:
      # If we timed out, all is well
      return False
    finally:
      # Destroys the original socket, but the new connection is still usable
      socket_wrapper.Destroy()

    try:
      # Change to remaining time
      time_to_go = self.op.duration - (time.time() - start)
      self.Log(constants.ELOG_MESSAGE,
               "Received connection, time to go is %d" % time_to_go)
      if time_to_go < 0:
        time_to_go = 0
      # pylint: disable=E1101
      # Instance of '_socketobject' has no ... member
      conn.settimeout(time_to_go)
      conn.recv(1)
      # pylint: enable=E1101
    except socket.timeout:
      # A second timeout can occur if no data is sent
      return False
    finally:
      conn.close()

    self.Log(constants.ELOG_MESSAGE,
             "Interrupted, time spent waiting: %d" % (time.time() - start))

    # Reaching this point means we were interrupted
    return True

  def _UninterruptibleDelay(self):
    """Delays without allowing interruptions.

    If any node UUIDs were collected in L{ExpandNames}, the delay is run
    through an RPC call to those nodes; otherwise it is run locally via
    C{utils.TestDelay}.

    @raise errors.OpExecError: if the local delay reports failure

    """
    if self.op.on_node_uuids:
      result = self.rpc.call_test_delay(self.op.on_node_uuids, self.op.duration)
      for node_uuid, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" %
                          self.cfg.GetNodeName(node_uuid))
    else:
      if not utils.TestDelay(self.op.duration)[0]:
        raise errors.OpExecError("Error during master delay test")

  def _TestDelay(self):
    """Do the actual sleep.

    @rtype: bool
    @return: Whether the delay was interrupted

    """
    if self.op.interruptible:
      return self._InterruptibleDelay()
    else:
      self._UninterruptibleDelay()
      return False

  def Exec(self, feedback_fn):
    """Execute the test delay opcode, with the wanted repetitions.

    A repeat count of zero runs the delay exactly once; otherwise the delay
    is run up to C{self.op.repeat} times, stopping early on interruption.

    @param feedback_fn: not used by this LU

    """
    if self.op.repeat == 0:
      self._TestDelay()
    else:
      top_value = self.op.repeat - 1
      for i in range(self.op.repeat):
        self.LogInfo("Test delay iteration %d/%d", i, top_value)
        # Break in case of interruption
        if self._TestDelay():
          break
228
class LUTestJqueue(NoHooksLU):
  """Utility LU to test some aspects of the job queue.

  """
  REQ_BGL = False

  # Must be lower than default timeout for WaitForJobChange to see whether it
  # notices changed jobs
  _CLIENT_CONNECT_TIMEOUT = 20.0
  _CLIENT_CONFIRM_TIMEOUT = 60.0

  @classmethod
  def _NotifyUsingSocket(cls, cb, errcls):
    """Opens a Unix socket and waits for another program to connect.

    After the client has connected, this waits for it to send data or close
    the connection before returning.

    @type cb: callable
    @param cb: Callback to send socket name to client
    @type errcls: class
    @param errcls: Exception class to use for errors
    @raise errcls: if the client does not connect or confirm in time

    """
    # Using a temporary directory as there's no easy way to create temporary
    # sockets without writing a custom loop around tempfile.mktemp and
    # socket.bind
    socket_wrapper = TestSocketWrapper()
    sock, path = socket_wrapper.Create()

    try:
      # The callback runs under the "finally" so the temporary socket and
      # directory are removed even if notifying the client fails
      cb(path)

      try:
        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
        (conn, _) = sock.accept()
      except socket.error as err:
        raise errcls("Client didn't connect in time (%s)" % err)
    finally:
      # Destroys the listening socket; the accepted connection stays usable
      socket_wrapper.Destroy()

    # Wait for client to close
    try:
      # pylint: disable=E1101
      # Instance of '_socketobject' has no ... member
      conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
      conn.recv(1)
    except socket.error as err:
      raise errcls("Client failed to confirm notification (%s)" % err)
    finally:
      conn.close()

  def _SendNotification(self, test, arg, sockname):
    """Sends a notification to the client.

    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)
    @type sockname: string
    @param sockname: Socket path

    """
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))

  def _Notify(self, prereq, test, arg):
    """Notifies the client of a test.

    @type prereq: bool
    @param prereq: Whether this is a prereq-phase test
    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)

    """
    # Failures are reported with the exception class matching the phase
    if prereq:
      errcls = errors.OpPrereqError
    else:
      errcls = errors.OpExecError

    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
                                                  test, arg),
                                   errcls)

  def CheckArguments(self):
    """Counts invocations so later phases can verify the call order.

    """
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
    self.expandnames_calls = 0

  def ExpandNames(self):
    """Verifies L{CheckArguments} ran, optionally notifies, and sets locks.

    @raise errors.ProgrammerError: if L{CheckArguments} was not called first

    """
    checkargs_calls = getattr(self, "checkargs_calls", 0)
    if checkargs_calls < 1:
      raise errors.ProgrammerError("CheckArguments was not called")

    self.expandnames_calls += 1

    if self.op.notify_waitlock:
      self._Notify(True, constants.JQT_EXPANDNAMES, None)

    self.LogInfo("Expanding names")

    # Get lock on master node (just to get a lock, not for a particular reason)
    self.needed_locks = {
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
      }

  def Exec(self, feedback_fn):
    """Runs the job queue test: notifications, log messages, optional failure.

    @type feedback_fn: callable
    @param feedback_fn: called once per entry of C{self.op.log_messages}
    @rtype: bool
    @return: True, unless a failure was requested
    @raise errors.ProgrammerError: if L{ExpandNames} was not called first
    @raise errors.OpExecError: if C{self.op.fail} is set

    """
    if self.expandnames_calls < 1:
      raise errors.ProgrammerError("ExpandNames was not called")

    if self.op.notify_exec:
      self._Notify(False, constants.JQT_EXEC, None)

    self.LogInfo("Executing")

    if self.op.log_messages:
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
      for idx, msg in enumerate(self.op.log_messages):
        self.LogInfo("Sending log message %s", idx + 1)
        feedback_fn(constants.JQT_MSGPREFIX + msg)
        # Report how many test messages have been sent
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)

    if self.op.fail:
      raise errors.OpExecError("Opcode failure was requested")

    return True
354
class LUTestOsParams(NoHooksLU):
  """Utility LU to test secret OS parameter transmission.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Declares that this LU needs no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reports the secret OS parameters back through the feedback function.

    @type feedback_fn: callable
    @param feedback_fn: receives the message containing the revealed
                        parameters
    @raise errors.OpExecError: if the opcode carries no secret parameters

    """
    if not self.op.osparams_secret:
      raise errors.OpExecError("Opcode needs secret parameters")

    revealed = self.op.osparams_secret.Unprivate()
    feedback_fn("Secret OS parameters: %s" % revealed)
371
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    test.

    @raise errors.OpPrereqError: if the instance to allocate already exists,
        the disks parameter is malformed, no instances are given for a
        group-change/evacuation test, the mode is unknown, or the allocator
        name is missing for the "out" direction

    """
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
      (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            constants.IDISK_SIZE not in row or
            not isinstance(row[constants.IDISK_SIZE], int) or
            constants.IDISK_MODE not in row or
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      (self.inst_uuid, self.op.name) = ExpandInstanceUuidAndName(self.cfg, None,
                                                                 self.op.name)
      self.relocate_from_node_uuids = \
          list(self.cfg.GetInstanceSecondaryNodes(self.inst_uuid))
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
                          constants.IALLOCATOR_MODE_NODE_EVAC):
      if not self.op.instances:
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
      (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if self.op.iallocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)

  def _BuildInstanceAllocRequest(self, name):
    """Builds a single-instance allocation request from the opcode fields.

    @type name: string
    @param name: instance name to put into the request
    @return: the C{iallocator.IAReqInstanceAlloc} request object

    """
    return iallocator.IAReqInstanceAlloc(name=name,
                                         memory=self.op.memory,
                                         disks=self.op.disks,
                                         disk_template=self.op.disk_template,
                                         group_name=self.op.group_name,
                                         os=self.op.os,
                                         tags=self.op.tags,
                                         nics=self.op.nics,
                                         vcpus=self.op.vcpus,
                                         spindle_use=self.op.spindle_use,
                                         hypervisor=self.op.hypervisor,
                                         node_whitelist=None)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    Builds the request matching C{self.op.mode}. For the "in" direction the
    allocator input text is returned without running anything; otherwise
    the named allocator is run and its output text is returned.

    @param feedback_fn: not used by this LU
    @return: the allocator input text or output text, depending on direction
    @raise errors.ProgrammerError: if the mode is not handled here

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      req = self._BuildInstanceAllocRequest(self.op.name)
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      req = iallocator.IAReqRelocate(
            inst_uuid=self.inst_uuid,
            relocate_from_node_uuids=list(self.relocate_from_node_uuids))
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
                                        target_groups=self.op.target_groups)
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
                                     evac_mode=self.op.evac_mode,
                                     ignore_soft_errors=False)
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
      # One request per instance, named by appending the index to the base
      # name
      insts = [self._BuildInstanceAllocRequest("%s%s" % (self.op.name, idx))
               for idx in range(self.op.count)]
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
    else:
      # The mode must be interpolated into the message; passing it as a
      # second exception argument left the "%s" placeholder unformatted
      raise errors.ProgrammerError("Uncatched mode %s in"
                                   " LUTestAllocator.Exec" % self.op.mode)

    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.iallocator, validate=False)
      result = ial.out_text
    return result
475