Package ganeti :: Module locking
[hide private]
[frames] | [no frames]

Source Code for Module ganeti.locking

   1  # 
   2  # 
   3   
   4  # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. 
   5  # 
   6  # This program is free software; you can redistribute it and/or modify 
   7  # it under the terms of the GNU General Public License as published by 
   8  # the Free Software Foundation; either version 2 of the License, or 
   9  # (at your option) any later version. 
  10  # 
  11  # This program is distributed in the hope that it will be useful, but 
  12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  14  # General Public License for more details. 
  15  # 
  16  # You should have received a copy of the GNU General Public License 
  17  # along with this program; if not, write to the Free Software 
  18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 
  19  # 02110-1301, USA. 
  20   
  21  """Module implementing the Ganeti locking code.""" 
  22   
  23  # pylint: disable-msg=W0212 
  24   
   25  # W0212 since e.g. LockSet methods make heavy use of the internals of 
   26  # SharedLock 
  27   
  28  import os 
  29  import select 
  30  import threading 
  31  import time 
  32  import errno 
  33  import weakref 
  34  import logging 
  35   
  36  from ganeti import errors 
  37  from ganeti import utils 
  38  from ganeti import compat 
  39   
  40   
  41  _EXCLUSIVE_TEXT = "exclusive" 
  42  _SHARED_TEXT = "shared" 
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Wraps a function so that every call runs while holding C{mylock}, acquired
  either exclusively (the default) or in shared mode. The lock must be a
  SharedLock or provide compatible C{acquire(shared=...)} and C{release()}
  methods.

  @type mylock: lockable object or string
  @param mylock: lock to acquire, or the name of the attribute of the first
      positional argument (C{self}) which holds the lock
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire the lock in shared mode

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      lock = mylock
      if isinstance(lock, basestring):
        # Look the lock up by name on the instance the method is called on
        assert args, "cannot ssynchronize on non-class method: self not found"
        lock = getattr(args[0], lock)
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        # Released even if the wrapped function raises
        lock.release()
    return wrapper
  return decorator
