Package ganeti :: Module locking
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.locking

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21  """Module implementing the Ganeti locking code.""" 
  22   
  23  # pylint: disable=W0212 
  24   
  25  # W0212 since e.g. LockSet methods use (a lot) the internals of 
  26  # SharedLock 
  27   
  28  import os 
  29  import select 
  30  import threading 
  31  import errno 
  32  import weakref 
  33  import logging 
  34  import heapq 
  35  import itertools 
  36  import time 
  37   
  38  from ganeti import errors 
  39  from ganeti import utils 
  40  from ganeti import compat 
  41  from ganeti import query 
  42   
  43   
  44  _EXCLUSIVE_TEXT = "exclusive" 
  45  _SHARED_TEXT = "shared" 
  46  _DELETED_TEXT = "deleted" 
  47   
  48  _DEFAULT_PRIORITY = 0 
  49   
  50  #: Minimum timeout required to consider scheduling a pending acquisition 
  51  #: (seconds) 
  52  _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000) 
  53   
  54  # Internal lock acquisition modes for L{LockSet} 
  55  (_LS_ACQUIRE_EXACT, 
  56   _LS_ACQUIRE_ALL, 
  57   _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4) 
  58   
  59  _LS_ACQUIRE_MODES = compat.UniqueFrozenset([ 
  60    _LS_ACQUIRE_EXACT, 
  61    _LS_ACQUIRE_ALL, 
  62    _LS_ACQUIRE_OPPORTUNISTIC, 
  63    ]) 
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics, i.e. provide C{acquire(shared=...)} and C{release()}).

  The returned wrapper keeps the name and docstring of the decorated
  function, so introspection, logging and documentation tools still see the
  original function.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire;
    a string is resolved on every call as an attribute of the first
    positional argument ("self")
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire the lock in shared mode

  """
  import functools

  def wrap(fn):
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      if isinstance(mylock, basestring):
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        # Always release, even if the wrapped function raised
        lock.release()
    return sync_function
  return wrap
class _SingleNotifyPipeConditionWaiter(object):
  """Callable used by SingleNotifyPipeCondition to wait on a pipe.

  Calling an instance blocks until C{poll()} reports an event on the given
  file descriptor or until the timeout has run out.

  """
  __slots__ = [
    "_fd",
    ]

  def __init__(self, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    Interrupted C{poll()} calls (C{EINTR}) are retried with whatever time is
    left; any other error is propagated.

    @type timeout: float or None
    @param timeout: Timeout for waiting in seconds (C{None} waits forever)

    """
    fd = self._fd
    deadline = utils.RunningTimeout(timeout, True)
    poller = select.poll()
    poller.register(fd, select.POLLHUP)

    while True:
      remaining = deadline.Remaining()

      if remaining is not None:
        if remaining < 0.0:
          # Out of time
          return
        # RunningTimeout works in seconds, poll() expects milliseconds
        remaining *= 1000

      try:
        events = poller.poll(remaining)
      except EnvironmentError as err:
        if err.errno != errno.EINTR:
          raise
        events = None

      # We were notified once our descriptor shows up in the result
      if events and events[0][0] == fd:
        return
