Package ganeti :: Module locking
[hide private]
[frames | no frames]

Source Code for Module ganeti.locking

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
   5  # All rights reserved. 
   6  # 
   7  # Redistribution and use in source and binary forms, with or without 
   8  # modification, are permitted provided that the following conditions are 
   9  # met: 
  10  # 
  11  # 1. Redistributions of source code must retain the above copyright notice, 
  12  # this list of conditions and the following disclaimer. 
  13  # 
  14  # 2. Redistributions in binary form must reproduce the above copyright 
  15  # notice, this list of conditions and the following disclaimer in the 
  16  # documentation and/or other materials provided with the distribution. 
  17  # 
  18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
  19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
  20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
  22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
  23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
  24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
  25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
  26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
  27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
  28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  29   
  30  """Module implementing the Ganeti locking code.""" 
  31   
  32  # pylint: disable=W0212 
  33   
  34  # W0212 since e.g. LockSet methods use (a lot) the internals of 
  35  # SharedLock 
  36   
  37  import os 
  38  import select 
  39  import threading 
  40  import errno 
  41  import weakref 
  42  import logging 
  43  import heapq 
  44  import itertools 
  45  import time 
  46   
  47  from ganeti import errors 
  48  from ganeti import utils 
  49  from ganeti import compat 
  50  from ganeti import query 
  51   
  52   
# Textual lock modes reported by L{SharedLock.GetLockInfo}
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority used when a caller passes C{priority=None}
_DEFAULT_PRIORITY = 0

#: Minimum timeout required to consider scheduling a pending acquisition
#: (seconds)
_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)

# Internal lock acquisition modes for L{LockSet}
(_LS_ACQUIRE_EXACT,
 _LS_ACQUIRE_ALL,
 _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)