class RunningTimeout(object):
  """Class to calculate remaining timeout when doing several operations.

  The clock starts with the first call to L{Remaining}, so one instance can
  spread a single timeout over a sequence of waits.

  """
  __slots__ = [
    "_allow_negative",
    "_start_time",
    "_time_fn",
    "_timeout",
    ]

  def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Initializes this class.

    @type timeout: float or None
    @param timeout: Timeout duration in seconds; None means no timeout
    @type allow_negative: bool
    @param allow_negative: Whether to return values below zero
    @param _time_fn: Time function for unittests
    @raise ValueError: if C{timeout} is negative

    """
    object.__init__(self)

    if timeout is not None and timeout < 0.0:
      raise ValueError("Timeout must not be negative")

    self._timeout = timeout
    self._allow_negative = allow_negative
    self._time_fn = _time_fn
    # Set lazily by the first Remaining() call
    self._start_time = None

  def Remaining(self):
    """Returns the remaining timeout.

    @rtype: float or None
    @return: time left (clamped at 0.0 unless negative values are allowed),
        or None if there is no timeout

    """
    if self._timeout is None:
      return None

    if self._start_time is None:
      self._start_time = self._time_fn()

    deadline = self._start_time + self._timeout
    remaining = deadline - self._time_fn()

    if self._allow_negative:
      return remaining

    return max(0.0, remaining)
125 126 -class _SingleNotifyPipeConditionWaiter(object):
127 """Helper class for SingleNotifyPipeCondition 128 129 """ 130 __slots__ = [ 131 "_fd", 132 "_poller", 133 ] 134
135 - def __init__(self, poller, fd):
136 """Constructor for _SingleNotifyPipeConditionWaiter 137 138 @type poller: select.poll 139 @param poller: Poller object 140 @type fd: int 141 @param fd: File descriptor to wait for 142 143 """ 144 object.__init__(self) 145 self._poller = poller 146 self._fd = fd
147
148 - def __call__(self, timeout):
149 """Wait for something to happen on the pipe. 150 151 @type timeout: float or None 152 @param timeout: Timeout for waiting (can be None) 153 154 """ 155 running_timeout = RunningTimeout(timeout, True) 156 157 while True: 158 remaining_time = running_timeout.Remaining() 159 160 if remaining_time is not None: 161 if remaining_time < 0.0: 162 break 163 164 # Our calculation uses seconds, poll() wants milliseconds 165 remaining_time *= 1000 166 167 try: 168 result = self._poller.poll(remaining_time) 169 except EnvironmentError, err: 170 if err.errno != errno.EINTR: 171 raise 172 result = None 173 174 # Check whether we were notified 175 if result and result[0][0] == self._fd: 176 break
177
class _BaseCondition(object):
  """Base class containing common code for conditions.

  Some of this code is taken from python's threading module.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    Uses the lock's own C{_release_save}, C{_acquire_restore} and
    C{_is_owned} helpers when it has them, and falls back to the generic
    versions defined in this class otherwise.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    self._release_save = getattr(lock, "_release_save",
                                 self._base_release_save)
    self._acquire_restore = getattr(lock, "_acquire_restore",
                                    self._base_acquire_restore)
    self._is_owned = getattr(lock, "_is_owned", self._base_is_owned)

    self._lock = lock

    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Check whether lock is owned by current thread.

    Fallback for locks without an C{_is_owned} method: a non-blocking acquire
    failing means somebody (assumed to be the caller) holds the lock.

    """
    if not self._lock.acquire(0):
      return True
    self._lock.release()
    return False

  def _base_release_save(self):
    """Fallback release used before waiting; returns no state.

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Fallback re-acquire after waiting; the state argument is ignored.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    @raise RuntimeError: if the lock isn't owned

    """
    if not self._is_owned():
      raise RuntimeError("cannot work with un-aquired lock")
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  Waiters poll the read end of a pipe; notifying closes the write end, which
  makes the read end report POLLHUP to all of them. This allows waiting with
  a timeout without resorting to polling. It is almost compatible with
  Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if notifyAll was already called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    # Read end first, then write end
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)
    self._poller = None

  def wait(self, timeout=None):
    """Wait for a notification.

    The pipe is created by the first waiter and closed again once the last
    waiter has left.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      wait_fn = self._waiter_class(self._poller, self._read_fd)
      state = self._release_save()
      try:
        # Wait for notification with the lock released
        wait_fn(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(state)
    finally:
      self._nwaiters -= 1
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable-msg=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True
    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Waiting is delegated to a single-shot condition which uses pipes and poll
  internally, so waiting with a timeout doesn't require polling. notifyAll
  replaces that single-shot condition with a fresh one. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll
  and non-recursive locks. As an additional feature it can report which
  threads are waiting.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout=None):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # notifyAll() swaps in a fresh single-shot condition, possibly while we
    # wait; keep the current one, since that's the one which will wake us.
    current_cond = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      current_cond.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable-msg=C0103
    """Notify all currently waiting threads.

    """
    self._check_owned()
    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    Note: this is the condition's internal set, not a copy; callers must not
    modify it.

    @rtype: set

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return bool(self._waiters)
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  The lock prevents starvation but does not guarantee that threads will acquire
  the shared lock in the order they queued for it, just that they will
  eventually do so.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__active_shr_c",
    "__inactive_shr_c",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__shr",
    "name",
    ]

  __condition_class = PipeCondition

  def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []

    # Active and inactive conditions for shared locks
    self.__active_shr_c = self.__condition_class(self.__lock)
    self.__inactive_shr_c = self.__condition_class(self.__lock)

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      monitor.RegisterLock(self)

  def GetInfo(self, fields):
    """Retrieves information for querying locks.

    @type fields: list of strings
    @param fields: List of fields to return ("name", "mode", "owner" or
        "pending")
    @rtype: list
    @return: one value per requested field, in the same order
    @raise errors.OpExecError: for an unknown field

    """
    self.__lock.acquire()
    try:
      info = []

      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      for fname in fields:
        if fname == "name":
          info.append(self.name)
        elif fname == "mode":
          if self.__deleted:
            info.append("deleted")
            assert not (self.__exc or self.__shr)
          elif self.__exc:
            info.append(_EXCLUSIVE_TEXT)
          elif self.__shr:
            info.append(_SHARED_TEXT)
          else:
            info.append(None)
        elif fname == "owner":
          if self.__exc:
            owner = [self.__exc]
          else:
            owner = self.__shr

          if owner:
            assert not self.__deleted
            info.append([i.getName() for i in owner])
          else:
            info.append(None)
        elif fname == "pending":
          data = []

          for cond in self.__pending:
            if cond in (self.__active_shr_c, self.__inactive_shr_c):
              mode = _SHARED_TEXT
            else:
              mode = _EXCLUSIVE_TEXT

            # This function should be fast as it runs with the lock held. Hence
            # not using utils.NiceSort.
            data.append((mode, sorted([i.getName()
                                       for i in cond.get_waiting()])))

          info.append(data)
        else:
          raise errors.OpExecError("Invalid query field '%s'" % fname)

      return info
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock was deleted

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return len(self.__pending)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    return self.__pending[0] == cond

  def __acquire_unlocked(self, shared, timeout):
    """Acquire a shared lock.

    Must be called with the internal lock held.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up (None to block)
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is or becomes deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    # Tracks the time left over possibly several waits. Created before
    # queueing so that an invalid timeout can't leave a stale queue entry.
    running_timeout = RunningTimeout(timeout, False)

    if shared:
      wait_condition = self.__active_shr_c

      # Check if we're not yet in the queue
      if wait_condition not in self.__pending:
        self.__pending.append(wait_condition)
    else:
      wait_condition = self.__condition_class(self.__lock)
      # Always add to queue
      self.__pending.append(wait_condition)

    try:
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          return True

        # A lot of code assumes blocking acquires always succeed, hence only
        # give up when a timeout was given and it has fully expired.
        remaining = running_timeout.Remaining()
        if remaining is not None and remaining <= 0.0:
          break

        # Being woken up doesn't mean the lock can be taken (e.g. only one of
        # several sharers released it), so wait again for the time left
        # instead of giving up early.
        wait_condition.wait(remaining)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting() and not self.__deleted:
        self.__pending.remove(wait_condition)

    return False

  def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired (always True without timeout)

    """
    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout)
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      if self.__pending:
        first_condition = self.__pending[0]
        first_condition.notifyAll()

        if first_condition == self.__active_shr_c:
          self.__active_shr_c = self.__inactive_shr_c
          self.__inactive_shr_c = first_condition

    finally:
      self.__lock.release()

  def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: whether the lock was acquired and deleted (False on timeout)

    """
    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout)

      if acquired:
        # Only checked once the lock is actually held; a timed-out acquire
        # legitimately leaves it unowned and must return False.
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        while self.__pending:
          self.__pending.pop().notifyAll()

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock before a condition wait.

    @return: whether the lock was held in shared mode

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock after a condition wait.

    @param shared: value returned by L{_release_save}

    """
    self.acquire(shared=shared)
