Package ganeti :: Module locking
[hide private]
[frames | no frames]

Source Code for Module ganeti.locking

  1  # 
  2  # 
  3   
  4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
  5  # All rights reserved. 
  6  # 
  7  # Redistribution and use in source and binary forms, with or without 
  8  # modification, are permitted provided that the following conditions are 
  9  # met: 
 10  # 
 11  # 1. Redistributions of source code must retain the above copyright notice, 
 12  # this list of conditions and the following disclaimer. 
 13  # 
 14  # 2. Redistributions in binary form must reproduce the above copyright 
 15  # notice, this list of conditions and the following disclaimer in the 
 16  # documentation and/or other materials provided with the distribution. 
 17  # 
 18  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 
 19  # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 
 20  # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
 21  # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
 22  # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
 23  # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
 24  # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
 25  # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
 26  # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
 27  # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
 28  # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
 29   
 30  """Module implementing the Ganeti locking code.""" 
 31   
 32  # pylint: disable=W0212 
 33   
 34  # W0212 since e.g. LockSet methods use (a lot) the internals of 
 35  # SharedLock 
 36   
import os
import select
import threading
import errno
import logging
import heapq
import time
import functools

from ganeti import errors
from ganeti import utils
from ganeti import compat
from ganeti import query
 49   
 50   
 51  _EXCLUSIVE_TEXT = "exclusive" 
 52  _SHARED_TEXT = "shared" 
 53  _DELETED_TEXT = "deleted" 
 54   
 55  _DEFAULT_PRIORITY = 0 
 56   
 57  #: Minimum timeout required to consider scheduling a pending acquisition 
 58  #: (seconds) 
 59  _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000) 
 60   
 61   
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics). The lock is released again when the function returns or raises.

  The wrapper keeps the name and docstring of the decorated function.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire;
    a name is looked up on the first positional argument ("self") at call time
  @type shared: integer (0/1) used as a boolean
  @param shared: passed as C{shared} to the lock's C{acquire} method
  @return: decorator to be applied to the function to synchronize

  """
  try:
    string_types = basestring # pylint: disable=E0602
  except NameError:
    # Interpreters without "basestring" only know "str"
    string_types = str

  # The kind of "mylock" never changes, hence it's only checked once
  lock_by_name = isinstance(mylock, string_types)

  def wrap(fn):
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      if lock_by_name:
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return sync_function
  return wrap
class _SingleNotifyPipeConditionWaiter(object):
  """Helper class for SingleNotifyPipeCondition

  Instances are callables polling a file descriptor for C{POLLHUP}.

  """
  __slots__ = [
    "_fd",
    ]

  def __init__(self, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    Returns once poll() reports an event for the file descriptor or once the
    remaining time reported by the running timeout is negative. Polls
    interrupted by a signal (C{EINTR}) are retried; all other errors are
    re-raised.

    @type timeout: float or None
    @param timeout: Timeout for waiting (can be None)

    """
    running_timeout = utils.RunningTimeout(timeout, True)
    poller = select.poll()
    poller.register(self._fd, select.POLLHUP)

    while True:
      remaining_time = running_timeout.Remaining()

      if remaining_time is not None:
        if remaining_time < 0.0:
          break

        # Our calculation uses seconds, poll() wants milliseconds
        remaining_time *= 1000

      try:
        result = poller.poll(remaining_time)
      except (EnvironmentError, select.error) as err:
        # Depending on the Python version, poll() failures are reported
        # either as an EnvironmentError (with an "errno" attribute) or as a
        # plain select.error whose first argument is the error code
        err_code = getattr(err, "errno", None)
        if err_code is None and err.args:
          err_code = err.args[0]

        if err_code != errno.EINTR:
          raise

        # Interrupted by a signal, try again with the remaining time
        result = None

      # Check whether we were notified
      if result and result[0][0] == self._fd:
        break
139 140
class _BaseCondition(object):
  """Base class containing common code for conditions.

  Some of this code is taken from python's threading module.

  The lock's C{acquire} and C{release} methods are re-exported on the
  condition. If the lock provides C{_release_save}, C{_acquire_restore} or
  C{is_owned}, those are used; otherwise generic fallbacks defined in this
  class take their place.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    self._lock = lock

    # Prefer the lock's own helpers and fall back to the generic versions
    self._release_save = getattr(lock, "_release_save",
                                 self._base_release_save)
    self._acquire_restore = getattr(lock, "_acquire_restore",
                                    self._base_acquire_restore)
    self._is_owned = getattr(lock, "is_owned", self._base_is_owned)

    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Check whether lock is owned by current thread.

    This generic version can only tell whether the lock is held at all: it
    tries a non-blocking acquire and, if that succeeds, releases the lock
    again and reports it as not owned.

    @rtype: bool

    """
    if not self._lock.acquire(0):
      return True

    self._lock.release()
    return False

  def _base_release_save(self):
    """Generic fallback releasing the lock; there is no state to save.

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Generic fallback re-acquiring the lock; the saved state is ignored.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    @raise RuntimeError: if the lock isn't reported as owned

    """
    if self._is_owned():
      return

    raise RuntimeError("cannot work with un-aquired lock")
