Package ganeti :: Package cmdlib :: Module test
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.cmdlib.test

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30   
 31  """Test logical units.""" 
 32   
 33  import logging 
 34  import shutil 
 35  import socket 
 36  import tempfile 
 37  import time 
 38   
 39  from ganeti import compat 
 40  from ganeti import constants 
 41  from ganeti import errors 
 42  from ganeti import locking 
 43  from ganeti import utils 
 44  from ganeti.masterd import iallocator 
 45  from ganeti.cmdlib.base import NoHooksLU 
 46  from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \ 
 47    GetWantedInstances 
class TestSocketWrapper(object):
  """ Utility class that opens a domain socket and cleans up as needed.

  The socket is an AF_UNIX stream socket bound inside a freshly created
  temporary directory; L{Destroy} closes it and removes that directory.

  """
  def __init__(self):
    """ Constructor cleaning up variables to be used.

    """
    self.tmpdir = None
    self.sock = None

  def Create(self, max_connections=1):
    """ Creates a bound and ready socket, cleaning up in case of failure.

    @type max_connections: int
    @param max_connections: The number of max connections allowed for the
      socket.

    @rtype: tuple of socket, string
    @return: The socket object and the path to reach it with.

    """
    # Using a temporary directory as there's no easy way to create temporary
    # sockets without writing a custom loop around tempfile.mktemp and
    # socket.bind
    self.tmpdir = tempfile.mkdtemp()
    try:
      tmpsock = utils.PathJoin(self.tmpdir, "sock")
      logging.debug("Creating temporary socket at %s", tmpsock)
      self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      try:
        self.sock.bind(tmpsock)
        self.sock.listen(max_connections)
      except: # pylint: disable=W0702
        # Deliberately broad: clean up on any error, then re-raise it
        self.sock.close()
        raise
    except: # pylint: disable=W0702
      shutil.rmtree(self.tmpdir)
      # Forget the half-initialised state so that a later Destroy() reports
      # the misuse instead of trying to remove an already-deleted directory
      self.tmpdir = None
      self.sock = None
      raise

    return self.sock, tmpsock

  def Destroy(self):
    """ Destroys the socket and performs all necessary cleanup.

    After this call the wrapper is back in its initial state, so calling
    Destroy() again raises the "must be created" error.

    @raise Exception: if no socket was successfully created before

    """
    if self.tmpdir is None or self.sock is None:
      raise Exception("A socket must be created successfully before attempting "
                      "its destruction")

    (sock, tmpdir) = (self.sock, self.tmpdir)
    self.sock = None
    self.tmpdir = None

    try:
      sock.close()
    finally:
      shutil.rmtree(tmpdir)


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any, and appends the master node when
    C{on_master} is requested. Node locks on those nodes are declared
    unless the opcode sets C{no_locks}.

    @raise errors.OpPrereqError: if the requested duration is not positive

    """
    self.needed_locks = {}

    if self.op.duration <= 0:
      raise errors.OpPrereqError("Duration must be greater than zero")

    self.op.on_node_uuids = []
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      (self.op.on_node_uuids, self.op.on_nodes) = \
        GetWantedNodes(self, self.op.on_nodes)

    master_uuid = self.cfg.GetMasterNode()
    if self.op.on_master and master_uuid not in self.op.on_node_uuids:
      self.op.on_node_uuids.append(master_uuid)

    # Honour no_locks: when it is set, no node locks are declared at all
    if not self.op.no_locks:
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_node_uuids

  def _InterruptibleDelay(self):
    """Delays but provides the mechanisms necessary to interrupt the delay as
    needed.

    The path of a temporary Unix socket is logged as C{ELOG_DELAY_TEST}. A
    client connecting to it and then sending a byte or closing the
    connection before the duration elapses interrupts the delay.

    @rtype: bool
    @return: Whether the delay was interrupted

    """
    socket_wrapper = TestSocketWrapper()
    sock, path = socket_wrapper.Create()

    try:
      # Logging happens under the finally clause so the socket and its
      # temporary directory are removed even if logging fails
      self.Log(constants.ELOG_DELAY_TEST, (path,))
      try:
        sock.settimeout(self.op.duration)
        start = time.time()
        (conn, _) = sock.accept()
      except socket.timeout:
        # If we timed out, all is well
        return False
    finally:
      # Destroys the original socket, but the new connection is still usable
      socket_wrapper.Destroy()

    try:
      # Change to remaining time, never negative
      time_to_go = max(0, self.op.duration - (time.time() - start))
      self.Log(constants.ELOG_MESSAGE,
               "Received connection, time to go is %d" % time_to_go)
      if time_to_go <= 0:
        # The full duration has already elapsed. settimeout(0) would switch
        # the connection to non-blocking mode, making recv() fail with a
        # socket error rather than a timeout.
        return False
      # pylint: disable=E1101
      # Instance of '_socketobject' has no ... member
      conn.settimeout(time_to_go)
      conn.recv(1)
      # pylint: enable=E1101
    except socket.timeout:
      # A second timeout can occur if no data is sent
      return False
    finally:
      conn.close()

    self.Log(constants.ELOG_MESSAGE,
             "Interrupted, time spent waiting: %d" % (time.time() - start))

    # Reaching this point means we were interrupted
    return True

  def _UninterruptibleDelay(self):
    """Delays without allowing interruptions.

    If node UUIDs were collected in ExpandNames, the delay runs on those
    nodes via RPC; otherwise it runs locally through C{utils.TestDelay}.

    @raise errors.OpExecError: if the local delay reports failure

    """
    if self.op.on_node_uuids:
      result = self.rpc.call_test_delay(self.op.on_node_uuids, self.op.duration)
      for node_uuid, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" %
                          self.cfg.GetNodeName(node_uuid))
    else:
      if not utils.TestDelay(self.op.duration)[0]:
        raise errors.OpExecError("Error during master delay test")

  def _TestDelay(self):
    """Do the actual sleep.

    @rtype: bool
    @return: Whether the delay was interrupted

    """
    if self.op.interruptible:
      return self._InterruptibleDelay()
    else:
      self._UninterruptibleDelay()
      return False

  def Exec(self, feedback_fn):
    """Execute the test delay opcode, with the wanted repetitions.

    A C{repeat} of zero runs the delay exactly once; otherwise the delay
    runs up to C{repeat} times, stopping early once a delay reports it was
    interrupted.

    """
    if self.op.repeat == 0:
      self._TestDelay()
      return

    top_value = self.op.repeat - 1
    for i in range(self.op.repeat):
      self.LogInfo("Test delay iteration %d/%d", i, top_value)
      # Break in case of interruption
      if self._TestDelay():
        break


class LUTestJqueue(NoHooksLU):
  """Utility LU to test some aspects of the job queue.

  """
  REQ_BGL = False

  # Must be lower than default timeout for WaitForJobChange to see whether it
  # notices changed jobs
  _CLIENT_CONNECT_TIMEOUT = 20.0
  _CLIENT_CONFIRM_TIMEOUT = 60.0

  @classmethod
  def _NotifyUsingSocket(cls, cb, errcls):
    """Opens a Unix socket and waits for another program to connect.

    @type cb: callable
    @param cb: Callback to send socket name to client
    @type errcls: class
    @param errcls: Exception class to use for errors

    """
    socket_wrapper = TestSocketWrapper()
    sock, path = socket_wrapper.Create()

    try:
      # The callback runs under the finally clause so that the socket and its
      # temporary directory are removed even if sending the path fails
      cb(path)
      try:
        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
        (conn, _) = sock.accept()
      except socket.error as err:
        raise errcls("Client didn't connect in time (%s)" % err)
    finally:
      socket_wrapper.Destroy()

    # Wait for client to close
    try:
      try:
        # pylint: disable=E1101
        # Instance of '_socketobject' has no ... member
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
        conn.recv(1)
      except socket.error as err:
        raise errcls("Client failed to confirm notification (%s)" % err)
    finally:
      conn.close()

  def _SendNotification(self, test, arg, sockname):
    """Sends a notification to the client.

    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)
    @type sockname: string
    @param sockname: Socket path

    """
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))

  def _Notify(self, prereq, test, arg):
    """Notifies the client of a test.

    @type prereq: bool
    @param prereq: Whether this is a prereq-phase test
    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)

    """
    if prereq:
      errcls = errors.OpPrereqError
    else:
      errcls = errors.OpExecError

    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
                                                  test, arg),
                                   errcls)

  def CheckArguments(self):
    """Counts invocations so ExpandNames can verify the call order.

    """
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
    self.expandnames_calls = 0

  def ExpandNames(self):
    """Verifies call order, optionally notifies the client, declares locks.

    @raise errors.ProgrammerError: if CheckArguments was not called first

    """
    checkargs_calls = getattr(self, "checkargs_calls", 0)
    if checkargs_calls < 1:
      raise errors.ProgrammerError("CheckArguments was not called")

    self.expandnames_calls += 1

    if self.op.notify_waitlock:
      self._Notify(True, constants.JQT_EXPANDNAMES, None)

    self.LogInfo("Expanding names")

    # Get lock on master node (just to get a lock, not for a particular reason)
    self.needed_locks = {
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
      }

  def Exec(self, feedback_fn):
    """Runs the requested job queue test steps.

    @raise errors.ProgrammerError: if ExpandNames was not called first
    @raise errors.OpExecError: if the opcode requested a failure

    """
    if self.expandnames_calls < 1:
      raise errors.ProgrammerError("ExpandNames was not called")

    if self.op.notify_exec:
      self._Notify(False, constants.JQT_EXEC, None)

    self.LogInfo("Executing")

    if self.op.log_messages:
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
      for idx, msg in enumerate(self.op.log_messages):
        self.LogInfo("Sending log message %s", idx + 1)
        feedback_fn(constants.JQT_MSGPREFIX + msg)
        # Report how many test messages have been sent
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)

    if self.op.fail:
      raise errors.OpExecError("Opcode failure was requested")

    return True


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the director and mode test.

    @raise errors.OpPrereqError: on invalid or inconsistent parameters

    """
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
      (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            constants.IDISK_SIZE not in row or
            not isinstance(row[constants.IDISK_SIZE], int) or
            constants.IDISK_MODE not in row or
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      (self.inst_uuid, self.op.name) = ExpandInstanceUuidAndName(self.cfg, None,
                                                                 self.op.name)
      self.relocate_from_node_uuids = \
          list(self.cfg.GetInstanceSecondaryNodes(self.inst_uuid))
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
                          constants.IALLOCATOR_MODE_NODE_EVAC):
      if not self.op.instances:
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
      (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if self.op.iallocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)

  def _InstanceAllocRequest(self, name):
    """Builds a single-instance allocation request from the opcode fields.

    @type name: string
    @param name: Name to use for the instance in the request
    @rtype: L{iallocator.IAReqInstanceAlloc}

    """
    return iallocator.IAReqInstanceAlloc(name=name,
                                         memory=self.op.memory,
                                         disks=self.op.disks,
                                         disk_template=self.op.disk_template,
                                         os=self.op.os,
                                         tags=self.op.tags,
                                         nics=self.op.nics,
                                         vcpus=self.op.vcpus,
                                         spindle_use=self.op.spindle_use,
                                         hypervisor=self.op.hypervisor,
                                         node_whitelist=None)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    @return: the allocator input text for C{IALLOCATOR_DIR_IN}, otherwise
      the output text of running the named allocator

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      req = self._InstanceAllocRequest(self.op.name)
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      req = iallocator.IAReqRelocate(
            inst_uuid=self.inst_uuid,
            relocate_from_node_uuids=list(self.relocate_from_node_uuids))
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
                                        target_groups=self.op.target_groups)
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
                                     evac_mode=self.op.evac_mode)
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
      # Instances are named by appending an index to the base name
      insts = [self._InstanceAllocRequest("%s%s" % (self.op.name, idx))
               for idx in range(self.op.count)]
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
    else:
      # Interpolate the mode into the message; passing it as a separate
      # argument left the "%s" placeholder unformatted
      raise errors.ProgrammerError("Uncatched mode %s in"
                                   " LUTestAllocator.Exec" % self.op.mode)

    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.iallocator, validate=False)
      result = ial.out_text
    return result