# LockSet.acquire() treats a names value of None as "the whole set"; this
# constant gives that value a self-explanatory name.
ALL_SET = None
class _AcquireTimeout(Exception):
  """Raised internally to abort a lock acquisition once its timeout expired.

  L{LockSet.acquire} catches it and returns None instead.

  """
783 784 -class LockSet:
785 """Implements a set of locks. 786 787 This abstraction implements a set of shared locks for the same resource type, 788 distinguished by name. The user can lock a subset of the resources and the 789 LockSet will take care of acquiring the locks always in the same order, thus 790 preventing deadlock. 791 792 All the locks needed in the same set must be acquired together, though. 793 794 @type name: string 795 @ivar name: the name of the lockset 796 797 """
def __init__(self, members, name, monitor=None):
  """Constructs a new LockSet.

  @type members: list of strings
  @param members: initial members of the set
  @type name: string
  @param name: name of the set; also prefixes the names of member locks
  @type monitor: L{LockMonitor}
  @param monitor: Lock monitor with which to register member locks

  """
  assert members is not None, "members parameter is not a list"
  self.name = name

  # Lock monitor, kept to register member locks created later on
  self.__monitor = monitor

  # Used internally to guarantee coherency.
  self.__lock = SharedLock(name)

  # Maps member name -> SharedLock. The order of locking is given by the
  # sorted order of the names.
  self.__lockdict = dict((mname,
                          SharedLock(self._GetLockName(mname),
                                     monitor=monitor))
                         for mname in members)

  # Maps each owning thread to the set of member names it owns. For
  # performance every thread accesses only its own key, without a global
  # lock on this structure. It is paramount that *no* other type of access
  # is done (e.g. no looping over its keys). The *_owned helpers guarantee
  # correct access; never do anything other than
  # __owners[threading.currentThread()], or there will be trouble.
  self.__owners = {}