# All valid acquisition modes
_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
  _LS_ACQUIRE_EXACT,
  _LS_ACQUIRE_ALL,
  _LS_ACQUIRE_OPPORTUNISTIC,
  ])
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics). The lock is released again when the function returns or
  raises.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire
  @type shared: integer (0/1) used as a boolean
  @param shared: passed as C{shared} to the lock's C{acquire} method
  @return: decorator to be applied to the function to synchronize

  """
  # Imported here so that this function does not depend on the module-level
  # import list
  import functools

  try:
    string_types = basestring
  except NameError:
    # Interpreters without "basestring" only have "str"
    string_types = str

  def wrap(fn):
    # Preserve name and docstring of the decorated function
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      if isinstance(mylock, string_types):
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return sync_function
  return wrap
102 103 -class _SingleNotifyPipeConditionWaiter(object):
104 """Helper class for SingleNotifyPipeCondition 105 106 """ 107 __slots__ = [ 108 "_fd", 109 ] 110
111 - def __init__(self, fd):
112 """Constructor for _SingleNotifyPipeConditionWaiter 113 114 @type fd: int 115 @param fd: File descriptor to wait for 116 117 """ 118 object.__init__(self) 119 self._fd = fd
120
121 - def __call__(self, timeout):
122 """Wait for something to happen on the pipe. 123 124 @type timeout: float or None 125 @param timeout: Timeout for waiting (can be None) 126 127 """ 128 running_timeout = utils.RunningTimeout(timeout, True) 129 poller = select.poll() 130 poller.register(self._fd, select.POLLHUP) 131 132 while True: 133 remaining_time = running_timeout.Remaining() 134 135 if remaining_time is not None: 136 if remaining_time < 0.0: 137 break 138 139 # Our calculation uses seconds, poll() wants milliseconds 140 remaining_time *= 1000 141 142 try: 143 result = poller.poll(remaining_time) 144 except EnvironmentError, err: 145 if err.errno != errno.EINTR: 146 raise 147 result = None 148 149 # Check whether we were notified 150 if result and result[0][0] == self._fd: 151 break
152
153 154 -class _BaseCondition(object):
155 """Base class containing common code for conditions. 156 157 Some of this code is taken from python's threading module. 158 159 """ 160 __slots__ = [ 161 "_lock", 162 "acquire", 163 "release", 164 "_is_owned", 165 "_acquire_restore", 166 "_release_save", 167 ] 168
169 - def __init__(self, lock):
170 """Constructor for _BaseCondition. 171 172 @type lock: threading.Lock 173 @param lock: condition base lock 174 175 """ 176 object.__init__(self) 177 178 try: 179 self._release_save = lock._release_save 180 except AttributeError: 181 self._release_save = self._base_release_save 182 try: 183 self._acquire_restore = lock._acquire_restore 184 except AttributeError: 185 self._acquire_restore = self._base_acquire_restore 186 try: 187 self._is_owned = lock.is_owned 188 except AttributeError: 189 self._is_owned = self._base_is_owned 190 191 self._lock = lock 192 193 # Export the lock's acquire() and release() methods 194 self.acquire = lock.acquire 195 self.release = lock.release
196
197 - def _base_is_owned(self):
198 """Check whether lock is owned by current thread. 199 200 """ 201 if self._lock.acquire(0): 202 self._lock.release() 203 return False 204 return True
205
206 - def _base_release_save(self):
207 self._lock.release()
208
209 - def _base_acquire_restore(self, _):
210 self._lock.acquire()
211
212 - def _check_owned(self):
213 """Raise an exception if the current thread doesn't own the lock. 214 215 """ 216 if not self._is_owned(): 217 raise RuntimeError("cannot work with un-aquired lock")
218
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    # The pipe is created lazily by the first waiter
    self._read_fd = None
    self._write_fd = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} was called before

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    # Reading side first, then writing side
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held by the caller; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._read_fd is None:
        (self._read_fd, self._write_fd) = os.pipe()

      waiter = self._waiter_class(self._read_fd)
      saved_state = self._release_save()
      try:
        # Wait for notification
        waiter(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      # The last waiter to leave closes the pipe
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
307
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional feature it's able to report whether
  there are any waiting threads.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # The single-use condition may be replaced by a notifying thread while
    # we're waiting, hence a local reference is needed
    current_cond = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      current_cond.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    The notified single-use condition is replaced by a fresh one for
    subsequent waiters.

    """
    self._check_owned()
    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return bool(self._waiters)

  def __repr__(self):
    cls = self.__class__
    return ("<%s.%s waiters=%s at %#x>" %
            (cls.__module__, cls.__name__, self._waiters, id(self)))
382
class _PipeConditionWithMode(PipeCondition):
  """L{PipeCondition} remembering the acquire mode of its waiters.

  @ivar shared: value passed as C{shared} to the constructor

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: acquire mode to remember

    """
    self.shared = shared
    PipeCondition.__init__(self, lock)
395
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  # Class used to create the conditions pending acquires wait on
  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register
    @type _time_fn: callable
    @param _time_fn: Function returning the current time (used for
        unittesting)

    """
    object.__init__(self)

    self.name = name

    # Used for unittesting
    self.__time_fn = _time_fn

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    return ("<%s.%s name=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: List with a single tuple of lock name, mode, owner names and
        pending acquires; values which were not requested are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: See L{is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership
    @rtype: bool

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  #: Necessary to remain compatible with threading.Condition, which tries to
  #: retrieve a locks' "_is_owned" attribute
  _is_owned = is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts the conditions in all per-priority queues.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: C{True} if all structures tracking pending acquires are empty

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Records the current thread as sharer or as exclusive owner; no checks
    are made here.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires that there's no exclusive owner, an
    exclusive acquire requires that there's no owner at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @rtype: tuple
    @return: Priority and per-priority queue of the topmost non-empty entry
        or C{(None, None)} if there is none

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: Whether the lock was acquired
    @raise errors.LockError: if the lock has been deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    # The lock couldn't be acquired right away, so if a timeout is given and is
    # considered too short, return right away as scheduling a pending
    # acquisition is quite expensive
    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
      return False

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          acquired = True
          break

        # A lot of code assumes blocking acquires always succeed, therefore we
        # can never return False for a blocking acquire
        if (timeout is not None and
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
          break

        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock; L{_DEFAULT_PRIORITY} is
        used if C{None}
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: Whether the lock was acquired
    @raise errors.LockError: if the lock has been deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @return: Always C{True}

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
        notify = True
      else:
        self.__shr.remove(threading.currentThread())
        notify = not self.__shr

      # Notify topmost condition in queue if there are no owners left (for
      # shared locks)
      if notify:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    The internal lock must be held by the caller.

    """
    (priority, prioqueue) = self.__find_first_pending_queue()
    if prioqueue:
      cond = prioqueue[0]
      cond.notifyAll()
      if cond.shared:
        # Prevent further shared acquires from sneaking in while waiters are
        # notified
        self.__pending_shared.pop(priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    Takes the internal lock around the call.

    """
    self.__lock.acquire()
    try:
      return self.__notify_topmost()
    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock; L{_DEFAULT_PRIORITY} is
        used if C{None}
    @rtype: bool
    @return: Whether the lock was acquired and hence deleted
    @raise errors.LockError: if the lock has been deleted already

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns whether it was held in shared mode.

    The return value is meant to be passed to L{_acquire_restore}.

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)
# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire. Hide this behind this nicely named constant.
ALL_SET = None
927 928 929 -def _TimeoutZero():
930 """Returns the number zero. 931 932 """ 933 return 0
934
def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
  """Determines modes and timeouts for L{LockSet.acquire}.

  @type want_all: boolean
  @param want_all: Whether all locks in set should be acquired
  @param timeout: Timeout in seconds or C{None}
  @param opportunistic: Whether locks should be acquired opportunistically
  @rtype: tuple
  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
    acquiring the lockset-internal lock (might be C{None}) and a function to
    calculate the timeout for acquiring individual locks

  """
  # No running timeout is needed for a purely opportunistic acquisition
  if opportunistic and not want_all:
    assert timeout is None, "Got timeout for an opportunistic acquisition"
    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)

  # The timeout passed to this function is over all lock acquisitions, hence
  # the time spent waiting must be tracked
  running_timeout = utils.RunningTimeout(timeout, False)

  # A timeout function for the lockset-internal lock is only provided when
  # all locks are wanted
  if want_all:
    ls_timeout_fn = running_timeout.Remaining
  else:
    ls_timeout_fn = None

  if opportunistic:
    mode = _LS_ACQUIRE_OPPORTUNISTIC
    timeout_fn = _TimeoutZero
  elif want_all:
    mode = _LS_ACQUIRE_ALL
    timeout_fn = running_timeout.Remaining
  else:
    mode = _LS_ACQUIRE_EXACT
    timeout_fn = running_timeout.Remaining

  return (mode, ls_timeout_fn, timeout_fn)
973
974 975 -class _AcquireTimeout(Exception):
976 """Internal exception to abort an acquire on a timeout. 977 978 """
979
980 981 -class LockSet(object):
982 """Implements a set of locks. 983 984 This abstraction implements a set of shared locks for the same resource type, 985 distinguished by name. The user can lock a subset of the resources and the 986 LockSet will take care of acquiring the locks always in the same order, thus 987 preventing deadlock. 988 989 All the locks needed in the same set must be acquired together, though. 990 991 @type name: string 992 @ivar name: the name of the lockset 993 994 """
def __init__(self, members, name, monitor=None):
  """Constructs a new LockSet.

  @type members: list of strings
  @param members: initial members of the set
  @type name: string
  @param name: name of the lockset, used as prefix for the lock names
  @type monitor: L{LockMonitor}
  @param monitor: Lock monitor with which to register member locks

  """
  assert members is not None, "members parameter is not a list"
  self.name = name

  # Lock monitor
  self.__monitor = monitor

  # Used internally to guarantee coherency
  self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

  # The lockdict indexes the relationship name -> lock
  # The order-of-locking is implied by the alphabetical order of names
  self.__lockdict = dict((mname,
                          SharedLock(self._GetLockName(mname),
                                     monitor=monitor))
                         for mname in members)

  # The owner dict contains the set of locks each thread owns. For
  # performance each thread can access its own key without a global lock on
  # this structure. It is paramount though that *no* other type of access is
  # done to this structure (eg. no looping over its keys). *_owner helper
  # function are defined to guarantee access is correct, but in general never
  # do anything different than __owners[threading.currentThread()], or there
  # will be trouble.
  self.__owners = {}
1029
1030 - def _GetLockName(self, mname):
1031 """Returns the name for a member lock. 1032 1033 """ 1034 return "%s/%s" % (self.name, mname)
1035
1036 - def _get_lock(self):
1037 """Returns the lockset-internal lock. 1038 1039 """ 1040 return self.__lock
1041
1042 - def _get_lockdict(self):
1043 """Returns the lockset-internal lock dictionary. 1044 1045 Accessing this structure is only safe in single-thread usage or when the 1046 lockset-internal lock is held. 1047 1048 """ 1049 return self.__lockdict
1050
def is_owned(self):
  """Tells whether the current thread has an entry in the owner dict.

  @note: Use L{check_owned} to check if a specific lock is held
  @rtype: bool

  """
  me = threading.currentThread()
  return me in self.__owners
1058
def check_owned(self, names, shared=-1):
  """Check if locks are owned in a specific mode.

  @type names: sequence or string
  @param names: Lock names (or a single lock name)
  @param shared: See L{SharedLock.is_owned}
  @rtype: bool
  @raise errors.LockError: if one of the names isn't in the set
  @note: Use L{is_owned} to check if the current thread holds I{any} lock and
    L{list_owned} to get the names of all owned locks

  """
  if isinstance(names, basestring):
    names = [names]

  # Avoid check if no locks are owned anyway
  if not (names and self.is_owned()):
    return False

  # Gather references to all locks (in case they're deleted in the meantime)
  candidates = []
  for lname in names:
    try:
      candidates.append(self.__lockdict[lname])
    except KeyError:
      raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
                             " have been removed)" % (lname, self.name))

  return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1090
def owning_all(self):
  """Checks whether current thread owns internal lock.

  Holding the internal lock is equivalent with holding all locks in the set
  (the opposite does not necessarily hold as it can not be easily
  determined). L{add} and L{remove} require the internal lock.

  @rtype: boolean

  """
  internal_lock = self.__lock
  return internal_lock.is_owned()
1102
1103 - def _add_owned(self, name=None):
1104 """Note the current thread owns the given lock""" 1105 if name is None: 1106 if not self.is_owned(): 1107 self.__owners[threading.currentThread()] = set() 1108 else: 1109 if self.is_owned(): 1110 self.__owners[threading.currentThread()].add(name) 1111 else: 1112 self.__owners[threading.currentThread()] = set([name])
1113
def _del_owned(self, name=None):
  """Record that the current thread no longer owns a lock of this set.

  @param name: name of the lock to drop from the thread's entry; with
    C{None} no name is dropped and only the entry itself is considered for
    removal (not allowed while the set-internal lock is still held)

  """
  set_lock = self.__lock

  assert name is not None or not set_lock.is_owned(), \
    "Cannot hold internal lock when deleting owner status"

  me = threading.currentThread()

  if name is not None:
    self.__owners[me].remove(name)

  # The entry must survive while the set-lock is held
  if set_lock.is_owned():
    return

  # ... and while lock names are still recorded in it
  if not self.__owners[me]:
    del self.__owners[me]
1127
1128 - def list_owned(self):
1129 """Get the set of resource names owned by the current thread""" 1130 if self.is_owned(): 1131 return self.__owners[threading.currentThread()].copy() 1132 else: 1133 return set()
1134
1135 - def _release_and_delete_owned(self):
1136 """Release and delete all resources owned by the current thread""" 1137 for lname in self.list_owned(): 1138 lock = self.__lockdict[lname] 1139 if lock.is_owned(): 1140 lock.release() 1141 self._del_owned(name=lname)
1142
1143 - def __names(self):
1144 """Return the current set of names. 1145 1146 Only call this function while holding __lock and don't iterate on the 1147 result after releasing the lock. 1148 1149 """ 1150 return self.__lockdict.keys()
1151
def _names(self):
  """Return a copy of the current set of elements.

  Used only for debugging purposes.

  @rtype: set

  """
  # Take the set-level lock (shared) only if this thread doesn't hold it
  # yet; in that case it has to be given back before returning.
  must_release = not self.__lock.is_owned()
  if must_release:
    self.__lock.acquire(shared=1)

  try:
    current = self.__names()
  finally:
    if must_release:
      self.__lock.release()

  return set(current)
1170
def acquire(self, names, timeout=None, shared=0, priority=None,
            opportunistic=False, test_notify=None):
  """Acquire a set of resource locks.

  With explicit C{names} only those locks are acquired. With C{names} set to
  C{None} the set-internal lock is acquired first and then all locks
  currently in the set.

  @note: When acquiring locks opportunistically, any number of locks might
    actually be acquired, even zero.

  @type names: list of strings (or string), or None
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names); C{None} for the whole
      set
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
      exclusive lock will be acquired
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks; for opportunistic
    acquisitions, a timeout can only be given when C{names} is C{None}, in
    which case it is exclusively used for acquiring the L{LockSet}-internal
    lock; opportunistic acquisitions don't use a timeout for acquiring
    individual locks
  @type priority: integer
  @param priority: Priority for acquiring locks; C{None} selects the
    module's default priority
  @type opportunistic: boolean
  @param opportunistic: Acquire locks opportunistically; use the return value
    to determine which locks were actually acquired
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting

  @return: Set of all locks successfully acquired or None in case of timeout

  @raise errors.LockError: when any lock we try to acquire has
      been deleted before we succeed. In this case none of the
      locks requested will be acquired.

  """
  assert timeout is None or timeout >= 0.0

  # Check we don't already own locks at this level
  assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
                               " (lockset %s)" % self.name)

  if priority is None:
    priority = _DEFAULT_PRIORITY

  try:
    if names is not None:
      assert timeout is None or not opportunistic, \
        ("Opportunistic acquisitions can only use a timeout if no"
         " names are given; see docstring for details")

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      # The set-internal lock is not involved here, hence its timeout
      # function (second element) is ignored
      (mode, _, timeout_fn) = \
        _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)

      return self.__acquire_inner(names, mode, shared, priority,
                                  timeout_fn, test_notify)

    else:
      (mode, ls_timeout_fn, timeout_fn) = \
        _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)

      # If no names are given acquire the whole set by not letting new names
      # being added before we release, and getting the current list of names.
      # Some of them may then be deleted later, but we'll cope with this.
      #
      # We'd like to acquire this lock in a shared way, as it's nice if
      # everybody else can use the instances at the same time. If we are
      # acquiring them exclusively though they won't be able to do this
      # anyway, so we'll get the list lock exclusively as well in order to
      # be able to do add() on the set while owning it.
      if not self.__lock.acquire(shared=shared, priority=priority,
                                 timeout=ls_timeout_fn()):
        raise _AcquireTimeout()

      try:
        # note we own the set-lock
        self._add_owned()

        return self.__acquire_inner(self.__names(), mode, shared,
                                    priority, timeout_fn, test_notify)
      except:
        # We shouldn't have problems adding the lock to the owners list, but
        # if we did we'll try to release this lock and re-raise exception.
        # Of course something is going to be really wrong, after this.
        #
        # This handler also runs for any exception coming out of
        # __acquire_inner (including _AcquireTimeout, which the outer
        # handler then converts into a None return). The set-lock has to be
        # released before _del_owned() is called, as the latter refuses to
        # run without a name while the set-lock is still held.
        self.__lock.release()
        self._del_owned()
        raise

  except _AcquireTimeout:
    return None