144 145 -class _BaseCondition(object):
146 """Base class containing common code for conditions. 147 148 Some of this code is taken from python's threading module. 149 150 """ 151 __slots__ = [ 152 "_lock", 153 "acquire", 154 "release", 155 "_is_owned", 156 "_acquire_restore", 157 "_release_save", 158 ] 159
160 - def __init__(self, lock):
161 """Constructor for _BaseCondition. 162 163 @type lock: threading.Lock 164 @param lock: condition base lock 165 166 """ 167 object.__init__(self) 168 169 try: 170 self._release_save = lock._release_save 171 except AttributeError: 172 self._release_save = self._base_release_save 173 try: 174 self._acquire_restore = lock._acquire_restore 175 except AttributeError: 176 self._acquire_restore = self._base_acquire_restore 177 try: 178 self._is_owned = lock.is_owned 179 except AttributeError: 180 self._is_owned = self._base_is_owned 181 182 self._lock = lock 183 184 # Export the lock's acquire() and release() methods 185 self.acquire = lock.acquire 186 self.release = lock.release
187
188 - def _base_is_owned(self):
189 """Check whether lock is owned by current thread. 190 191 """ 192 if self._lock.acquire(0): 193 self._lock.release() 194 return False 195 return True
196
197 - def _base_release_save(self):
198 self._lock.release()
199
200 - def _base_acquire_restore(self, _):
201 self._lock.acquire()
202
203 - def _check_owned(self):
204 """Raise an exception if the current thread doesn't own the lock. 205 206 """ 207 if not self._is_owned(): 208 raise RuntimeError("cannot work with un-aquired lock")
209
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  The pipe is created lazily by the first waiter and closed again when the
  last waiter leaves; notifying closes the writing side, which wakes up all
  waiters polling the reading side.

  """

  __slots__ = [
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = None
    self._write_fd = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} has already been called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Close the pipe's file descriptors, if any are open.

    """
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

  def wait(self, timeout):
    """Wait for a notification.

    The base lock must be held; it is released while waiting and re-acquired
    afterwards.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._read_fd is None:
        (self._read_fd, self._write_fd) = os.pipe()

      waiter = self._waiter_class(self._read_fd)
      saved_state = self._release_save()
      try:
        # Block until notified or timed out
        waiter(timeout)
      finally:
        # Always get the base lock back
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      if not self._nwaiters:
        # Last waiter gone, release the pipe
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    @raise RuntimeError: if the base lock isn't held or the condition was
      already notified

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True
    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional features it's able to report whether
  there are any waiting threads.

  Every notification consumes the current single-use condition and installs
  a fresh one for subsequent waiters.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Keep local reference to the pipe. It could be replaced by another thread
    # notifying while we're waiting.
    single = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      single.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()
    self._single_condition.notifyAll()
    # The notified condition can't be used again, start a new one
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    @note: This is the internal set, not a copy; only use it while holding
      the base lock

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return bool(self._waiters)

  def __repr__(self):
    cls = self.__class__
    return ("<%s.%s waiters=%s at %#x>" %
            (cls.__module__, cls.__name__, self._waiters, id(self)))
class _PipeConditionWithMode(PipeCondition):
  """L{PipeCondition} which also records the acquisition mode of its waiters.

  Used by L{SharedLock} for its queue of pending acquires.

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: whether the threads waiting on this condition want to
      acquire in shared mode

    """
    PipeCondition.__init__(self, lock)
    self.shared = shared
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Lower priority values are served first. Each per-priority queue
  contains a normal in-order list of conditions to be notified when the lock
  can be acquired. Shared locks are grouped together by priority and the
  condition for them is stored in C{__pending_shared} if it already exists.
  C{__pending_by_prio} keeps references for the per-priority queues indexed by
  priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register
    @param _time_fn: function returning the current time in seconds (only
      meant to be overridden by unittests)

    """
    object.__init__(self)

    self.name = name

    # Used for unittesting
    self.__time_fn = _time_fn

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    return ("<%s.%s name=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: Single-element list containing the tuple C{(name, mode,
      owner_names, pending)}; fields which were not requested (as well as the
      mode and owners of an unowned lock) are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is deleted

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  #: Necessary to remain compatible with threading.Condition, which tries to
  #: retrieve a locks' "_is_owned" attribute
  _is_owned = is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: C{True} if there are no pending acquires and all internal
      bookkeeping structures are empty

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    Shared acquires only conflict with an exclusive owner, exclusive acquires
    conflict with any owner.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @rtype: tuple
    @return: C{(priority, prioqueue)} or C{(None, None)} if nothing is pending

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    Must be called while holding the internal lock.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up; this is an overall
        limit, waits after spurious or unsuccessful wakeups only use the time
        that is left
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired; C{False} is only returned when a
        timeout was given
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    # The lock couldn't be acquired right away, so if a timeout is given and is
    # considered too short, return right away as scheduling a pending
    # acquisition is quite expensive
    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
      return False

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          acquired = True
          break

        # A lot of code assumes blocking acquires always succeed, therefore we
        # can never return False for a blocking acquire
        if timeout is None:
          wait_timeout = None
        else:
          # The timeout has expired once the current time is past
          # wait_start + timeout. Otherwise only wait for what is left of it:
          # passing the full timeout on every iteration would let an acquire
          # which is woken up without getting the lock (e.g. because another
          # acquire was ahead in the queue) block for longer than requested.
          # The time function is called only once per iteration.
          wait_timeout = wait_start + timeout - self.__time_fn()
          if wait_timeout < 0:
            break

        # Wait for notification
        wait_condition.wait(wait_timeout)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired (always C{True} without timeout)
    @raise errors.LockError: if the lock is or gets deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @rtype: bool
    @return: always C{True}

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
        notify = True
      else:
        self.__shr.remove(threading.currentThread())
        notify = not self.__shr

      # Notify topmost condition in queue if there are no owners left (for
      # shared locks)
      if notify:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    """
    (priority, prioqueue) = self.__find_first_pending_queue()
    if prioqueue:
      cond = prioqueue[0]
      cond.notifyAll()
      if cond.shared:
        # Prevent further shared acquires from sneaking in while waiters are
        # notified
        self.__pending_shared.pop(priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    """
    self.__lock.acquire()
    try:
      return self.__notify_topmost()
    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired and deleted
    @raise errors.LockError: if the lock was already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock, returning the mode for L{_acquire_restore}.

    Used when this lock serves as the base lock of a condition.

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)


# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire. Hide this behind this nicely named constant.
ALL_SET = None
918 919 920 -def _TimeoutZero():
921 """Returns the number zero. 922 923 """ 924 return 0
925
def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
  """Determines modes and timeouts for L{LockSet.acquire}.

  @type want_all: boolean
  @param want_all: Whether all locks in set should be acquired
  @param timeout: Timeout in seconds or C{None}
  @type opportunistic: boolean
  @param opportunistic: Whether locks should be acquired opportunistically
  @rtype: tuple
  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
    acquiring the lockset-internal lock (might be C{None}) and a function to
    calculate the timeout for acquiring individual locks

  """
  if opportunistic and not want_all:
    # Short circuit: no running timeout is needed in this case
    assert timeout is None, "Got timeout for an opportunistic acquisition"
    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)

  # The given timeout covers all lock acquisitions together, so keep track of
  # how much of it is left
  remaining_fn = utils.RunningTimeout(timeout, False).Remaining

  if want_all:
    ls_timeout_fn = remaining_fn
  else:
    ls_timeout_fn = None

  if opportunistic:
    # Individual locks are never waited for
    return (_LS_ACQUIRE_OPPORTUNISTIC, ls_timeout_fn, _TimeoutZero)

  if want_all:
    mode = _LS_ACQUIRE_ALL
  else:
    mode = _LS_ACQUIRE_EXACT

  return (mode, ls_timeout_fn, remaining_fn)