832
def _GetLockName(self, mname):
  """Returns the name for a member lock.

  @param mname: name of the member within this set
  @rtype: string
  @return: the set's name and the member name, separated by a slash

  """
  lock_name = "%s/%s" % (self.name, mname)
  return lock_name
838
def _is_owned(self):
  """Is the current thread a current level owner?

  @rtype: bool
  @return: whether the current thread has an entry in the owners dict

  """
  current = threading.currentThread()
  return current in self.__owners
842
def _add_owned(self, name=None):
  """Note the current thread owns the given lock.

  @type name: string or None
  @param name: member lock name; with None the thread is only registered as
      an owner (with an empty set of names) unless it already is one

  """
  me = threading.currentThread()
  if name is None:
    if not self._is_owned():
      self.__owners[me] = set()
  elif self._is_owned():
    self.__owners[me].add(name)
  else:
    self.__owners[me] = set([name])
853
def _del_owned(self, name=None):
  """Note the current thread no longer owns the given lock.

  With C{name} given, the name is removed from the thread's set of owned
  member locks. The thread's owner entry is dropped altogether once it owns
  no member locks and no longer holds the set-level lock.

  @type name: string or None
  @param name: member lock name; with None only the owner entry is dropped
      (if empty), which requires the set-level lock to be released already

  """
  assert not (name is None and self.__lock._is_owned()), \
    "Cannot hold internal lock when deleting owner status"

  me = threading.currentThread()

  if name is not None:
    self.__owners[me].remove(name)

  # Only remove the key if we don't hold the set-lock as well
  if not self.__lock._is_owned() and not self.__owners[me]:
    del self.__owners[me]
867
def _list_owned(self):
  """Get the set of resource names owned by the current thread.

  @rtype: set
  @return: a copy of the owned names (empty if the thread owns nothing)

  """
  if not self._is_owned():
    return set()
  return self.__owners[threading.currentThread()].copy()
874
def _release_and_delete_owned(self):
  """Release and delete all resources owned by the current thread.

  Member locks still held are released; every owned name is removed from
  the owner bookkeeping either way.

  """
  for lname in self._list_owned():
    member = self.__lockdict[lname]
    if member._is_owned():
      member.release()
    self._del_owned(name=lname)
882
def __names(self):
  """Return the current set of names.

  Only call this function while holding __lock and don't iterate on the
  result after releasing the lock.

  @return: the keys of the internal name -> lock dictionary

  """
  lockdict = self.__lockdict
  return lockdict.keys()
891
def _names(self):
  """Return a copy of the current set of elements.

  Used only for debugging purposes. Unless the current thread already owns
  the set-level lock, it is acquired in shared mode for the duration of the
  call.

  @rtype: set
  @return: names of all members of the set

  """
  # If we don't already own the set-level lock acquired
  # we'll get it and note we need to release it later.
  release_lock = False
  if not self.__lock._is_owned():
    release_lock = True
    self.__lock.acquire(shared=1)
  try:
    # Copy while the lock is still held: __names() must not be iterated
    # after releasing it
    result = set(self.__names())
  finally:
    if release_lock:
      self.__lock.release()
  return result
910
def acquire(self, names, timeout=None, shared=0, test_notify=None):
  """Acquire a set of resource locks.

  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names); L{ALL_SET} (None)
      acquires the whole set
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
      exclusive lock will be acquired
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting

  @return: Set of all locks successfully acquired or None in case of timeout

  @raise errors.LockError: when any lock we try to acquire has
      been deleted before we succeed. In this case none of the
      locks requested will be acquired.

  """
  assert timeout is None or timeout >= 0.0

  # Check we don't already own locks at this level
  assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                " (lockset %s)" % self.name)

  # The timeout covers all lock acquisitions together, so keep track of how
  # much of it is left
  running_timeout = RunningTimeout(timeout, False)

  try:
    if names is None:
      # Acquire the whole set: holding the set-level lock keeps new names
      # from being added until release. Names may still be removed later,
      # which __acquire_inner tolerates in this mode.
      #
      # The set-level lock is taken in the same mode as the members: shared
      # lets everybody else use the set at the same time, while an exclusive
      # owner needs it exclusively anyway to be able to add() to the set.
      if not self.__lock.acquire(shared=shared,
                                 timeout=running_timeout.Remaining()):
        raise _AcquireTimeout()
      try:
        # note we own the set-lock
        self._add_owned()

        return self.__acquire_inner(self.__names(), True, shared,
                                    running_timeout.Remaining, test_notify)
      except:
        # Undo the set-level acquisition before propagating the error;
        # something is seriously wrong if this happens for other reasons
        # than a timeout or deleted lock.
        self.__lock.release()
        self._del_owned()
        raise

    # Support passing in a single resource to acquire rather than many
    if isinstance(names, basestring):
      names = [names]

    return self.__acquire_inner(names, False, shared,
                                running_timeout.Remaining, test_notify)

  except _AcquireTimeout:
    return None