1263
def __acquire_inner(self, names, mode, shared, priority,
                    timeout_fn, test_notify):
  """Inner logic for acquiring a number of locks.

  Acquisition modes:

    - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
      deleted locks can be ignored as the whole set is being acquired with
      its internal lock held
    - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
      timeouts and deleted locks are fatal
    - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
      all within the set) which should be acquired opportunistically, that is
      failures are ignored

  @param names: Names of the locks to be acquired
  @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
  @param shared: Whether to acquire in shared mode
  @param timeout_fn: Function returning remaining timeout (C{None} for
    opportunistic acquisitions)
  @param priority: Priority for acquiring locks
  @param test_notify: Special callback function for unittesting
  @rtype: set
  @return: Names of the locks which were actually acquired
  @raise errors.LockError: In C{_LS_ACQUIRE_EXACT} mode if a lock is missing
    or was deleted, and in non-opportunistic modes if a blocking acquisition
    unexpectedly fails
  @raise _AcquireTimeout: If a lock couldn't be acquired within the timeout
    (never for opportunistic acquisitions)

  """
  assert mode in _LS_ACQUIRE_MODES

  acquire_list = []

  # First we look the locks up on __lockdict. We have no way of being sure
  # they will still be there after, but this makes it a lot faster should
  # just one of them already be wrong. Using a sorted sequence to prevent
  # deadlocks.
  for lname in sorted(frozenset(names)):
    try:
      lock = self.__lockdict[lname] # raises KeyError if lock is not there
    except KeyError:
      # We are acquiring the whole set, it doesn't matter if this particular
      # element is not there anymore. If, however, only certain names should
      # be acquired, not finding a lock is an error.
      if mode == _LS_ACQUIRE_EXACT:
        raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                               " been removed)" % (lname, self.name))
    else:
      acquire_list.append((lname, lock))

  # This will hold the locknames we effectively acquired.
  acquired = set()

  try:
    # Now acquire_list contains a sorted list of resources and locks we
    # want.  In order to get them we loop on this (private) list and
    # acquire() them.  We gave no real guarantee they will still exist till
    # this is done but .acquire() itself is safe and will alert us if the
    # lock gets deleted.
    for (lname, lock) in acquire_list:
      if __debug__ and callable(test_notify):
        # The closure is only handed to the acquire() call of this very
        # iteration, so it always sees the current value of lname
        test_notify_fn = lambda: test_notify(lname)
      else:
        test_notify_fn = None

      timeout = timeout_fn()

      try:
        # raises LockError if the lock was deleted
        acq_success = lock.acquire(shared=shared, timeout=timeout,
                                   priority=priority,
                                   test_notify=test_notify_fn)
      except errors.LockError:
        if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
          # We are acquiring the whole set, it doesn't matter if this
          # particular element is not there anymore.
          continue

        raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                               " been removed)" % (lname, self.name))

      if not acq_success:
        # Couldn't get lock or timeout occurred
        if mode == _LS_ACQUIRE_OPPORTUNISTIC:
          # Ignore timeouts on opportunistic acquisitions
          continue

        if timeout is None:
          # This shouldn't happen as SharedLock.acquire(timeout=None) is
          # blocking.
          raise errors.LockError("Failed to get lock %s (set %s)" %
                                 (lname, self.name))

        raise _AcquireTimeout()

      try:
        # now the lock cannot be deleted, we have it!
        self._add_owned(name=lname)
        acquired.add(lname)

      except:
        # We shouldn't have problems adding the lock to the owners list, but
        # if we did we'll try to release this lock and re-raise exception.
        # Of course something is going to be really wrong after this.
        if lock.is_owned():
          lock.release()
        raise

  except:
    # Release all owned locks; this is what makes a failed acquisition
    # leave none of the requested locks behind
    self._release_and_delete_owned()
    raise

  return acquired
