Package ganeti :: Module locking
[hide private]
[frames] | no frames]

Source Code for Module ganeti.locking

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21  """Module implementing the Ganeti locking code.""" 
  22   
  23  # pylint: disable=W0212 
  24   
  25  # W0212 since e.g. LockSet methods use (a lot) the internals of 
  26  # SharedLock 
  27   
import errno
import heapq
import itertools
import logging
import os
import select
import sys
import threading
import weakref

from ganeti import errors
from ganeti import utils
from ganeti import compat
from ganeti import query
  41   
  42   
# Textual lock modes; SharedLock.GetLockInfo reports these for the current
# mode of a lock and for the mode of each pending acquire
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority used by acquire() and delete() when the caller passes priority=None
_DEFAULT_PRIORITY = 0
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics). The lock is released again when the function returns or raises.

  The wrapper returned for a function carries that function's C{__name__},
  C{__doc__} and C{__module__}, so decorated functions stay recognizable in
  tracebacks, logs and generated documentation.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire
  @type shared: integer (0/1) used as a boolean
  @param shared: passed as C{shared} to the lock's C{acquire} method
  @rtype: callable
  @return: decorator taking the function to be synchronized

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      if isinstance(mylock, basestring):
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()

    # Copy the identity of the wrapped callable by hand; functools.wraps is
    # deliberately not used to stay compatible with old Python versions.
    for attr in ("__name__", "__doc__", "__module__"):
      try:
        setattr(sync_function, attr, getattr(fn, attr))
      except (AttributeError, TypeError):
        # Best effort only: not every callable provides (or allows copying)
        # these attributes, and the wrapper works fine without them.
        pass

    return sync_function
  return wrap