980
def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
  """Inner logic for acquiring a number of locks.

  @param names: Names of the locks to be acquired
  @param want_all: Whether all locks in the set should be acquired; if set,
      names which are missing or get deleted are silently skipped
  @param shared: Whether to acquire in shared mode
  @param timeout_fn: Function returning remaining timeout
  @param test_notify: Special callback function for unittesting
  @rtype: set
  @return: names of the locks acquired
  @raise errors.LockError: if a requested lock doesn't exist or was deleted
      (unless C{want_all} is set)
  @raise _AcquireTimeout: if the timeout expired

  """
  # Look all locks up first. There's no guarantee they still exist later
  # on, but this fails fast if one is already missing. The sorted order of
  # the names is the global locking order, which prevents deadlocks.
  to_acquire = []
  for lname in sorted(utils.UniqueSequence(names)):
    lock = self.__lockdict.get(lname)
    if lock is None:
      if want_all:
        # Acquiring the whole set; a member vanishing meanwhile is fine
        continue
      raise errors.LockError("Non-existing lock %s in set %s" %
                             (lname, self.name))
    to_acquire.append((lname, lock))

  # Names of the locks effectively acquired
  acquired = set()

  try:
    # Each lock's acquire() reports a deletion happening in between.
    for (lname, lock) in to_acquire:
      if __debug__ and callable(test_notify):
        test_notify_fn = lambda name=lname: test_notify(name)
      else:
        test_notify_fn = None

      timeout = timeout_fn()

      try:
        # raises LockError if the lock was deleted
        acq_success = lock.acquire(shared=shared, timeout=timeout,
                                   test_notify=test_notify_fn)
      except errors.LockError:
        if want_all:
          # Acquiring the whole set; a deleted member is simply skipped
          continue
        raise errors.LockError("Non-existing lock %s in set %s" %
                               (lname, self.name))

      if not acq_success:
        if timeout is None:
          # This shouldn't happen as SharedLock.acquire(timeout=None) is
          # blocking.
          raise errors.LockError("Failed to get lock %s (set %s)" %
                                 (lname, self.name))
        # Timeout expired
        raise _AcquireTimeout()

      try:
        # now the lock cannot be deleted, we have it!
        self._add_owned(name=lname)
        acquired.add(lname)
      except:
        # Bookkeeping failed; don't keep holding a lock nobody knows about
        if lock._is_owned():
          lock.release()
        raise

  except:
    # Release all owned locks
    self._release_and_delete_owned()
    raise

  return acquired
1070
def release(self, names=None):
  """Release a set of resource locks, at the same level.

  You must have acquired the locks, either in shared or in exclusive mode,
  before releasing them. The set-level lock, if held, is always released.

  @type names: list of strings, or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level).

  """
  assert self._is_owned(), ("release() on lock set %s while not owner" %
                            self.name)

  if names is None:
    names = self._list_owned()
  else:
    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]
    names = set(names)
    assert self._list_owned().issuperset(names), (
      "release() on unheld resources %s (set %s)" %
      (names.difference(self._list_owned()), self.name))

  # Release the "all elements" lock first, if held; after this add() can
  # work again
  if self.__lock._is_owned():
    self.__lock.release()
    self._del_owned()

  for lockname in names:
    # Relies on member locks only leaving __lockdict while being held
    # exclusively
    self.__lockdict[lockname].release()
    self._del_owned(name=lockname)