1373
def downgrade(self, names=None):
  """Downgrade a set of resource locks from exclusive to shared mode.

  The locks must have been acquired in exclusive mode.

  @type names: list of strings (or string), or None
  @param names: the names of the locks which shall be downgraded (defaults
    to all the locks owned by the current thread in this set)
  @return: always C{True}

  """
  assert self.is_owned(), ("downgrade on lockset %s while not owning any"
                           " lock" % self.name)

  # A single name may be given instead of a sequence
  if isinstance(names, basestring):
    names = [names]

  held = self.list_owned()

  if names is not None:
    names = set(names)
    assert held.issuperset(names), \
      ("downgrade() on unheld resources %s (set %s)" %
       (names.difference(held), self.name))
  else:
    names = held

  for lname in names:
    self.__lockdict[lname].downgrade()

  # An exclusively held set-lock follows once none of the individual locks
  # is held exclusively anymore
  set_lock = self.__lock
  if (set_lock.is_owned(shared=0) and
      not compat.any(lock.is_owned(shared=0)
                     for lock in self.__lockdict.values())):
    set_lock.downgrade()
    assert set_lock.is_owned(shared=1)

  return True
1409
def release(self, names=None):
  """Release a set of resource locks, at the same level.

  The locks must be owned by the current thread; whether they were acquired
  in shared or in exclusive mode makes no difference.

  @type names: list of strings (or string), or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level).

  """
  assert self.is_owned(), ("release() on lock set %s while not owner" %
                           self.name)

  # A single name may be given instead of a sequence
  if isinstance(names, basestring):
    names = [names]

  if names is not None:
    names = set(names)
    assert self.list_owned().issuperset(names), (
      "release() on unheld resources %s (set %s)" %
      (names.difference(self.list_owned()), self.name))
  else:
    names = self.list_owned()

  # The "all elements" lock goes first, if held; from then on 'add' can
  # work again
  set_lock = self.__lock
  if set_lock.is_owned():
    set_lock.release()
    self._del_owned()

  for lname in names:
    # If we are sure the lock doesn't leave __lockdict without being
    # exclusively held we can do this...
    self.__lockdict[lname].release()
    self._del_owned(name=lname)
