Package ganeti :: Module locking
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.locking

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21  """Module implementing the Ganeti locking code.""" 
  22   
  23  # pylint: disable-msg=W0212 
  24   
  25  # W0212 since e.g. LockSet methods use (a lot) the internals of 
  26  # SharedLock 
  27   
  28  import os 
  29  import select 
  30  import threading 
  31  import time 
  32  import errno 
  33   
  34  from ganeti import errors 
  35  from ganeti import utils 
  36  from ganeti import compat 
def ssynchronized(lock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics). The lock is released even if the wrapped function raises.

  The returned wrapper carries over the wrapped function's name, docstring,
  module and attribute dictionary, so decorated functions keep their identity
  in tracebacks, logs and introspection.

  @param lock: lock object providing C{acquire(shared=...)} and C{release()}
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire the lock in shared mode

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()

    # Preserve the wrapped function's metadata (what functools.wraps does),
    # done by hand so this decorator does not depend on functools.
    for attr in ("__module__", "__name__", "__doc__"):
      if hasattr(fn, attr):
        setattr(sync_function, attr, getattr(fn, attr))
    sync_function.__dict__.update(getattr(fn, "__dict__", {}))

    return sync_function
  return wrap
class RunningTimeout(object):
  """Tracks how much of an overall timeout is left across several steps.

  The clock is started lazily, on the first call to L{Remaining}, not when
  the object is constructed.

  """
  __slots__ = [
    "_allow_negative",
    "_start_time",
    "_time_fn",
    "_timeout",
    ]

  def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Sets up the tracker.

    @type timeout: float or None
    @param timeout: total timeout duration; None means "no timeout"
    @type allow_negative: bool
    @param allow_negative: if False, L{Remaining} never goes below zero
    @param _time_fn: clock function, replaceable by unittests
    @raise ValueError: if C{timeout} is negative

    """
    object.__init__(self)

    is_negative = timeout is not None and timeout < 0.0
    if is_negative:
      raise ValueError("Timeout must not be negative")

    self._start_time = None
    self._time_fn = _time_fn
    self._allow_negative = allow_negative
    self._timeout = timeout

  def Remaining(self):
    """Computes how much of the timeout is still left.

    @return: None if there is no timeout, otherwise the time left (clamped
        at 0.0 unless negative values were allowed)

    """
    if self._timeout is None:
      return None

    # The first query starts the clock
    if self._start_time is None:
      self._start_time = self._time_fn()

    deadline = self._start_time + self._timeout
    left = deadline - self._time_fn()

    if self._allow_negative:
      return left

    return max(0.0, left)
110 111 -class _SingleNotifyPipeConditionWaiter(object):
112 """Helper class for SingleNotifyPipeCondition 113 114 """ 115 __slots__ = [ 116 "_fd", 117 "_poller", 118 ] 119
120 - def __init__(self, poller, fd):
121 """Constructor for _SingleNotifyPipeConditionWaiter 122 123 @type poller: select.poll 124 @param poller: Poller object 125 @type fd: int 126 @param fd: File descriptor to wait for 127 128 """ 129 object.__init__(self) 130 self._poller = poller 131 self._fd = fd
132
133 - def __call__(self, timeout):
134 """Wait for something to happen on the pipe. 135 136 @type timeout: float or None 137 @param timeout: Timeout for waiting (can be None) 138 139 """ 140 running_timeout = RunningTimeout(timeout, True) 141 142 while True: 143 remaining_time = running_timeout.Remaining() 144 145 if remaining_time is not None: 146 if remaining_time < 0.0: 147 break 148 149 # Our calculation uses seconds, poll() wants milliseconds 150 remaining_time *= 1000 151 152 try: 153 result = self._poller.poll(remaining_time) 154 except EnvironmentError, err: 155 if err.errno != errno.EINTR: 156 raise 157 result = None 158 159 # Check whether we were notified 160 if result and result[0][0] == self._fd: 161 break
162
163 164 -class _BaseCondition(object):
165 """Base class containing common code for conditions. 166 167 Some of this code is taken from python's threading module. 168 169 """ 170 __slots__ = [ 171 "_lock", 172 "acquire", 173 "release", 174 ] 175
176 - def __init__(self, lock):
177 """Constructor for _BaseCondition. 178 179 @type lock: threading.Lock 180 @param lock: condition base lock 181 182 """ 183 object.__init__(self) 184 185 # Recursive locks are not supported 186 assert not hasattr(lock, "_acquire_restore") 187 assert not hasattr(lock, "_release_save") 188 189 self._lock = lock 190 191 # Export the lock's acquire() and release() methods 192 self.acquire = lock.acquire 193 self.release = lock.release
194
195 - def _is_owned(self):
196 """Check whether lock is owned by current thread. 197 198 """ 199 if self._lock.acquire(0): 200 self._lock.release() 201 return False 202 203 return True
204
205 - def _check_owned(self):
206 """Raise an exception if the current thread doesn't own the lock. 207 208 """ 209 if not self._is_owned(): 210 raise RuntimeError("cannot work with un-aquired lock")
211
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Creates an un-notified condition without any pipe yet.

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = self._write_fd = self._poller = None

  def _check_unnotified(self):
    """Raises RuntimeError once notifyAll() has been called.

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Closes whichever pipe ends are still open and forgets the poller.

    """
    if self._read_fd is not None:
      os.close(self._read_fd)
      self._read_fd = None

    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None

    self._poller = None

  def wait(self, timeout=None):
    """Waits until notified or until the timeout expires.

    The caller must hold the lock; it is dropped while waiting and re-acquired
    before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      # The pipe is created lazily by the first waiter. notifyAll() closes
      # the write end, which shows up as POLLHUP on the read end.
      if self._poller is None:
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)

      # Let notifiers get the lock while we are blocked
      self.release()
      try:
        waiter(timeout)
      finally:
        self.acquire()
    finally:
      self._nwaiters -= 1
      # The last waiter to leave releases the pipe
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Wakes every waiter by closing the pipe's write end; usable only once.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    if self._write_fd is None:
      return

    os.close(self._write_fd)
    self._write_fd = None
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional feature it's able to report whether
  there are any waiting threads.

  """
  __slots__ = [
    "_nwaiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Creates the condition together with its first single-use condition.

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout=None):
    """Waits for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # notifyAll() installs a fresh single-use condition, so keep a handle on
    # the one that is current when this waiter starts.
    current = self._single_condition

    assert self._nwaiters >= 0
    self._nwaiters += 1
    try:
      current.wait(timeout)
    finally:
      assert self._nwaiters > 0
      self._nwaiters -= 1

  def notifyAll(self): # pylint: disable-msg=C0103
    """Wakes all current waiters, then arms a new single-use condition.

    """
    self._check_owned()
    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def has_waiting(self):
    """Tells whether any thread is currently inside wait().

    """
    self._check_owned()

    return self._nwaiters != 0
369 370 -class _CountingCondition(object):
371 """Wrapper for Python's built-in threading.Condition class. 372 373 This wrapper keeps a count of active waiters. We can't access the internal 374 "__waiters" attribute of threading.Condition because it's not thread-safe. 375 376 """ 377 __slots__ = [ 378 "_cond", 379 "_nwaiters", 380 ] 381
382 - def __init__(self, lock):
383 """Initializes this class. 384 385 """ 386 object.__init__(self) 387 self._cond = threading.Condition(lock=lock) 388 self._nwaiters = 0
389
390 - def notifyAll(self): # pylint: disable-msg=C0103
391 """Notifies the condition. 392 393 """ 394 return self._cond.notifyAll()
395
396 - def wait(self, timeout=None):
397 """Waits for the condition to be notified. 398 399 @type timeout: float or None 400 @param timeout: Waiting timeout (can be None) 401 402 """ 403 assert self._nwaiters >= 0 404 405 self._nwaiters += 1 406 try: 407 return self._cond.wait(timeout=timeout) 408 finally: 409 self._nwaiters -= 1
410
411 - def has_waiting(self):
412 """Returns whether there are active waiters. 413 414 """ 415 return bool(self._nwaiters)
416
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)} (the default).

  The lock prevents starvation but does not guarantee that threads will acquire
  the shared lock in the order they queued for it, just that they will
  eventually do so.

  """
  __slots__ = [
    "__active_shr_c",
    "__inactive_shr_c",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__shr",
    ]

  __condition_class = PipeCondition

  def __init__(self):
    """Construct a new SharedLock.

    """
    object.__init__(self)

    # Internal lock protecting all the state below
    self.__lock = threading.Lock()

    # Queue containing waiting acquires (condition objects; the first entry
    # is the one notified on release)
    self.__pending = []

    # Active and inactive conditions for shared locks. All shared acquires
    # queue on the active one; release() swaps them when the active one is
    # notified.
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if delete() has been called on this lock

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock")

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return len(self.__pending)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only needs the lock not to be held exclusively; an
    exclusive one needs the lock not to be held at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    return self.__pending[0] == cond

  def __acquire_unlocked(self, shared, timeout):
    """Acquire the lock; the caller must hold the internal lock.

    With a timeout, at most a single wait is performed, so this can return
    False before the full timeout elapsed if the wait is woken up while the
    lock is still unavailable.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up, or None to wait
        until the lock is acquired
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is, or gets, deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    if shared:
      # All shared acquires wait on the same condition
      wait_condition = self.__active_shr_c

      # Check if we're not yet in the queue
      if wait_condition not in self.__pending:
        self.__pending.append(wait_condition)
    else:
      # Each exclusive acquire gets its own condition
      wait_condition = self.__condition_class(self.__lock)
      # Always add to queue
      self.__pending.append(wait_condition)

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters. After a
      # delete() the queue has already been emptied.
      if not wait_condition.has_waiting() and not self.__deleted:
        self.__pending.remove(wait_condition)

    return False

  def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire the lock, in shared or exclusive mode.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is, or gets, deleted

    """
    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout)
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      if self.__pending:
        first_condition = self.__pending[0]
        first_condition.notifyAll()

        # If the shared condition was just woken, make the other one active.
        # It is not queued yet, so shared acquires arriving from now on are
        # appended at the end of the queue.
        if first_condition == self.__active_shr_c:
          self.__active_shr_c = self.__inactive_shr_c
          self.__inactive_shr_c = first_condition

    finally:
      self.__lock.release()

  def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: True if the lock was deleted, False if it could not be acquired
        within the timeout
    @raise errors.LockError: if the lock was already deleted

    """
    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout)

      if acquired:
        # Checked only once we really got the lock: when the acquire timed
        # out we must report failure, not trip this assertion.
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        # Notify all acquires. They'll throw an error.
        while self.__pending:
          self.__pending.pop().notifyAll()

      return acquired
    finally:
      self.__lock.release()
# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire. Hide this behind this nicely named constant.
# (LockSet.acquire() treats names=None as "acquire the set-level lock and
# every lock currently in the set".)
ALL_SET = None
699 700 701 -class _AcquireTimeout(Exception):
702 """Internal exception to abort an acquire on a timeout. 703 704 """
705
class LockSet:
  """Implements a set of locks.

  This abstraction implements a set of shared locks for the same resource type,
  distinguished by name. The user can lock a subset of the resources and the
  LockSet will take care of acquiring the locks always in the same order, thus
  preventing deadlock.

  All the locks needed in the same set must be acquired together, though.

  """
  def __init__(self, members=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set

    """
    # Used internally to guarantee coherency.
    self.__lock = SharedLock()

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    if members is not None:
      for name in members:
        self.__lockdict[name] = SharedLock()

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

  def _is_owned(self):
    """Is the current thread a current level owner?"""
    return threading.currentThread() in self.__owners

  def _add_owned(self, name=None):
    """Note the current thread owns the given lock.

    @param name: name of the lock now owned; if None, only make sure the
        current thread has a (possibly empty) entry in the owners dict, as
        done when the set-level lock is acquired

    """
    if name is None:
      if not self._is_owned():
        self.__owners[threading.currentThread()] = set()
    else:
      if self._is_owned():
        self.__owners[threading.currentThread()].add(name)
      else:
        self.__owners[threading.currentThread()] = set([name])

  def _del_owned(self, name=None):
    """Note the current thread no longer owns the given lock.

    The thread's entry in the owners dict is dropped once it holds neither
    the set-level lock nor any individual lock of this set.

    @param name: name of the lock no longer owned; None when only the
        set-level lock has been released

    """

    assert not (name is None and self.__lock._is_owned()), \
      "Cannot hold internal lock when deleting owner status"

    if name is not None:
      self.__owners[threading.currentThread()].remove(name)

    # Only remove the key if we don't hold the set-lock as well
    if (not self.__lock._is_owned() and
        not self.__owners[threading.currentThread()]):
      del self.__owners[threading.currentThread()]

  def _list_owned(self):
    """Get the set of resource names owned by the current thread"""
    if self._is_owned():
      return self.__owners[threading.currentThread()].copy()
    else:
      return set()

  def _release_and_delete_owned(self):
    """Release and delete all resources owned by the current thread"""
    for lname in self._list_owned():
      lock = self.__lockdict[lname]
      if lock._is_owned():
        lock.release()
      self._del_owned(name=lname)

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return self.__lockdict.keys()

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    """
    # If we don't already own the set-level lock acquired
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire(shared=1)
    try:
      result = self.__names()
    finally:
      if release_lock:
        self.__lock.release()
    return set(result)

  def acquire(self, names, timeout=None, shared=0, test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # We need to keep track of how long we spent waiting for a lock. The
    # timeout passed to this function is over all lock acquires.
    running_timeout = RunningTimeout(timeout, False)

    try:
      if names is not None:
        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        return self.__acquire_inner(names, False, shared,
                                    running_timeout.Remaining, test_notify)

      else:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively, though, they won't be able to do this
        # anyway, so we'll get the list lock exclusively as well in order to
        # be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared,
                                   timeout=running_timeout.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared,
                                      running_timeout.Remaining, test_notify)
        except:
          # Anything that failed here, including a timeout or error while
          # acquiring the individual locks (which __acquire_inner has
          # already released), must also give up the set-level lock before
          # the exception is propagated.
          self.__lock.release()
          self._del_owned()
          raise

    except _AcquireTimeout:
      return None

  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: set of the names of the acquired locks
    @raise _AcquireTimeout: if a lock could not be acquired in time; all
        locks acquired so far are released first

    """
    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be already wrong. Using a sorted sequence to prevent
    # deadlocks.
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock in set (%s)" % lname)

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want. In order to get them we loop on this (private) list and
      # acquire() them. We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          # Bind the current name explicitly instead of relying on the
          # loop variable at call time
          test_notify_fn = lambda _name=lname: test_notify(_name)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock in set (%s)" % lname)

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s" % lname)

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them. If the set-level lock is held it is always
    released too, even when only some names are given.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), "release() on lock set while not owner"

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self._list_owned()
    else:
      names = set(names)
      assert self._list_owned().issuperset(names), (
        "release() on unheld resources %s" %
        names.difference(self._list_owned()))

    # First of all let's release the "all elements" lock, if set.
    # After this 'add' can work again
    if self.__lock._is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @rtype: bool
    @return: True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Check we don't already own locks at this level
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      "Cannot add locks if the set is only partially owned, or shared"

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add() (%s)" % invalid_names)

      for lockname in names:
        lock = SharedLock()

        if acquired:
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception. Of course something is going to be really wrong,
            # after this. On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True

  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset while not owning all elements")

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that if we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
        removed.append(lname)
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), "remove failed while holding lockset"
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(name=lname)

    return removed
# Locking levels. Locks must always be acquired in increasing level order.
# Current rules:
#  - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL). It must be acquired,
#    in either shared or exclusive mode, before performing any operation.
#    Acquiring the BGL in exclusive mode is discouraged and should be avoided.
#  - LEVEL_INSTANCE and LEVEL_NODE hold the instance and node locks. If you
#    need more than one node, or more than one instance, acquire them all at
#    the same time.
#
# The levels are consecutive integers starting at zero, so LEVELS[i] == i.
LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE = range(3)

LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

# Levels at which locks may be added or removed at runtime
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Name of the single lock living at LEVEL_CLUSTER: the Big Ganeti Lock
BGL = "BGL"
class GanetiLockManager(object):
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  Locks are organised in levels (see LEVELS); each level is backed by its own
  LockSet.

  """
  # The live instance, registered by __init__ (which asserts there is only one)
  _instance = None

  def __init__(self, nodes=None, instances=None):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time. This is
    checked with an assertion, so the check is skipped when Python runs with
    optimizations (-O) enabled.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
      "double GanetiLockManager instance"

    self.__class__._instance = self

    # The keyring maps each locking level to the LockSet holding that level's
    # locks. The order of the entries below is not significant; the order in
    # which levels must be acquired is given by LEVELS.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL]),
      LEVEL_NODE: LockSet(nodes),
      LEVEL_INSTANCE: LockSet(instances),
      }

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    return self.__keyring[level]._is_owned()

  # Public alias
  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    return self.__keyring[level]._list_owned()

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the reference level
    @rtype: boolean
    @return: True if some lock is owned at a level numerically greater
        than C{level}, False otherwise

    """
    # Compare level values directly instead of slicing LEVELS by index, so
    # the check does not silently depend on LEVELS[i] == i.
    return compat.any(self._is_owned(lvl) for lvl in LEVELS if lvl > level)

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock. A C{names} value of None is treated
    as covering the whole level, and therefore the BGL.

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @return: whatever the level's LockSet.acquire() returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at greater levels; locks already held at this
    # same level are the LockSet's own business.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: whatever the level's LockSet.release() returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The ownership summary is listed in LEVELS order so the message is
    # deterministic.
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                        for i in LEVELS]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: whatever the level's LockSet.add() returns

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: whatever the level's LockSet.remove() returns

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
    return self.__keyring[level].remove(names)
1340