205 206
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    The pipe is only created once the first thread starts waiting.

    """
    _BaseCondition.__init__(self, lock)
    self._read_fd = None
    self._write_fd = None
    self._notified = False
    self._nwaiters = 0

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} was called before

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    The reading side is closed before the writing side.

    """
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

  def wait(self, timeout):
    """Wait for a notification.

    The lock is released while waiting and re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      # First waiter creates the pipe shared by all waiters
      if self._read_fd is None:
        (read_fd, write_fd) = os.pipe()
        self._read_fd = read_fd
        self._write_fd = write_fd

      waiter = self._waiter_class(self._read_fd)
      saved_state = self._release_save()
      try:
        # Wait for notification
        waiter(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      # Last waiter to leave closes whatever is left of the pipe
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
294 295
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional feature it's able to report whether
  there are any waiting threads.

  Internally every notification uses up one single-notify condition, which is
  then replaced by a fresh one for subsequent waiters.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock, shared with the internal single-notify
      condition

    """
    _BaseCondition.__init__(self, lock)
    self._single_condition = self._single_condition_class(self._lock)
    self._waiters = set()

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Keep local reference to the pipe. It could be replaced by another thread
    # notifying while we're waiting.
    cond = self._single_condition

    me = threading.currentThread()

    self._waiters.add(me)
    try:
      cond.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    The notified single-notify condition is replaced by a new one afterwards.

    """
    self._check_owned()

    notified = self._single_condition
    notified.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns all waiting threads.

    @rtype: set
    @return: the internal set of waiting threads (not a copy)

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    if self._waiters:
      return True
    return False

  def __repr__(self):
    cls = self.__class__
    return ("<%s.%s waiters=%s at %#x>" %
            (cls.__module__, cls.__name__, self._waiters, id(self)))
369 370
class _PipeConditionWithMode(PipeCondition):
  """Pipe condition remembering the mode of the acquires waiting on it.

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: mode of the acquires waiting on this condition, stored
      as-is in the C{shared} attribute

    """
    PipeCondition.__init__(self, lock)
    self.shared = shared
382 383
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @param monitor: Lock monitor with which to register
    @param _time_fn: function returning the current time; only meant to be
      overridden by unittests

    """
    object.__init__(self)

    self.name = name

    # Used for unittesting
    self.__time_fn = _time_fn

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    return ("<%s.%s name=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: list with a single tuple of (name, mode, owner names, pending
      acquires); values which weren't requested or don't apply are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  #: Necessary to remain compatible with threading.Condition, which tries to
  #: retrieve a lock's "_is_owned" attribute
  _is_owned = is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts the conditions in all per-priority queues.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: C{True} if none of the internal queue structures has any entry

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as owner; the caller must have checked
    that the lock can be acquired.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    Shared acquires only require that there's no exclusive owner, exclusive
    acquires require that there's no owner at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @rtype: tuple
    @return: (priority, per-priority queue) of the first non-empty queue or
      C{(None, None)} if there are no pending acquires

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The caller must hold the internal lock.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    # The lock couldn't be acquired right away, so if a timeout is given and is
    # considered too short, return right away as scheduling a pending
    # acquisition is quite expensive
    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
      return False

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          acquired = True
          break

        # A lot of code assumes blocking acquires always succeed, therefore we
        # can never return False for a blocking acquire
        if (timeout is not None and
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
          break

        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @return: always C{True}

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
        notify = True
      else:
        self.__shr.remove(threading.currentThread())
        notify = not self.__shr

      # Notify topmost condition in queue if there are no owners left (for
      # shared locks)
      if notify:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    """
    (priority, prioqueue) = self.__find_first_pending_queue()
    if prioqueue:
      cond = prioqueue[0]
      cond.notifyAll()
      if cond.shared:
        # Prevent further shared acquires from sneaking in while waiters are
        # notified
        self.__pending_shared.pop(priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    """
    self.__lock.acquire()
    try:
      return self.__notify_topmost()
    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired (and therefore deleted)

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns whether it was held in shared mode.

    Counterpart of L{_acquire_restore}.

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)
# Passing None as the value to acquire means "acquire the full LockSet".
# This nicely named constant hides that convention.
ALL_SET = None

LOCKSET_NAME = "[lockset]"
def _TimeoutZero():
  """Returns the number zero.

  @rtype: int
  @return: always C{0}

  """
  return 0
922 923
class _AcquireTimeout(Exception):
  """Internal exception to abort an acquire on a timeout.

  It adds nothing to the plain C{Exception} class.

  """
# Locking levels, must be acquired in increasing order. Current rules are:
#   - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#     acquired before performing any operation, either in shared or exclusive
#     mode. Acquiring the BGL in exclusive mode is discouraged and should be
#     avoided.
#   - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
#     If you need more than one node, or more than one instance, acquire them
#     at the same time.
#   - LEVEL_NODE_RES is for node resources and should be used by operations
#     with possibly high impact on the node's disks.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODEGROUP,
 LEVEL_NODE,
 LEVEL_NODE_RES,
 LEVEL_NETWORK) = range(6)

# All levels, in the order in which they must be acquired
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]

# Lock levels which are modifiable
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_NODE_RES,
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Constant for the big ganeti lock
BGL = "BGL"