1447
def add(self, names, acquired=0, shared=0):
  """Add a new set of elements to the set

  The set-internal lock is held while the elements are added; it is acquired
  (and released again afterwards) here unless the current thread owns it
  already.

  @type names: list of strings (or string)
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @return: always C{True}
  @raise errors.LockError: if any of the names is already part of the set

  """
  # Check we don't already own locks at this level
  assert not self.is_owned() or self.__lock.is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # Support passing in a single resource to add rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we don't already own the set-level lock acquired in an exclusive way
  # we'll get it and note we need to release it later.
  release_lock = False
  if not self.__lock.is_owned():
    release_lock = True
    self.__lock.acquire()

  try:
    invalid_names = set(self.__names()).intersection(names)
    if invalid_names:
      # This must be an explicit raise, not an assert, because assert is
      # turned off when using optimization, and this can happen because of
      # concurrency even if the user doesn't want it.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (invalid_names, self.name))

    for lockname in names:
      lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

      if acquired:
        # No need for priority or timeout here as this lock has just been
        # created
        lock.acquire(shared=shared)
        # now the lock cannot be deleted, we have it!
        try:
          self._add_owned(name=lockname)
        except:
          # We shouldn't have problems adding the lock to the owners list,
          # but if we did we'll try to release this lock and re-raise
          # exception.  Of course something is going to be really wrong,
          # after this.  On the other hand the lock hasn't been added to the
          # __lockdict yet so no other threads should be pending on it. This
          # release is just a safety measure.
          lock.release()
          raise

      # Only now does the lock become visible to users of the set
      self.__lockdict[lockname] = lock

  finally:
    # Only release __lock if we were not holding it previously.
    if release_lock:
      self.__lock.release()

  return True