965 966 -class _AcquireTimeout(Exception):
967 """Internal exception to abort an acquire on a timeout. 968 969 """
970
971 972 -class LockSet:
973 """Implements a set of locks. 974 975 This abstraction implements a set of shared locks for the same resource type, 976 distinguished by name. The user can lock a subset of the resources and the 977 LockSet will take care of acquiring the locks always in the same order, thus 978 preventing deadlock. 979 980 All the locks needed in the same set must be acquired together, though. 981 982 @type name: string 983 @ivar name: the name of the lockset 984 985 """
986 - def __init__(self, members, name, monitor=None):
987 """Constructs a new LockSet. 988 989 @type members: list of strings 990 @param members: initial members of the set 991 @type monitor: L{LockMonitor} 992 @param monitor: Lock monitor with which to register member locks 993 994 """ 995 assert members is not None, "members parameter is not a list" 996 self.name = name 997 998 # Lock monitor 999 self.__monitor = monitor 1000 1001 # Used internally to guarantee coherency 1002 self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor) 1003 1004 # The lockdict indexes the relationship name -> lock 1005 # The order-of-locking is implied by the alphabetical order of names 1006 self.__lockdict = {} 1007 1008 for mname in members: 1009 self.__lockdict[mname] = SharedLock(self._GetLockName(mname), 1010 monitor=monitor) 1011 1012 # The owner dict contains the set of locks each thread owns. For 1013 # performance each thread can access its own key without a global lock on 1014 # this structure. It is paramount though that *no* other type of access is 1015 # done to this structure (eg. no looping over its keys). *_owner helper 1016 # function are defined to guarantee access is correct, but in general never 1017 # do anything different than __owners[threading.currentThread()], or there 1018 # will be trouble. 1019 self.__owners = {}
1020
1021 - def _GetLockName(self, mname):
1022 """Returns the name for a member lock. 1023 1024 """ 1025 return "%s/%s" % (self.name, mname)
1026
1027 - def _get_lock(self):
1028 """Returns the lockset-internal lock. 1029 1030 """ 1031 return self.__lock
1032
1033 - def _get_lockdict(self):
1034 """Returns the lockset-internal lock dictionary. 1035 1036 Accessing this structure is only safe in single-thread usage or when the 1037 lockset-internal lock is held. 1038 1039 """ 1040 return self.__lockdict
1041
1042 - def is_owned(self):
1043 """Is the current thread a current level owner? 1044 1045 @note: Use L{check_owned} to check if a specific lock is held 1046 1047 """ 1048 return threading.currentThread() in self.__owners
1049
1050 - def check_owned(self, names, shared=-1):
1051 """Check if locks are owned in a specific mode. 1052 1053 @type names: sequence or string 1054 @param names: Lock names (or a single lock name) 1055 @param shared: See L{SharedLock.is_owned} 1056 @rtype: bool 1057 @note: Use L{is_owned} to check if the current thread holds I{any} lock and 1058 L{list_owned} to get the names of all owned locks 1059 1060 """ 1061 if isinstance(names, basestring): 1062 names = [names] 1063 1064 # Avoid check if no locks are owned anyway 1065 if names and self.is_owned(): 1066 candidates = [] 1067 1068 # Gather references to all locks (in case they're deleted in the meantime) 1069 for lname in names: 1070 try: 1071 lock = self.__lockdict[lname] 1072 except KeyError: 1073 raise errors.LockError("Non-existing lock '%s' in set '%s' (it may" 1074 " have been removed)" % (lname, self.name)) 1075 else: 1076 candidates.append(lock) 1077 1078 return compat.all(lock.is_owned(shared=shared) for lock in candidates) 1079 else: 1080 return False
1081
1082 - def owning_all(self):
1083 """Checks whether current thread owns internal lock. 1084 1085 Holding the internal lock is equivalent with holding all locks in the set 1086 (the opposite does not necessarily hold as it can not be easily 1087 determined). L{add} and L{remove} require the internal lock. 1088 1089 @rtype: boolean 1090 1091 """ 1092 return self.__lock.is_owned()
1093
1094 - def _add_owned(self, name=None):
1095 """Note the current thread owns the given lock""" 1096 if name is None: 1097 if not self.is_owned(): 1098 self.__owners[threading.currentThread()] = set() 1099 else: 1100 if self.is_owned(): 1101 self.__owners[threading.currentThread()].add(name) 1102 else: 1103 self.__owners[threading.currentThread()] = set([name])
1104
1105 - def _del_owned(self, name=None):
1106 """Note the current thread owns the given lock""" 1107 1108 assert not (name is None and self.__lock.is_owned()), \ 1109 "Cannot hold internal lock when deleting owner status" 1110 1111 if name is not None: 1112 self.__owners[threading.currentThread()].remove(name) 1113 1114 # Only remove the key if we don't hold the set-lock as well 1115 if not (self.__lock.is_owned() or 1116 self.__owners[threading.currentThread()]): 1117 del self.__owners[threading.currentThread()]
1118
1119 - def list_owned(self):
1120 """Get the set of resource names owned by the current thread""" 1121 if self.is_owned(): 1122 return self.__owners[threading.currentThread()].copy() 1123 else: 1124 return set()
1125
def _release_and_delete_owned(self):
  """Release every lock of this set the calling thread owns and forget it.

  Locks whose C{is_owned()} reports C{False} are not released, but their
  ownership record is dropped all the same.

  """
  lockdict = self.__lockdict
  for owned_name in self.list_owned():
    held_lock = lockdict[owned_name]
    if held_lock.is_owned():
      held_lock.release()
    self._del_owned(name=owned_name)
1133
def __names(self):
  """Return the names of all locks currently in the set.

  The result comes straight from the internal lock dictionary. The caller
  must hold __lock and must not iterate the result once the lock has been
  released.

  """
  lockdict = self.__lockdict
  return lockdict.keys()
1142
def _names(self):
  """Return a copy of the current set of elements.

  Used only for debugging purposes. If the calling thread does not already
  own the set-level lock, the lock is acquired in shared mode for the
  duration of the call.

  @rtype: set
  @return: a snapshot of the lock names in the set

  """
  # If we don't already own the set-level lock, acquire it (shared) and
  # remember to release it afterwards.
  release_lock = not self.__lock.is_owned()
  if release_lock:
    self.__lock.acquire(shared=1)
  try:
    # Build the copy while the lock is still held. __names() may return a
    # live view of the internal dictionary (dict.keys() on Python 3), and
    # that view must not be iterated after the lock is released.
    return set(self.__names())
  finally:
    if release_lock:
      self.__lock.release()