1108
def add(self, names, acquired=0, shared=0):
  """Add a new set of elements to the set.

  The set-level lock is held while the new locks are created and inserted.
  If the caller does not already own it, it is acquired here and released
  again before returning.

  @type names: list of strings (a single string is also accepted)
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @rtype: boolean
  @return: always True
  @raise errors.LockError: if a name is repeated within C{names} or is
      already present in the set

  """
  # Check we don't already own locks at this level
  assert not self._is_owned() or self.__lock._is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # Support passing in a single resource to add rather than many
  if isinstance(names, basestring):
    names = [names]

  # Materialize once: names is walked several times below, and an iterator
  # would be exhausted by the first pass, silently adding nothing.
  names = list(names)

  # Reject names repeated inside the request itself. Without this, the
  # second SharedLock for a name would overwrite the first in __lockdict and
  # orphan it (still held, if acquired was set).
  seen = set()
  repeated = set()
  for lockname in names:
    if lockname in seen:
      repeated.add(lockname)
    seen.add(lockname)
  if repeated:
    raise errors.LockError("duplicate add(%s) on lockset %s" %
                           (repeated, self.name))

  # If we don't already own the set-level lock we take it here and remember
  # to release it afterwards.
  release_lock = False
  if not self.__lock._is_owned():
    release_lock = True
    self.__lock.acquire()

  try:
    invalid_names = set(self.__names()).intersection(names)
    if invalid_names:
      # This must be an explicit raise, not an assert: asserts vanish under
      # optimization, and this can happen due to concurrency.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (invalid_names, self.name))

    for lockname in names:
      lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

      if acquired:
        lock.acquire(shared=shared)
        # now the lock cannot be deleted, we have it!
        try:
          self._add_owned(name=lockname)
        except:
          # The lock is not in __lockdict yet, so no other thread can be
          # waiting on it; releasing is just a safety measure before
          # re-raising.
          lock.release()
          raise

      self.__lockdict[lockname] = lock

  finally:
    # Only release __lock if we were not holding it previously.
    if release_lock:
      self.__lock.release()

  return True
1171
def remove(self, names):
  """Delete elements from the lock set.

  The caller must either hold nothing in this set, or already hold
  (exclusively) every lock it asks to delete.

  @type names: list of strings
  @param names: names of the resource to remove; a single string is treated
      as one name.

  @return: the names whose locks were deleted by this call; when the caller
      held all of them exclusively this equals C{names}

  """
  # A bare string stands for a single lock name.
  if isinstance(names, basestring):
    names = [names]

  # Any partial ownership must cover everything being removed. Exclusiveness
  # of that ownership is checked by the individual locks.
  assert not self._is_owned() or self._list_owned().issuperset(names), (
    "remove() on acquired lockset %s while not owning all elements" %
    self.name)

  deleted = []

  for lname in names:
    # delete() takes the lock exclusively unless we already own it, makes all
    # pending and later acquires fail, and implies release(). Deleting out of
    # order is therefore fine given the assertion above.
    try:
      self.__lockdict[lname].delete()
    except (KeyError, errors.LockError):
      # Missing or already deleted; impossible if we were holding it.
      assert not self._is_owned(), ("remove failed while holding lockset %s"
                                    % self.name)
      continue

    # delete() succeeded, so this thread deleted the lock and is the one
    # responsible for dropping it from __lockdict and from our owned list.
    deleted.append(lname)
    del self.__lockdict[lname]
    if self._is_owned():
      self._del_owned(name=lname)

  return deleted
# Locking levels; locks must be acquired in increasing level order.
# Current rules:
# - LEVEL_CLUSTER holds the Big Ganeti Lock (BGL), which must be acquired,
#   in shared or exclusive mode, before performing any operation. Taking the
#   BGL exclusively is discouraged and should be avoided.
# - LEVEL_NODE and LEVEL_INSTANCE hold the node and instance locks. When more
#   than one node, or more than one instance, is needed, acquire them all at
#   the same time.
# Unpacking range() guarantees the invariant LEVELS[i] == i.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODE) = range(3)

# Every level, in acquisition order
LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE]

# Levels whose lock sets may gain or lose members at runtime
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable name of each level
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE: "node",
  }