1512
def remove(self, names):
  """Remove elements from the lock set.

  You can either not hold anything in the lockset or already hold a superset
  of the elements you want to delete, exclusively.

  @type names: list of strings (or string)
  @param names: names of the resource to remove.

  @return: a list of locks which we removed; the list is always
      equal to the names list if we were holding all the locks
      exclusively

  """
  # Support passing in a single resource to remove rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we own any subset of this lock it must be a superset of what we want
  # to delete. The ownership must also be exclusive, but that will be checked
  # by the lock itself.
  assert not self.is_owned() or self.list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  removed = []

  for lname in names:
    # Calling delete() acquires the lock exclusively if we don't already own
    # it, and causes all pending and subsequent lock acquires to fail. It's
    # fine to call it out of order because delete() also implies release(),
    # and the assertion above guarantees that we either already hold
    # everything we want to delete, or we hold none.
    try:
      self.__lockdict[lname].delete()
      removed.append(lname)
    except (KeyError, errors.LockError):
      # Either the name is not in the set (anymore) or the lock refused the
      # deletion. This cannot happen if we were already holding it, verify:
      assert not self.is_owned(), ("remove failed while holding lockset %s" %
                                   self.name)
    else:
      # If no LockError was raised we are the ones who deleted the lock.
      # This means we can safely remove it from lockdict, as any further or
      # pending delete() or acquire() will fail (and nobody can have the lock
      # since before our call to delete()).
      #
      # This is done in an else clause because if the exception was thrown
      # it's the job of the one who actually deleted it.
      del self.__lockdict[lname]
      # And let's remove it from our private list if we owned it.
      if self.is_owned():
        self._del_owned(name=lname)

  return removed