class _SingleNotifyPipeConditionWaiter(object):
  """Helper class for SingleNotifyPipeCondition

  Instances are callables which block on a poller until the watched file
  descriptor is reported or the timeout expires.

  """
  __slots__ = [
    "_fd",
    "_poller",
    ]

  def __init__(self, poller, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type poller: select.poll
    @param poller: Poller object
    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._poller = poller
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    Returns (without a value) once the poller reports the watched file
    descriptor or once the remaining time has dropped below zero. Polls
    interrupted by a signal (EINTR) are retried with the remaining time.

    @type timeout: float or None
    @param timeout: Timeout for waiting (can be None)

    """
    # NOTE(review): the second argument presumably allows negative remaining
    # times to be reported (the loop below checks for them) -- confirm against
    # utils.RunningTimeout
    running_timeout = utils.RunningTimeout(timeout, True)

    while True:
      remaining_time = running_timeout.Remaining()

      if remaining_time is not None:
        if remaining_time < 0.0:
          break

        # Our calculation uses seconds, poll() wants milliseconds
        remaining_time *= 1000

      try:
        result = self._poller.poll(remaining_time)
      except (EnvironmentError, select.error):
        # On Python 2 poll() reports failures as select.error, which is not
        # an EnvironmentError and has no "errno" attribute; its first argument
        # is the error number. sys.exc_info() is used instead of binding the
        # exception in the "except" clause to keep the syntax valid on all
        # Python versions.
        err = sys.exc_info()[1]
        err_code = getattr(err, "errno", None)
        if err_code is None and err.args:
          err_code = err.args[0]
        if err_code != errno.EINTR:
          raise
        result = None

      # Check whether we were notified
      if result and result[0][0] == self._fd:
        break
130 131 -class _BaseCondition(object):
132 """Base class containing common code for conditions. 133 134 Some of this code is taken from python's threading module. 135 136 """ 137 __slots__ = [ 138 "_lock", 139 "acquire", 140 "release", 141 "_is_owned", 142 "_acquire_restore", 143 "_release_save", 144 ] 145
146 - def __init__(self, lock):
147 """Constructor for _BaseCondition. 148 149 @type lock: threading.Lock 150 @param lock: condition base lock 151 152 """ 153 object.__init__(self) 154 155 try: 156 self._release_save = lock._release_save 157 except AttributeError: 158 self._release_save = self._base_release_save 159 try: 160 self._acquire_restore = lock._acquire_restore 161 except AttributeError: 162 self._acquire_restore = self._base_acquire_restore 163 try: 164 self._is_owned = lock._is_owned 165 except AttributeError: 166 self._is_owned = self._base_is_owned 167 168 self._lock = lock 169 170 # Export the lock's acquire() and release() methods 171 self.acquire = lock.acquire 172 self.release = lock.release
173
174 - def _base_is_owned(self):
175 """Check whether lock is owned by current thread. 176 177 """ 178 if self._lock.acquire(0): 179 self._lock.release() 180 return False 181 return True
182
183 - def _base_release_save(self):
184 self._lock.release()
185
186 - def _base_acquire_restore(self, _):
187 self._lock.acquire()
188
189 - def _check_owned(self):
190 """Raise an exception if the current thread doesn't own the lock. 191 192 """ 193 if not self._is_owned(): 194 raise RuntimeError("cannot work with un-aquired lock")
195
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  The pipe and the poller are created by the first waiter and torn down when
  the last waiter leaves L{wait}.

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  # Callable class used to block on the pipe; kept as a class attribute so it
  # can be replaced
  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock, see L{_BaseCondition}

    """
    _BaseCondition.__init__(self, lock)
    # Number of threads currently inside wait()
    self._nwaiters = 0
    # Set once notifyAll() was called
    self._notified = False
    # Pipe and poller only exist while there are waiters
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} has already been called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    Closes whichever pipe ends are still open and drops the poller.

    """
    if self._read_fd is not None:
      os.close(self._read_fd)
      self._read_fd = None

    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
    self._poller = None

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held by the caller; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the lock isn't held or the condition has already
        been notified

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter: create the pipe and watch its reading end for a
        # hang-up, which is what closing the writing end in notifyAll() causes
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      # The waiter gets its own references to poller and file descriptor
      wait_fn = self._waiter_class(self._poller, self._read_fd)
      state = self._release_save()
      try:
        # Wait for notification
        wait_fn(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(state)
    finally:
      self._nwaiters -= 1
      # Last waiter out closes the file descriptors
      if self._nwaiters == 0:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    Without waiters there is no pipe; the condition is still marked as
    notified and can't be used afterwards.

    @raise RuntimeError: if the lock isn't held or the condition has already
        been notified

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True
    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Built on top of single-use conditions: every notification uses up the
  current one and installs a fresh one for subsequent waiters. Like
  Python's threading.Condition it allows waiting with a timeout, but only
  notifyAll and non-recursive locks are supported. In addition it keeps
  track of the threads currently waiting.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock, see L{_BaseCondition}

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # A notifying thread replaces self._single_condition while we're waiting,
    # hence work on a local reference.
    current = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      current.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()

    # Use up the current single-notify condition ...
    spent = self._single_condition
    spent.notifyAll()

    # ... and provide a new one for future waiters
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the waiting threads.

    @return: the internal set of waiting threads (not a copy)

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    if self._waiters:
      return True
    return False
class _PipeConditionWithMode(PipeCondition):
  """Pipe condition remembering the mode of the acquire waiting on it.

  @ivar shared: mode flag given to the constructor

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Stores the mode and sets up the underlying pipe condition.

    @param lock: condition base lock, see L{PipeCondition}
    @param shared: mode of the acquire(s) waiting on this condition

    """
    self.shared = shared
    PipeCondition.__init__(self, lock)
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: list with a single C{(name, mode, owner_names, pending)} tuple;
        fields which were not requested are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  is_owned = _is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts conditions in the queues; shared acquires grouped on one condition
    count once.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if all queue structures are empty

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as owner; the caller must have checked
    that the lock can be acquired.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: C{(priority, prioqueue)} of the first non-empty queue or
        C{(None, None)} if there is none

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was acquired, False otherwise (timeout)
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      # TODO: Decrease timeout with spurious notifications
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return False

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False otherwise (timeout)
    @raise errors.LockError: if the lock is or gets deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      (priority, prioqueue) = self.__find_first_pending_queue()
      if prioqueue:
        cond = prioqueue[0]
        cond.notifyAll()
        if cond.shared:
          # Prevent further shared acquires from sneaking in while waiters are
          # notified
          self.__pending_shared.pop(priority, None)

    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was deleted, False if it couldn't be acquired
        within the timeout
    @raise errors.LockError: if the lock is already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        # Exclusive ownership can only be verified after a successful
        # acquire; a timed-out acquire is reported through the return value
        # and must not trip this assertion.
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns whether it was held in shared mode.

    The return value is meant to be passed to L{_acquire_restore}.

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Acquires the lock again in the mode saved by L{_release_save}.

    """
    self.acquire(shared=shared)


# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire. Hide this behind this nicely named constant.
# (LockSet.acquire treats "names is None" as "acquire the whole set".)
ALL_SET = None
865 866 867 -class _AcquireTimeout(Exception):
868 """Internal exception to abort an acquire on a timeout. 869 870 """
871
872 873 -class LockSet:
874 """Implements a set of locks. 875 876 This abstraction implements a set of shared locks for the same resource type, 877 distinguished by name. The user can lock a subset of the resources and the 878 LockSet will take care of acquiring the locks always in the same order, thus 879 preventing deadlock. 880 881 All the locks needed in the same set must be acquired together, though. 882 883 @type name: string 884 @ivar name: the name of the lockset 885 886 """
887 - def __init__(self, members, name, monitor=None):
888 """Constructs a new LockSet. 889 890 @type members: list of strings 891 @param members: initial members of the set 892 @type monitor: L{LockMonitor} 893 @param monitor: Lock monitor with which to register member locks 894 895 """ 896 assert members is not None, "members parameter is not a list" 897 self.name = name 898 899 # Lock monitor 900 self.__monitor = monitor 901 902 # Used internally to guarantee coherency 903 self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor) 904 905 # The lockdict indexes the relationship name -> lock 906 # The order-of-locking is implied by the alphabetical order of names 907 self.__lockdict = {} 908 909 for mname in members: 910 self.__lockdict[mname] = SharedLock(self._GetLockName(mname), 911 monitor=monitor) 912 913 # The owner dict contains the set of locks each thread owns. For 914 # performance each thread can access its own key without a global lock on 915 # this structure. It is paramount though that *no* other type of access is 916 # done to this structure (eg. no looping over its keys). *_owner helper 917 # function are defined to guarantee access is correct, but in general never 918 # do anything different than __owners[threading.currentThread()], or there 919 # will be trouble. 920 self.__owners = {}
921
922 - def _GetLockName(self, mname):
923 """Returns the name for a member lock. 924 925 """ 926 return "%s/%s" % (self.name, mname)
927
928 - def _get_lock(self):
929 """Returns the lockset-internal lock. 930 931 """ 932 return self.__lock
933
934 - def _get_lockdict(self):
935 """Returns the lockset-internal lock dictionary. 936 937 Accessing this structure is only safe in single-thread usage or when the 938 lockset-internal lock is held. 939 940 """ 941 return self.__lockdict
942
943 - def _is_owned(self):
944 """Is the current thread a current level owner?""" 945 return threading.currentThread() in self.__owners
946
947 - def _add_owned(self, name=None):
948 """Note the current thread owns the given lock""" 949 if name is None: 950 if not self._is_owned(): 951 self.__owners[threading.currentThread()] = set() 952 else: 953 if self._is_owned(): 954 self.__owners[threading.currentThread()].add(name) 955 else: 956 self.__owners[threading.currentThread()] = set([name])
957
958 - def _del_owned(self, name=None):
959 """Note the current thread owns the given lock""" 960 961 assert not (name is None and self.__lock._is_owned()), \ 962 "Cannot hold internal lock when deleting owner status" 963 964 if name is not None: 965 self.__owners[threading.currentThread()].remove(name) 966 967 # Only remove the key if we don't hold the set-lock as well 968 if (not self.__lock._is_owned() and 969 not self.__owners[threading.currentThread()]): 970 del self.__owners[threading.currentThread()]
971
972 - def _list_owned(self):
973 """Get the set of resource names owned by the current thread""" 974 if self._is_owned(): 975 return self.__owners[threading.currentThread()].copy() 976 else: 977 return set()
978
979 - def _release_and_delete_owned(self):
980 """Release and delete all resources owned by the current thread""" 981 for lname in self._list_owned(): 982 lock = self.__lockdict[lname] 983 if lock._is_owned(): 984 lock.release() 985 self._del_owned(name=lname)
986
987 - def __names(self):
988 """Return the current set of names. 989 990 Only call this function while holding __lock and don't iterate on the 991 result after releasing the lock. 992 993 """ 994 return self.__lockdict.keys()
995
996 - def _names(self):
997 """Return a copy of the current set of elements. 998 999 Used only for debugging purposes. 1000 1001 """ 1002 # If we don't already own the set-level lock acquired 1003 # we'll get it and note we need to release it later. 1004 release_lock = False 1005 if not self.__lock._is_owned(): 1006 release_lock = True 1007 self.__lock.acquire(shared=1) 1008 try: 1009 result = self.__names() 1010 finally: 1011 if release_lock: 1012 self.__lock.release() 1013 return set(result)
1014
  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} acquires the
        set-level lock and then all locks currently in the set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # We need to keep track of how long we spent waiting for a lock. The
    # timeout passed to this function is over all lock acquires.
    running_timeout = utils.RunningTimeout(timeout, False)

    try:
      if names is not None:
        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        # The bound method (not its result) is passed on, so the remaining
        # time is evaluated anew for every single lock
        return self.__acquire_inner(names, False, shared, priority,
                                    running_timeout.Remaining, test_notify)

      else:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, though, so we'll get the list lock exclusively as well in
        # order to be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=running_timeout.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      running_timeout.Remaining, test_notify)
        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong, after this.
          # This handler also runs for _AcquireTimeout raised while acquiring
          # the member locks, so the set-lock is not kept on a timeout.
          self.__lock.release()
          self._del_owned()
          raise

    except _AcquireTimeout:
      # Timeouts are reported through the return value, not as an exception
      return None

  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    If anything goes wrong all locks acquired so far are released again
    before the exception is re-raised.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting
    @return: set with the names of the acquired locks
    @raise errors.LockError: if a lock doesn't exist or was deleted (unless
        C{want_all} is set, in which case such locks are skipped) or if a
        blocking acquire failed
    @raise _AcquireTimeout: if a lock couldn't be acquired within the timeout

    """
    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be the already wrong. Using a sorted sequence to prevent
    # deadlocks.
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock %s in set %s (it may have"
                               " been removed)" % (lname, self.name))

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want. In order to get them we loop on this (private) list and
      # acquire() them. We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          # The callback is handed to lock.acquire() below within this same
          # iteration, so it refers to the current lock name
          test_notify_fn = lambda: test_notify(lname)
        else:
          test_notify_fn = None

        # Remaining time is re-evaluated for every lock
        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     priority=priority,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock %s in set %s (it may"
                                 " have been removed)" % (lname, self.name))

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired

def downgrade(self, names=None):
  """Switch locks held in this set from exclusive to shared mode.

  The locks must have been acquired in exclusive mode. Once none of the
  set's locks is held exclusively anymore, an exclusively held set-level
  lock is downgraded as well.

  @type names: list of strings, single string or None
  @param names: names of the locks to downgrade; C{None} selects all locks
      currently owned in this set
  @return: always C{True}

  """
  assert self._is_owned(), ("downgrade on lockset %s while not owning any"
                            " lock" % self.name)

  # A plain string is accepted as shorthand for a one-element list
  if isinstance(names, basestring):
    names = [names]

  held = self._list_owned()

  if names is not None:
    names = set(names)
    assert held.issuperset(names), \
      ("downgrade() on unheld resources %s (set %s)" %
       (names.difference(held), self.name))
  else:
    names = held

  for lname in names:
    self.__lockdict[lname].downgrade()

  # The set-level lock only follows if we hold it exclusively and no member
  # lock is held exclusively anymore
  if (self.__lock._is_owned(shared=0) and
      not compat.any(member._is_owned(shared=0)
                     for member in self.__lockdict.values())):
    self.__lock.downgrade()
    assert self.__lock._is_owned(shared=1)

  return True
1219
def release(self, names=None):
  """Give up locks held in this set.

  The locks must have been acquired before, in either shared or exclusive
  mode. A held set-level lock is always released first, whatever subset of
  names is being released.

  @type names: list of strings, single string or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level).

  """
  assert self._is_owned(), ("release() on lock set %s while not owner" %
                            self.name)

  # A plain string is accepted as shorthand for a one-element list
  if isinstance(names, basestring):
    names = [names]

  if names is not None:
    names = set(names)
    assert self._list_owned().issuperset(names), (
      "release() on unheld resources %s (set %s)" %
      (names.difference(self._list_owned()), self.name))
  else:
    names = self._list_owned()

  # The "all elements" lock goes first, if we have it; afterwards 'add' can
  # work again
  set_lock = self.__lock
  if set_lock._is_owned():
    set_lock.release()
    self._del_owned()

  for lname in names:
    # This is fine as long as a lock never leaves __lockdict without being
    # exclusively held
    self.__lockdict[lname].release()
    self._del_owned(name=lname)
1257
def add(self, names, acquired=0, shared=0):
  """Create new locks and put them into the set.

  @type names: list of strings
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @return: always C{True}
  @raise errors.LockError: if one of the names already exists in the set

  """
  # Either nothing is owned at this level or the whole set is owned
  # exclusively
  assert not self._is_owned() or self.__lock._is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # A plain string is accepted as shorthand for a one-element list
  if isinstance(names, basestring):
    names = [names]

  # The set-level lock is taken here only if we don't hold it already; in
  # that case it also has to be given back before returning.
  must_release = not self.__lock._is_owned()
  if must_release:
    self.__lock.acquire()

  try:
    duplicates = set(self.__names()).intersection(names)
    if duplicates:
      # Deliberately not an assertion: assertions vanish when running with
      # optimization, and concurrency can trigger this without the caller
      # doing anything wrong.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (duplicates, self.name))

    for lname in names:
      new_lock = SharedLock(self._GetLockName(lname), monitor=self.__monitor)

      if acquired:
        # A freshly created lock needs neither priority nor timeout
        new_lock.acquire(shared=shared)
        # From here on the lock can't be deleted, it is ours
        try:
          self._add_owned(name=lname)
        except:
          # Recording the ownership shouldn't fail; if it does, something is
          # badly wrong. The lock isn't in __lockdict yet, hence no other
          # thread should be waiting on it and releasing is merely a safety
          # measure before re-raising.
          new_lock.release()
          raise

      self.__lockdict[lname] = new_lock

  finally:
    # Give the set-level lock back only if it was taken in here
    if must_release:
      self.__lock.release()

  return True
1322
def remove(self, names):
  """Delete locks from the set.

  The caller must either hold nothing in this lockset or already hold,
  exclusively, a superset of the elements to be deleted.

  @type names: list of strings
  @param names: names of the resource to remove.

  @return: a list of locks which we removed; the list is always
      equal to the names list if we were holding all the locks
      exclusively

  """
  # A plain string is accepted as shorthand for a one-element list
  if isinstance(names, basestring):
    names = [names]

  # Whatever we own in this set has to cover everything we want to delete.
  # Exclusive ownership is required too, but the individual locks verify
  # that themselves.
  assert not self._is_owned() or self._list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  removed = []

  for lname in names:
    # delete() acquires the lock exclusively unless we own it already and
    # makes all pending and later acquires fail. Ordering is irrelevant as
    # delete() implies release() and the assertion above guarantees we hold
    # either everything to be deleted or nothing at all.
    try:
      self.__lockdict[lname].delete()
    except (KeyError, errors.LockError):
      # Somebody else deleted the lock, so cleaning up is their job. This
      # can't happen for a lock we were holding, verify:
      assert not self._is_owned(), ("remove failed while holding lockset %s"
                                    % self.name)
      continue

    # No exception means we are the ones who deleted the lock. Dropping it
    # from lockdict is safe now: further or pending delete() and acquire()
    # calls fail and nobody can have held the lock since before our delete().
    removed.append(lname)
    del self.__lockdict[lname]

    # Forget about it in our private list as well, if we owned it
    if self._is_owned():
      self._del_owned(name=lname)

  return removed
# Locking levels; locks must be acquired in order of increasing level.
# The rules currently are:
#   - LEVEL_CLUSTER is home to the Big Ganeti Lock (BGL). It has to be
#     acquired, shared or exclusive, before any operation is performed.
#     Exclusive acquisition of the BGL is discouraged and should be avoided.
#   - LEVEL_NODE and LEVEL_INSTANCE are home to the node and instance locks.
#     Whoever needs more than one node, or more than one instance, must
#     acquire all of them at the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3

# All levels, lowest first
LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODEGROUP, LEVEL_NODE]

# Levels whose lock sets can be modified
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  }

# Name of the big ganeti lock
BGL = "BGL"
class GanetiLockManager(object):
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  The ordering rules between levels are verified using assertions only, i.e.
  they are not enforced when Python runs with optimization enabled.

  """
  # The one and only instance, if already created
  _instance = None

  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time; an assertion
    verifies that no other instance has been created before.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    self._monitor = LockMonitor()

    # The keyring maps every locking level to the lock set holding the locks
    # of that level. The order in which levels may be locked is given by the
    # level numbers, not by this dictionary.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups",
                               monitor=self._monitor),
      LEVEL_INSTANCE: LockSet(instances, "instances",
                              monitor=self._monitor),
      }

  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    """
    return self._monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    See L{LockMonitor.OldStyleQueryLocks}.

    """
    return self._monitor.OldStyleQueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    @param level: the level to check

    """
    return self.__keyring[level]._is_owned()

  # Public alias
  is_owned = _is_owned

  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    @param level: the level whose owned locks to get

    """
    return self.__keyring[level]._list_owned()

  # Public alias
  list_owned = _list_owned

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the level above which to look for owned locks
    @return: true if a lock is owned at any level greater than C{level},
        false otherwise

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any((self._is_owned(lvl) for lvl in LEVELS[level + 1:]))

  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the level which is acted upon
    @param names: the lock names which are acted upon; C{None} stands for
        all locks of the level

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: whatever the acquire method of the level's lock set returns

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at upper levels. The level itself is not
    # covered by this check.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
                                         priority=priority)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    return self.__keyring[level].downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL must be the last lock to go. The levels are listed in a fixed
    # order to get a reproducible message.
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in LEVELS]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")
    return self.__keyring[level].remove(names)