1161
def acquire(self, names, timeout=None, shared=0, priority=None,
            opportunistic=False, test_notify=None):
  """Acquire a set of resource locks.

  @note: When acquiring locks opportunistically, any number of locks might
    actually be acquired, even zero.

  @type names: list of strings (or string), or None
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names); C{None} acquires the
      set-level internal lock and then every lock currently in the set
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
      exclusive lock will be acquired
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks; for opportunistic
    acquisitions, a timeout can only be given when C{names} is C{None}, in
    which case it is exclusively used for acquiring the L{LockSet}-internal
    lock; opportunistic acquisitions don't use a timeout for acquiring
    individual locks
  @type priority: integer
  @param priority: Priority for acquiring locks (C{None} means
    C{_DEFAULT_PRIORITY})
  @type opportunistic: boolean
  @param opportunistic: Acquire locks opportunistically; use the return value
    to determine which locks were actually acquired
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting

  @return: Set of the names of all locks successfully acquired or None in
    case of timeout

  @raise errors.LockError: when any lock we try to acquire has
      been deleted before we succeed. In this case none of the
      locks requested will be acquired.

  """
  assert timeout is None or timeout >= 0.0

  # Check we don't already own locks at this level
  assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
                               " (lockset %s)" % self.name)

  if priority is None:
    priority = _DEFAULT_PRIORITY

  # Any timeout below (here or in __acquire_inner) is signalled by raising
  # _AcquireTimeout, which the outer handler turns into a None return value.
  try:
    if names is not None:
      assert timeout is None or not opportunistic, \
        ("Opportunistic acquisitions can only use a timeout if no"
         " names are given; see docstring for details")

      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      (mode, _, timeout_fn) = \
        _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)

      return self.__acquire_inner(names, mode, shared, priority,
                                  timeout_fn, test_notify)

    else:
      (mode, ls_timeout_fn, timeout_fn) = \
        _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)

      # If no names are given, acquire the whole set: hold the set-level
      # lock so that no new names can be added before we release, and take
      # the current list of names. Some of them may be deleted later, but
      # we'll cope with this.
      #
      # We'd like to acquire this lock in a shared way, as it's nice if
      # everybody else can use the instances at the same time. If we are
      # acquiring them exclusively, others won't be able to use them
      # anyway, so we get the list lock exclusively as well in order to be
      # able to do add() on the set while owning it.
      if not self.__lock.acquire(shared=shared, priority=priority,
                                 timeout=ls_timeout_fn()):
        raise _AcquireTimeout()

      try:
        # note we own the set-lock
        self._add_owned()

        # __names() is only valid while __lock is held, which is the case
        # here; __acquire_inner copies it into a sorted list up front.
        return self.__acquire_inner(self.__names(), mode, shared,
                                    priority, timeout_fn, test_notify)
      except:
        # We shouldn't have problems adding the lock to the owners list, but
        # if we did we'll try to release this lock and re-raise exception.
        # Of course something is going to be really wrong, after this.
        # This handler also runs when __acquire_inner fails (including on a
        # timeout); by then it has already released the individual locks it
        # took. The set-lock must be released before _del_owned(), whose
        # assertion forbids holding it.
        self.__lock.release()
        self._del_owned()
        raise

  except _AcquireTimeout:
    return None
1254
def __acquire_inner(self, names, mode, shared, priority,
                    timeout_fn, test_notify):
  """Inner logic for acquiring a number of locks.

  Acquisition modes:

    - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
      deleted locks can be ignored as the whole set is being acquired with
      its internal lock held
    - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
      timeouts and deleted locks are fatal
    - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
      all within the set) which should be acquired opportunistically, that is
      failures are ignored

  On any exception, every lock of this set owned by the calling thread is
  released via L{_release_and_delete_owned} before the exception propagates.

  @param names: Names of the locks to be acquired
  @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
  @param shared: Whether to acquire in shared mode
  @param timeout_fn: Function returning remaining timeout (C{None} for
    opportunistic acquisitions)
  @param priority: Priority for acquiring locks
  @param test_notify: Special callback function for unittesting
  @rtype: set
  @return: names of the locks actually acquired
  @raise errors.LockError: in C{_LS_ACQUIRE_EXACT} mode, when a lock is
    missing or deleted; or when a blocking acquisition unexpectedly fails
  @raise _AcquireTimeout: when a non-opportunistic acquisition times out

  """
  assert mode in _LS_ACQUIRE_MODES

  acquire_list = []

  # First we look the locks up on __lockdict. We have no way of being sure
  # they will still be there afterwards, but this makes it a lot faster
  # should one of them already be wrong. A sorted sequence is used to
  # prevent deadlocks (all threads acquire locks in the same order).
  for lname in sorted(frozenset(names)):
    try:
      lock = self.__lockdict[lname] # raises KeyError if lock is not there
    except KeyError:
      # We are acquiring the whole set, it doesn't matter if this particular
      # element is not there anymore. If, however, only certain names should
      # be acquired, not finding a lock is an error.
      if mode == _LS_ACQUIRE_EXACT:
        raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                               " been removed)" % (lname, self.name))
    else:
      acquire_list.append((lname, lock))

  # This will hold the locknames we effectively acquired.
  acquired = set()

  try:
    # Now acquire_list contains a sorted list of resources and locks we
    # want. In order to get them we loop on this (private) list and
    # acquire() them. There is no real guarantee they will still exist
    # until this is done, but .acquire() itself is safe and will alert us
    # if the lock gets deleted.
    for (lname, lock) in acquire_list:
      # NOTE(review): the lambda looks up lname when it is called, so this
      # relies on lock.acquire() invoking it before the next loop iteration
      # (presumably the case; verify in SharedLock.acquire).
      if __debug__ and callable(test_notify):
        test_notify_fn = lambda: test_notify(lname)
      else:
        test_notify_fn = None

      timeout = timeout_fn()

      try:
        # raises LockError if the lock was deleted
        acq_success = lock.acquire(shared=shared, timeout=timeout,
                                   priority=priority,
                                   test_notify=test_notify_fn)
      except errors.LockError:
        if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
          # We are acquiring the whole set, it doesn't matter if this
          # particular element is not there anymore.
          continue

        raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                               " been removed)" % (lname, self.name))

      if not acq_success:
        # Couldn't get lock or timeout occurred
        if mode == _LS_ACQUIRE_OPPORTUNISTIC:
          # Ignore timeouts on opportunistic acquisitions
          continue

        if timeout is None:
          # This shouldn't happen as SharedLock.acquire(timeout=None) is
          # blocking.
          raise errors.LockError("Failed to get lock %s (set %s)" %
                                 (lname, self.name))

        raise _AcquireTimeout()

      try:
        # now the lock cannot be deleted, we have it!
        self._add_owned(name=lname)
        acquired.add(lname)

      except:
        # We shouldn't have problems adding the lock to the owners list, but
        # if we did we'll try to release this lock and re-raise exception.
        # Of course something is going to be really wrong after this.
        if lock.is_owned():
          lock.release()
        raise

  except:
    # Release all owned locks
    self._release_and_delete_owned()
    raise

  return acquired