# Name of the single lock living at LEVEL_CLUSTER (the Big Ganeti Lock)
BGL = "BGL"
class GanetiLockManager:
  """The Ganeti Locking Library

  Central place that owns one LockSet per locking level and routes every
  acquire/release/add/remove through it. Assertions on each call check the
  level-ordering rules that guard against deadlocks. Keeping everything
  behind this class should also make a future switch away from python
  threads easier.

  """
  _instance = None

  def __init__(self, nodes=None, instances=None):
    """Constructs a new GanetiLockManager object.

    Only one GanetiLockManager may exist at a time; creating a second one
    trips an assertion.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
      "double GanetiLockManager instance"
    cls._instance = self

    self._monitor = LockMonitor()
    monitor = self._monitor

    # All locks, keyed by level; each level's LockSet keeps its own order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=monitor),
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=monitor),
      LEVEL_INSTANCE: LockSet(instances, "instances", monitor=monitor),
      }

  def QueryLocks(self, fields, sync):
    """Queries information from all locks.

    Delegates to L{LockMonitor.QueryLocks}.

    """
    monitor = self._monitor
    return monitor.QueryLocks(fields, sync)

  def _names(self, level):
    """Return the names of all locks at C{level}.

    Intended for debugging/testing.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()

  def _is_owned(self, level):
    """Tell whether the current thread owns any lock at C{level}.

    """
    lockset = self.__keyring[level]
    return lockset._is_owned()

  is_owned = _is_owned

  def _list_owned(self, level):
    """Return the set of lock names owned at C{level}.

    """
    lockset = self.__keyring[level]
    return lockset._list_owned()

  def _upper_owned(self, level):
    """Tell whether any lock is owned at a level greater than C{level}.

    """
    # Slicing LEVELS by level value only works because LEVELS[i] == i, which
    # the test cases verify.
    for higher in LEVELS[level + 1:]:
      if self._is_owned(higher):
        return True
    return False

  def _BGL_owned(self): # pylint: disable-msg=C0103
    """Tell whether the current thread owns the BGL, shared or exclusive.

    """
    cluster_set = self.__keyring[LEVEL_CLUSTER]
    return BGL in cluster_set._list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
    """Tell whether acting on C{level}/C{names} affects the Big Ganeti Lock.

    C{names} of None means "every lock at the level".

    """
    if level != LEVEL_CLUSTER:
      return False
    return names is None or BGL in names

  def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Either this call takes the BGL or the BGL is already held. Legacy
    # opcodes rely on running non-concurrently, so migrated code must at
    # least share the BGL. Holding it exclusively makes other locks mostly
    # pointless, except possibly while an opcode is mid-migration.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Nothing may be held at a greater level.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    The locks must already be held, in shared or exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may not be dropped while anything above LEVEL_CLUSTER is held.
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                        for i in self.__keyring.keys()]), ))

    # The LockSet itself complains if the locks are not held.
    lockset = self.__keyring[level]
    return lockset.release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    The caller must either already own the locks exclusively, or own
    nothing at a greater level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Partial or shared ownership within the level is checked by
    # LockSet.remove() itself.
    assert self._is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
    lockset = self.__keyring[level]
    return lockset.remove(names)
1454
class LockMonitor(object):
  """Registry of locks, used to answer lock information queries.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Weak references avoid problems with circular references and let
    # deleted locks disappear from the registry on their own.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, lock):
    """Start tracking C{lock}.

    Runs with the monitor lock held (exclusive mode, the decorator default).

    """
    logging.debug("Registering lock %s", lock.name)
    tracked = self._locks
    assert lock not in tracked, "Duplicate lock registration"
    assert not compat.any(lock.name == other.name
                          for other in tracked.keys()), \
      "Found duplicate lock name"
    tracked[lock] = None

  @ssynchronized(_LOCK_ATTR)
  def _GetLockInfo(self, fields):
    """Collect C{fields} from every tracked lock, keyed by lock name.

    Runs with the monitor lock held.

    """
    info = {}
    for tracked_lock in self._locks.keys():
      name = tracked_lock.name
      assert name not in info, "Found duplicate lock name"
      info[name] = tracked_lock.GetInfo(fields)
    return info

  def QueryLocks(self, fields, sync):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @type sync: boolean
    @param sync: Whether to operate in synchronous mode
    @return: one entry per lock, ordered by lock name (C{utils.NiceSort})
    @raise NotImplementedError: if C{sync} is true

    """
    if sync:
      raise NotImplementedError("Synchronous queries are not implemented")

    # Gather unsorted, then order by lock name
    info_by_name = self._GetLockInfo(fields)
    ordered_names = utils.NiceSort(info_by_name.keys())
    return [info_by_name[name] for name in ordered_names]
1510