# Locking levels, must be acquired in increasing order. Current rules are:
# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
#   you need more than one node, or more than one instance, acquire them at the
#   same time.
# - LEVEL_NODE_RES is for node resources and should be used by operations with
#   possibly high impact on the node's disks.
# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
#   ("NAL" is the only lock at this level). It should be acquired in shared
#   mode when an opcode blocks all or a significant amount of a cluster's
#   locks. Opcodes doing instance allocations should acquire in exclusive mode.
#   Once the set of acquired locks for an opcode has been reduced to the working
#   set, the NAL should be released as well to allow allocations to proceed.
#
# The numeric value of each level is its position in the tuple below, i.e.
# also its index in LEVELS.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODE_ALLOC,
 LEVEL_NODEGROUP,
 LEVEL_NODE,
 LEVEL_NODE_RES,
 LEVEL_NETWORK) = range(0, 7)

# All levels, listed in acquisition order
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODE_ALLOC,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]

# Lock levels which are modifiable
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_NODE_RES,
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE_ALLOC: "node-alloc",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Constant for the big ganeti lock
BGL = "BGL"

#: Node allocation lock
NAL = "NAL"


class GanetiLockManager(object):
  """The Ganeti Locking Library

  This small library manages the locking for ganeti clusters in a central
  place and, while doing so, performs dynamic checks against possible
  deadlocks. It will also make it easier to transition to a different lock
  type should we migrate away from python threads.

  """
  _instance = None

  def __init__(self, node_uuids, nodegroups, instance_names, networks):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param node_uuids: list of node UUIDs
    @param nodegroups: list of nodegroup uuids
    @param instance_names: list of instance names
    @param networks: list of lock names for the network level

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiLockManager instance"
    cls._instance = self

    monitor = LockMonitor()
    self._monitor = monitor

    # The keyring contains all the locks, one lock set per level. The node
    # UUIDs are used for both the node and the node resource level.
    keyring = {}
    for (level, members, setname) in [
      (LEVEL_CLUSTER, [BGL], "cluster"),
      (LEVEL_NODE, node_uuids, "node"),
      (LEVEL_NODE_RES, node_uuids, "node-res"),
      (LEVEL_NODEGROUP, nodegroups, "nodegroup"),
      (LEVEL_INSTANCE, instance_names, "instance"),
      (LEVEL_NETWORK, networks, "network"),
      (LEVEL_NODE_ALLOC, [NAL], "node-alloc"),
      ]:
      keyring[level] = LockSet(members, setname, monitor=monitor)
    self.__keyring = keyring

    assert compat.all(ls.name == LEVEL_NAMES[level]
                      for (level, ls) in self.__keyring.items()), \
      "Keyring name mismatch"

  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    """
    monitor = self._monitor
    return monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    monitor = self._monitor
    return monitor.QueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()

  def is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset.is_owned()

  def list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    lockset = self.__keyring[level]
    return lockset.list_owned()

  def check_owned(self, level, names, shared=-1):
    """Check if locks at a certain level are owned in a specific mode.

    @see: L{LockSet.check_owned}

    """
    lockset = self.__keyring[level]
    return lockset.check_owned(names, shared=shared)

  def owning_all(self, level):
    """Checks whether current thread owns all locks at a certain level.

    @see: L{LockSet.owning_all}

    """
    lockset = self.__keyring[level]
    return lockset.owning_all()

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @return: true if something is owned at one of the higher levels

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    higher = LEVELS[level + 1:]
    return compat.any((self.is_owned(lvl) for lvl in higher))

  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER].list_owned()
    return BGL in cluster_owned

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    """
    if level != LEVEL_CLUSTER:
      return False
    # No names at all means "everything at this level"
    return names is None or BGL in names

  def acquire(self, level, names, timeout=None, shared=0, priority=None,
              opportunistic=False):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type opportunistic: boolean
    @param opportunistic: Acquire locks opportunistically; use the return value
      to determine which locks were actually acquired
    @return: whatever L{LockSet.acquire} returns for the level

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Nothing may be owned at a greater level (the lock set itself refuses
    # a second acquisition at the same level).
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority, opportunistic=opportunistic)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
                        for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self.is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
    lockset = self.__keyring[level]
    return lockset.remove(names)