1364
def downgrade(self, names=None):
  """Switch locks of this set from exclusive to shared ownership.

  Every lock named must currently be held exclusively by the calling thread.
  If the set-level lock is held exclusively and, afterwards, no lock of the
  set is held exclusively any more, the set-level lock is downgraded too.

  @type names: string, list of strings or None
  @param names: lock name(s) to downgrade; C{None} means every lock the
      calling thread owns in this set
  @rtype: bool
  @return: always C{True}

  """
  assert self.is_owned(), ("downgrade on lockset %s while not owning any"
                           " lock" % self.name)

  owned = self.list_owned()

  if names is None:
    targets = owned
  else:
    # A single name may be given instead of a sequence
    if isinstance(names, basestring):
      names = [names]
    targets = set(names)
    assert owned.issuperset(targets), \
      ("downgrade() on unheld resources %s (set %s)" %
       (targets.difference(owned), self.name))

  lockdict = self.__lockdict
  for target in targets:
    lockdict[target].downgrade()

  internal = self.__lock
  if not internal.is_owned(shared=0):
    # Set-level lock not held exclusively; nothing else to adjust
    return True

  still_exclusive = compat.any(lock.is_owned(shared=0)
                               for lock in lockdict.values())
  if not still_exclusive:
    internal.downgrade()
    assert internal.is_owned(shared=1)

  return True
1400
def release(self, names=None):
  """Give up locks of this set held by the calling thread.

  The locks may be held in shared or exclusive mode. If the set-level lock
  is held, it is released first (after which L{add} can work again),
  regardless of which names are given.

  @type names: string, list of strings, or None
  @param names: lock name(s) to release; C{None} releases every lock the
      calling thread owns in this set

  """
  assert self.is_owned(), ("release() on lock set %s while not owner" %
                           self.name)

  if names is None:
    to_release = self.list_owned()
  else:
    # A single name may be given instead of a sequence
    if isinstance(names, basestring):
      names = [names]
    to_release = set(names)
    assert self.list_owned().issuperset(to_release), (
      "release() on unheld resources %s (set %s)" %
      (to_release.difference(self.list_owned()), self.name))

  internal = self.__lock
  if internal.is_owned():
    internal.release()
    self._del_owned()

  lockdict = self.__lockdict
  for lockname in to_release:
    # Safe only because a lock never leaves __lockdict without being held
    # exclusively (see remove(), which goes through delete()).
    lockdict[lockname].release()
    self._del_owned(name=lockname)