1647
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  The entry is unpacked inside the function rather than in the signature, as
  tuple parameters are not available in newer versions of Python.

  @type entry: tuple
  @param entry: C{(item, idx, num)}, where C{item} is a four-element tuple
      starting with the lock name, C{idx} is the position of the item within
      its provider's information and C{num} is the provider's registration
      number
  @return: tuple usable as sort key

  """
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1660
class LockMonitor(object):
  """Registry of lock information providers which can be queried.

  """
  # Name of the attribute holding the lock used by L{ssynchronized}
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    # Tracked locks. Weak references are used to avoid issues with circular
    # references and deletion.
    self._locks = weakref.WeakKeyDictionary()

    # Counter for stable sorting
    self._counter = itertools.count(0)

    self._lock = SharedLock("LockMonitor")

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
        a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
        requested information but, as it turns out, weak references to bound
        methods (e.g. C{self.GetLockInfo}) are tricky; there are several
        workarounds, but none of the ones I found works properly in combination
        with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # Names are deliberately not checked for duplicates: a lock re-created
    # with the same name shortly after its predecessor went away may coexist
    # with the old entry in the weakref dictionary for a moment. Recording
    # the order of registrations still allows for a stable sort order.
    registration = self._counter.next()
    self._locks[provider] = registration

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: passed on to every provider's C{GetLockInfo}
    @return: list of C{(info, idx, num)} tuples; C{idx} is the position of
        C{info} in its provider's result, C{num} the provider's registration
        number

    """
    # The lock is only needed for getting a consistent list of tracked items
    self._lock.acquire(shared=1)
    try:
      tracked = self._locks.items()
    finally:
      self._lock.release()

    # The providers are asked after the monitor's lock has been released
    result = []
    for (provider, num) in tracked:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        result.append((info, idx, num))

    return result

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: tuple of query object and query data

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Collect everything first, then order by name and incoming order
    collected = self._GetLockInfo(qobj.RequestedData())
    ordered = sorted(collected, key=_MonitorSortKey)

    # Only the lock information itself goes into the query data
    data = query.LockQueryData(map(compat.fst, ordered))

    return (qobj, data)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    (qobj, data) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(qobj, data)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    (qobj, data) = self._Query(fields)

    return qobj.OldStyleQuery(data)
1755