1881


def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  The tuple is unpacked in the body rather than in the parameter list, as
  tuple parameters are a deprecated construct which newer interpreters
  reject; callers passing a single positional argument are not affected.

  @type entry: tuple
  @param entry: C{(item, idx, num)} where C{item} is a four-element tuple
    whose first element is the lock name, C{idx} is the position of the item
    within the information returned by its provider and C{num} is the
    provider's registration number
  @rtype: tuple
  @return: C{(sort key for the name, num, idx)}

  """
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1894
1895 1896 -class LockMonitor(object):
1897 _LOCK_ATTR = "_lock" 1898
def __init__(self):
  """Initializes this class.

  """
  # Tracked locks. Weak references are used to avoid issues with circular
  # references and deletion.
  self._locks = weakref.WeakKeyDictionary()

  # Counter for stable sorting
  self._counter = itertools.count(0)

  # Lock protecting the bookkeeping above
  self._lock = SharedLock("LockMonitor")
@ssynchronized(_LOCK_ATTR)
def RegisterLock(self, provider):
  """Registers a new lock.

  @param provider: Object with a callable method named C{GetLockInfo}, taking
    a single C{set} containing the requested information items
  @note: It would be nicer to only receive the function generating the
    requested information but, as it turns out, weak references to bound
    methods (e.g. C{self.GetLockInfo}) are tricky; there are several
    workarounds, but none of the ones I found works properly in combination
    with a standard C{WeakKeyDictionary}

  """
  tracked = self._locks
  assert provider not in tracked, "Duplicate registration"

  # There used to be a check for duplicate names here. As it turned out, when
  # a lock is re-created with the same name in a very short timeframe, the
  # previous instance might not yet be removed from the weakref dictionary.
  # By keeping track of the order of incoming registrations, a stable sort
  # ordering can still be guaranteed.

  tracked[provider] = self._counter.next()
1934
def _GetLockInfo(self, requested):
  """Get information from all locks.

  @param requested: requested information items, passed unchanged to each
    provider's C{GetLockInfo}
  @rtype: list
  @return: list of C{(info, idx, num)} tuples, with C{idx} being the
    position of C{info} within its provider's result and C{num} the
    provider's registration number

  """
  # The internal lock is only needed for getting a consistent list of the
  # tracked items; the providers are queried after releasing it
  self._lock.acquire(shared=1)
  try:
    tracked = self._locks.items()
  finally:
    self._lock.release()

  result = []
  for (provider, num) in tracked:
    for (idx, info) in enumerate(provider.GetLockInfo(requested)):
      result.append((info, idx, num))

  return result
1949
def _Query(self, fields):
  """Queries information from all locks.

  @type fields: list of strings
  @param fields: List of fields to return
  @rtype: tuple
  @return: the query object and the query data built from the lock
    information

  """
  qobj = query.Query(query.LOCK_FIELDS, fields)

  # Collect the data from all providers, then bring it into a stable order
  # (by name and incoming order)
  collected = self._GetLockInfo(qobj.RequestedData())
  ordered = sorted(collected, key=_MonitorSortKey)

  # Only the lock information itself (first element) goes into the query
  # data
  lockdata = query.LockQueryData(map(compat.fst, ordered))

  return (qobj, lockdata)
1966
def QueryLocks(self, fields):
  """Queries information from all locks.

  @type fields: list of strings
  @param fields: List of fields to return
  @return: the response built by C{query.GetQueryResponse}

  """
  prepared = self._Query(fields)
  (qobj, ctx) = prepared

  # Prepare query response
  response = query.GetQueryResponse(qobj, ctx)
  return response
1978