1438
def add(self, names, acquired=0, shared=0):
  """Add a new set of elements to the set

  The caller must either own nothing in this set or own its set-level lock
  exclusively. The set-level lock is taken exclusively for the duration of
  the call if it is not already held.

  @type names: list of strings (or string)
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @rtype: bool
  @return: always C{True}
  @raise errors.LockError: if any of C{names} already exists in the set

  """
  # Either we own nothing at this level, or we own the set-level lock
  # exclusively
  assert not self.is_owned() or self.__lock.is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # Support passing in a single resource to add rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we don't already own the set-level lock acquired in an exclusive way
  # we'll get it and note we need to release it later.
  release_lock = False
  if not self.__lock.is_owned():
    release_lock = True
    self.__lock.acquire()

  try:
    # NOTE(review): this only detects clashes with names already in the set;
    # a name repeated within `names` itself is not rejected, and its second
    # SharedLock silently replaces the first in __lockdict.
    invalid_names = set(self.__names()).intersection(names)
    if invalid_names:
      # This must be an explicit raise, not an assert, because assert is
      # turned off when using optimization, and this can happen because of
      # concurrency even if the user doesn't want it.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (invalid_names, self.name))

    for lockname in names:
      lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

      if acquired:
        # No need for priority or timeout here as this lock has just been
        # created
        lock.acquire(shared=shared)
        # now the lock cannot be deleted, we have it!
        try:
          self._add_owned(name=lockname)
        except:
          # We shouldn't have problems adding the lock to the owners list,
          # but if we did we'll try to release this lock and re-raise
          # exception. Of course something is going to be really wrong,
          # after this. On the other hand the lock hasn't been added to the
          # __lockdict yet so no other threads should be pending on it. This
          # release is just a safety measure.
          lock.release()
          raise

      # Publish the lock only once it is fully set up (and acquired, if
      # requested)
      self.__lockdict[lockname] = lock

  finally:
    # Only release __lock if we were not holding it previously.
    if release_lock:
      self.__lock.release()

  return True
1503
def remove(self, names):
  """Remove elements from the lock set.

  You can either not hold anything in the lockset or already hold a superset
  of the elements you want to delete, exclusively.

  @type names: list of strings (or string)
  @param names: names of the resource to remove.

  @return: a list of locks which we removed; the list is always
      equal to the names list if we were holding all the locks
      exclusively

  """
  # Support passing in a single resource to remove rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we own any subset of this lock it must be a superset of what we want
  # to delete. The ownership must also be exclusive, but that will be checked
  # by the lock itself.
  assert not self.is_owned() or self.list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  removed = []

  for lname in names:
    # Calling delete() acquires the lock exclusively if we don't already own
    # it, and causes all pending and subsequent lock acquires to fail. It's
    # fine to call it out of order because delete() also implies release(),
    # and the assertion above guarantees that we either already hold
    # everything we want to delete, or we hold none.
    try:
      self.__lockdict[lname].delete()
      removed.append(lname)
    except (KeyError, errors.LockError):
      # This cannot happen if we were already holding it, verify:
      assert not self.is_owned(), ("remove failed while holding lockset %s" %
                                   self.name)
    else:
      # If no LockError was raised we are the ones who deleted the lock.
      # This means we can safely remove it from lockdict, as any further or
      # pending delete() or acquire() will fail (and nobody can have the lock
      # since before our call to delete()).
      #
      # This is done in an else clause because if the exception was thrown
      # it's the job of the one who actually deleted it.
      del self.__lockdict[lname]
      # And let's remove it from our private list if we owned it.
      if self.is_owned():
        self._del_owned(name=lname)

  return removed
# Locking levels, which must be acquired in increasing order. Current rules:
# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL), which must be
#   acquired before performing any operation, either in shared or exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
#   you need more than one node, or more than one instance, acquire them at the
#   same time.
# - LEVEL_NODE_RES is for node resources and should be used by operations with
#   possibly high impact on the node's disks.
# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
#   ("NAL" is the only lock at this level). It should be acquired in shared
#   mode when an opcode blocks all or a significant amount of a cluster's
#   locks. Opcodes doing instance allocations should acquire in exclusive mode.
#   Once the set of acquired locks for an opcode has been reduced to the working
#   set, the NAL should be released as well to allow allocations to proceed.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODE_ALLOC,
 LEVEL_NODEGROUP,
 LEVEL_NODE,
 LEVEL_NODE_RES,
 LEVEL_NETWORK) = range(0, 7)

#: All locking levels in acquisition order. This must satisfy
#: LEVELS[i] == i: GanetiLockManager._upper_owned slices it by level value.
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODE_ALLOC,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]

# Lock levels which are modifiable (locks can be added to/removed from them)
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_NODE_RES,
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE_ALLOC: "node-alloc",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Constant for the big ganeti lock
BGL = "BGL"

#: Node allocation lock
NAL = "NAL"
class GanetiLockManager(object):
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  """
  _instance = None

  def __init__(self, node_uuids, nodegroups, instance_names, networks):
    """Constructs a new GanetiLockManager object.

    There should be only one GanetiLockManager object at any time; this is
    enforced with an assertion (so only when assertions are enabled).

    @param node_uuids: list of node UUIDs (used for both the node and the
        node-res levels)
    @param nodegroups: list of nodegroup uuids
    @param instance_names: list of instance names
    @param networks: list of network lock names (presumably network UUIDs;
        TODO confirm against callers)

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    self._monitor = LockMonitor()

    # The keyring contains all the locks, at their level and in the correct
    # locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
      LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
      LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
      LEVEL_INSTANCE: LockSet(instance_names, "instance",
                              monitor=self._monitor),
      LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
      LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
      }

    assert compat.all(ls.name == LEVEL_NAMES[level]
                      for (level, ls) in self.__keyring.items()), \
      "Keyring name mismatch"

  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    """
    return self._monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def is_owned(self, level):
    """Check whether we are owning locks at the given level

    @see: L{LockSet.is_owned}

    """
    return self.__keyring[level].is_owned()

  def list_owned(self, level):
    """Get the set of owned locks at the given level

    @see: L{LockSet.list_owned}

    """
    return self.__keyring[level].list_owned()

  def check_owned(self, level, names, shared=-1):
    """Check if locks at a certain level are owned in a specific mode.

    @see: L{LockSet.check_owned}

    """
    return self.__keyring[level].check_owned(names, shared=shared)

  def owning_all(self, level):
    """Checks whether current thread owns all locks at a certain level.

    @see: L{LockSet.owning_all}

    """
    return self.__keyring[level].owning_all()

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @rtype: bool
    @return: C{True} if some lock at a higher level is owned

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any(self.is_owned(upper_level)
                      for upper_level in LEVELS[level + 1:])

  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0, priority=None,
              opportunistic=False):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type opportunistic: boolean
    @param opportunistic: Acquire locks opportunistically; use the return value
      to determine which locks were actually acquired
    @return: see L{LockSet.acquire} (set of acquired names, or C{None} on
      timeout)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at the same or upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
                                         priority=priority,
                                         opportunistic=opportunistic)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    return self.__keyring[level].downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: see L{LockSet.remove} (list of names actually removed)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self.is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")
    return self.__keyring[level].remove(names)
1872
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)}. C{item} is a 4-tuple of lock information
      whose first element is the lock name; C{idx} is the item's position in
      its provider's output; C{num} is the provider's registration number.
  @return: sort key C{(name key, num, idx)}

  """
  # Unpack explicitly instead of using tuple parameters, which were removed
  # in Python 3 (PEP 3113); callers pass a single positional argument anyway.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1885
class LockMonitor(object):
  """Keeps track of lock providers and answers queries about their locks.

  Providers are tracked through weak references, so registering a lock does
  not keep it alive.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Counter for stable sorting
    self._counter = itertools.count(0)

    # Tracked locks. Weak references are used to avoid issues with circular
    # references and deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
      requested information but, as it turns out, weak references to bound
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
      workarounds, but none of the ones I found works properly in combination
      with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # There used to be a check for duplicate names here. As it turned out, when
    # a lock is re-created with the same name in a very short timeframe, the
    # previous instance might not yet be removed from the weakref dictionary.
    # By keeping track of the order of incoming registrations, a stable sort
    # ordering can still be guaranteed.

    # next() works on both Python 2.6+ and 3 (unlike the .next() method)
    self._locks[provider] = next(self._counter)

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: set of requested information items, passed on to each
      provider's C{GetLockInfo}
    @return: list of C{(info, idx, num)} tuples, where C{idx} is the position
      of C{info} in its provider's output and C{num} the provider's
      registration number

    """
    # Must hold lock while getting consistent list of tracked items. Take a
    # real list copy: items() may be a live view (Python 3) whose iteration
    # after releasing the lock would race with (un)registrations.
    self._lock.acquire(shared=1)
    try:
      items = list(self._locks.items())
    finally:
      self._lock.release()

    return [(info, idx, num)
            for (provider, num) in items
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Get all data with internal lock held and then sort by name and incoming
    # order
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
                      key=_MonitorSortKey)

    # Extract lock information and build query data (materialized, as map()
    # is lazy on Python 3)
    return (qobj, query.LockQueryData(list(map(compat.fst, lockinfo))))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return

    """
    (qobj, ctx) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(qobj, ctx)
1969