1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """Module implementing the Ganeti locking code."""
31
32
33
34
35
36
37 import os
38 import select
39 import threading
40 import errno
41 import weakref
42 import logging
43 import heapq
44 import itertools
45 import time
46
47 from ganeti import errors
48 from ganeti import utils
49 from ganeti import compat
50 from ganeti import query
51
52
53 _EXCLUSIVE_TEXT = "exclusive"
54 _SHARED_TEXT = "shared"
55 _DELETED_TEXT = "deleted"
56
57 _DEFAULT_PRIORITY = 0
58
59
60
61 _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
62
63
64 (_LS_ACQUIRE_EXACT,
65 _LS_ACQUIRE_ALL,
66 _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
67
68 _LS_ACQUIRE_MODES = compat.UniqueFrozenset([
69 _LS_ACQUIRE_EXACT,
70 _LS_ACQUIRE_ALL,
71 _LS_ACQUIRE_OPPORTUNISTIC,
72 ])
76 """Shared Synchronization decorator.
77
78 Calls the function holding the given lock, either in exclusive or shared
79 mode. It requires the passed lock to be a SharedLock (or support its
80 semantics).
81
82 @type mylock: lockable object or string
83 @param mylock: lock to acquire or class member name of the lock to acquire
84
85 """
86 def wrap(fn):
87 def sync_function(*args, **kwargs):
88 if isinstance(mylock, basestring):
89 assert args, "cannot ssynchronize on non-class method: self not found"
90
91 lock = getattr(args[0], mylock)
92 else:
93 lock = mylock
94 lock.acquire(shared=shared)
95 try:
96 return fn(*args, **kwargs)
97 finally:
98 lock.release()
99 return sync_function
100 return wrap
101
104 """Helper class for SingleNotifyPipeCondition
105
106 """
107 __slots__ = [
108 "_fd",
109 ]
110
112 """Constructor for _SingleNotifyPipeConditionWaiter
113
114 @type fd: int
115 @param fd: File descriptor to wait for
116
117 """
118 object.__init__(self)
119 self._fd = fd
120
122 """Wait for something to happen on the pipe.
123
124 @type timeout: float or None
125 @param timeout: Timeout for waiting (can be None)
126
127 """
128 running_timeout = utils.RunningTimeout(timeout, True)
129 poller = select.poll()
130 poller.register(self._fd, select.POLLHUP)
131
132 while True:
133 remaining_time = running_timeout.Remaining()
134
135 if remaining_time is not None:
136 if remaining_time < 0.0:
137 break
138
139
140 remaining_time *= 1000
141
142 try:
143 result = poller.poll(remaining_time)
144 except EnvironmentError, err:
145 if err.errno != errno.EINTR:
146 raise
147 result = None
148
149
150 if result and result[0][0] == self._fd:
151 break
152
155 """Base class containing common code for conditions.
156
157 Some of this code is taken from python's threading module.
158
159 """
160 __slots__ = [
161 "_lock",
162 "acquire",
163 "release",
164 "_is_owned",
165 "_acquire_restore",
166 "_release_save",
167 ]
168
196
198 """Check whether lock is owned by current thread.
199
200 """
201 if self._lock.acquire(0):
202 self._lock.release()
203 return False
204 return True
205
208
211
213 """Raise an exception if the current thread doesn't own the lock.
214
215 """
216 if not self._is_owned():
217 raise RuntimeError("cannot work with un-aquired lock")
218
221 """Condition which can only be notified once.
222
223 This condition class uses pipes and poll, internally, to be able to wait for
224 notification with a timeout, without resorting to polling. It is almost
225 compatible with Python's threading.Condition, with the following differences:
226 - notifyAll can only be called once, and no wait can happen after that
227 - notify is not supported, only notifyAll
228
229 """
230
231 __slots__ = [
232 "_read_fd",
233 "_write_fd",
234 "_nwaiters",
235 "_notified",
236 ]
237
238 _waiter_class = _SingleNotifyPipeConditionWaiter
239
241 """Constructor for SingleNotifyPipeCondition
242
243 """
244 _BaseCondition.__init__(self, lock)
245 self._nwaiters = 0
246 self._notified = False
247 self._read_fd = None
248 self._write_fd = None
249
251 """Throws an exception if already notified.
252
253 """
254 if self._notified:
255 raise RuntimeError("cannot use already notified condition")
256
258 """Cleanup open file descriptors, if any.
259
260 """
261 if self._read_fd is not None:
262 os.close(self._read_fd)
263 self._read_fd = None
264
265 if self._write_fd is not None:
266 os.close(self._write_fd)
267 self._write_fd = None
268
def wait(self, timeout):
    """Block until this condition is notified or the timeout runs out.

    The condition's lock must be held by the caller and the condition must
    not have been notified yet. The lock is given up while waiting and taken
    again before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters = self._nwaiters + 1
    try:
        # The pipe is created lazily by the first waiter
        if self._read_fd is None:
            pipe_ends = os.pipe()
            self._read_fd = pipe_ends[0]
            self._write_fd = pipe_ends[1]

        waiter = self._waiter_class(self._read_fd)

        # Give up the lock for the duration of the wait ...
        saved_state = self._release_save()
        try:
            waiter(timeout)
        finally:
            # ... and always take it back, even if waiting failed
            self._acquire_restore(saved_state)
    finally:
        self._nwaiters -= 1

        # The last waiter to leave closes the file descriptors
        if not self._nwaiters:
            self._Cleanup()
296
298 """Close the writing side of the pipe to notify all waiters.
299
300 """
301 self._check_owned()
302 self._check_unnotified()
303 self._notified = True
304 if self._write_fd is not None:
305 os.close(self._write_fd)
306 self._write_fd = None
307
310 """Group-only non-polling condition with counters.
311
312 This condition class uses pipes and poll, internally, to be able to wait for
313 notification with a timeout, without resorting to polling. It is almost
314 compatible with Python's threading.Condition, but only supports notifyAll and
315 non-recursive locks. As an additional feature it's able to report whether
316 there are any waiting threads.
317
318 """
319 __slots__ = [
320 "_waiters",
321 "_single_condition",
322 ]
323
324 _single_condition_class = SingleNotifyPipeCondition
325
327 """Initializes this class.
328
329 """
330 _BaseCondition.__init__(self, lock)
331 self._waiters = set()
332 self._single_condition = self._single_condition_class(self._lock)
333
def wait(self, timeout):
    """Wait on the current single-use condition.

    The calling thread is recorded as a waiter for the duration of the call.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Bind the condition object before waiting: notifyAll() replaces
    # self._single_condition with a fresh instance.
    single_cond = self._single_condition

    self._waiters.add(threading.currentThread())
    try:
        single_cond.wait(timeout)
    finally:
        # Verify ownership before touching the waiter set again
        self._check_owned()
        self._waiters.remove(threading.currentThread())
353
355 """Notify all currently waiting threads.
356
357 """
358 self._check_owned()
359 self._single_condition.notifyAll()
360 self._single_condition = self._single_condition_class(self._lock)
361
363 """Returns a list of all waiting threads.
364
365 """
366 self._check_owned()
367
368 return self._waiters
369
371 """Returns whether there are active waiters.
372
373 """
374 self._check_owned()
375
376 return bool(self._waiters)
377
379 return ("<%s.%s waiters=%s at %#x>" %
380 (self.__class__.__module__, self.__class__.__name__,
381 self._waiters, id(self)))
382
385 __slots__ = [
386 "shared",
387 ]
388
390 """Initializes this class.
391
392 """
393 self.shared = shared
394 PipeCondition.__init__(self, lock)
395
398 """Implements a shared lock.
399
400 Multiple threads can acquire the lock in a shared way by calling
401 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
402 threads can call C{acquire(shared=0)}.
403
404 Notes on data structures: C{__pending} contains a priority queue (heapq) of
405 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
406 ...]}. Each per-priority queue contains a normal in-order list of conditions
407 to be notified when the lock can be acquired. Shared locks are grouped
408 together by priority and the condition for them is stored in
409 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
410 references for the per-priority queues indexed by priority for faster access.
411
412 @type name: string
413 @ivar name: the name of the lock
414
415 """
416 __slots__ = [
417 "__weakref__",
418 "__deleted",
419 "__exc",
420 "__lock",
421 "__pending",
422 "__pending_by_prio",
423 "__pending_shared",
424 "__shr",
425 "__time_fn",
426 "name",
427 ]
428
429 __condition_class = _PipeConditionWithMode
430
def __init__(self, name, monitor=None, _time_fn=time.time):
    """Set up an unowned, undeleted lock with no pending acquires.

    @type name: string
    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register; registration is
        skipped when this is false
    @param _time_fn: Callable stored as the lock's time source (defaults to
        C{time.time})

    """
    object.__init__(self)

    self.name = name

    # Time source used when measuring how long an acquire has been waiting
    self.__time_fn = _time_fn

    # Internal lock guarding all of the state below
    self.__lock = threading.Lock()

    # Current holders: one exclusive owner or a set of sharers
    self.__exc = None
    self.__shr = set()

    # Queue of pending acquires plus its per-priority lookup tables
    self.__pending_shared = {}
    self.__pending_by_prio = {}
    self.__pending = []

    # Becomes true once delete() has succeeded
    self.__deleted = False

    if not monitor:
        return

    logging.debug("Adding lock %s to monitor", name)
    monitor.RegisterLock(self)
465
467 return ("<%s.%s name=%s at %#x>" %
468 (self.__class__.__module__, self.__class__.__name__,
469 self.name, id(self)))
470
472 """Retrieves information for querying locks.
473
474 @type requested: set
475 @param requested: Requested information, see C{query.LQ_*}
476
477 """
478 self.__lock.acquire()
479 try:
480
481
482
483 mode = None
484 owner_names = None
485
486 if query.LQ_MODE in requested:
487 if self.__deleted:
488 mode = _DELETED_TEXT
489 assert not (self.__exc or self.__shr)
490 elif self.__exc:
491 mode = _EXCLUSIVE_TEXT
492 elif self.__shr:
493 mode = _SHARED_TEXT
494
495
496 if query.LQ_OWNER in requested:
497 if self.__exc:
498 owner = [self.__exc]
499 else:
500 owner = self.__shr
501
502 if owner:
503 assert not self.__deleted
504 owner_names = [i.getName() for i in owner]
505
506
507 if query.LQ_PENDING in requested:
508 pending = []
509
510
511 for (_, prioqueue) in sorted(self.__pending):
512 for cond in prioqueue:
513 if cond.shared:
514 pendmode = _SHARED_TEXT
515 else:
516 pendmode = _EXCLUSIVE_TEXT
517
518
519 pending.append((pendmode, [i.getName()
520 for i in cond.get_waiting()]))
521 else:
522 pending = None
523
524 return [(self.name, mode, owner_names, pending)]
525 finally:
526 self.__lock.release()
527
529 """Raises an exception if the lock has been deleted.
530
531 """
532 if self.__deleted:
533 raise errors.LockError("Deleted lock %s" % self.name)
534
536 """Is the current thread sharing the lock at this time?
537
538 """
539 return threading.currentThread() in self.__shr
540
542 """Is the current thread holding the lock exclusively at this time?
543
544 """
545 return threading.currentThread() == self.__exc
546
548 """Is the current thread somehow owning the lock at this time?
549
550 This is a private version of the function, which presumes you're holding
551 the internal lock.
552
553 """
554 if shared < 0:
555 return self.__is_sharer() or self.__is_exclusive()
556 elif shared:
557 return self.__is_sharer()
558 else:
559 return self.__is_exclusive()
560
562 """Is the current thread somehow owning the lock at this time?
563
564 @param shared:
565 - < 0: check for any type of ownership (default)
566 - 0: check for exclusive ownership
567 - > 0: check for shared ownership
568
569 """
570 self.__lock.acquire()
571 try:
572 return self.__is_owned(shared=shared)
573 finally:
574 self.__lock.release()
575
576
577
578 _is_owned = is_owned
579
581 """Returns the number of pending acquires.
582
583 @rtype: int
584
585 """
586 self.__lock.acquire()
587 try:
588 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
589 finally:
590 self.__lock.release()
591
593 """Checks whether there are any pending acquires.
594
595 @rtype: bool
596
597 """
598 self.__lock.acquire()
599 try:
600
601 (_, prioqueue) = self.__find_first_pending_queue()
602
603 return not (prioqueue or
604 self.__pending or
605 self.__pending_by_prio or
606 self.__pending_shared)
607 finally:
608 self.__lock.release()
609
611 """Actually acquire the lock.
612
613 """
614 if shared:
615 self.__shr.add(threading.currentThread())
616 else:
617 self.__exc = threading.currentThread()
618
620 """Determine whether lock can be acquired.
621
622 """
623 if shared:
624 return self.__exc is None
625 else:
626 return len(self.__shr) == 0 and self.__exc is None
627
629 """Tries to find the topmost queued entry with pending acquires.
630
631 Removes empty entries while going through the list.
632
633 """
634 while self.__pending:
635 (priority, prioqueue) = self.__pending[0]
636
637 if prioqueue:
638 return (priority, prioqueue)
639
640
641 heapq.heappop(self.__pending)
642 del self.__pending_by_prio[priority]
643 assert priority not in self.__pending_shared
644
645 return (None, None)
646
648 """Checks whether the passed condition is on top of the queue.
649
650 The caller must make sure the queue isn't empty.
651
652 """
653 (_, prioqueue) = self.__find_first_pending_queue()
654
655 return cond == prioqueue[0]
656
658 """Acquire a shared lock.
659
660 @param shared: whether to acquire in shared mode; by default an
661 exclusive lock will be acquired
662 @param timeout: maximum waiting time before giving up
663 @type priority: integer
664 @param priority: Priority for acquiring lock
665
666 """
667 self.__check_deleted()
668
669
670 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
671 " %s" % self.name)
672
673
674 self.__find_first_pending_queue()
675
676
677 if not self.__pending and self.__can_acquire(shared):
678
679 self.__do_acquire(shared)
680 return True
681
682
683
684
685 if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
686 return False
687
688 prioqueue = self.__pending_by_prio.get(priority, None)
689
690 if shared:
691
692 wait_condition = self.__pending_shared.get(priority, None)
693 assert (wait_condition is None or
694 (wait_condition.shared and wait_condition in prioqueue))
695 else:
696 wait_condition = None
697
698 if wait_condition is None:
699 if prioqueue is None:
700 assert priority not in self.__pending_by_prio
701
702 prioqueue = []
703 heapq.heappush(self.__pending, (priority, prioqueue))
704 self.__pending_by_prio[priority] = prioqueue
705
706 wait_condition = self.__condition_class(self.__lock, shared)
707 prioqueue.append(wait_condition)
708
709 if shared:
710
711
712 assert priority not in self.__pending_shared
713 self.__pending_shared[priority] = wait_condition
714
715 wait_start = self.__time_fn()
716 acquired = False
717
718 try:
719
720
721 while True:
722 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
723 self.__do_acquire(shared)
724 acquired = True
725 break
726
727
728
729 if (timeout is not None and
730 utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
731 break
732
733
734 wait_condition.wait(timeout)
735 self.__check_deleted()
736 finally:
737
738 if not wait_condition.has_waiting():
739 prioqueue.remove(wait_condition)
740 if wait_condition.shared:
741
742
743 self.__pending_shared.pop(priority, None)
744
745 return acquired
746
def acquire(self, shared=0, timeout=None, priority=None,
            test_notify=None):
    """Take the lock, in shared or exclusive mode.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock; C{None} selects the
        module's default priority
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting; called
        without arguments once the internal lock is held
    @rtype: bool
    @return: Whether the lock was acquired

    """
    effective_priority = priority
    if effective_priority is None:
        effective_priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
        # The internal lock is held now; the hook only fires when
        # assertions are enabled
        if __debug__ and callable(test_notify):
            test_notify()

        return self.__acquire_unlocked(shared, timeout, effective_priority)
    finally:
        self.__lock.release()
774
776 """Changes the lock mode from exclusive to shared.
777
778 Pending acquires in shared mode on the same priority will go ahead.
779
780 """
781 self.__lock.acquire()
782 try:
783 assert self.__is_owned(), "Lock must be owned"
784
785 if self.__is_exclusive():
786
787 self.__exc = None
788 self.__do_acquire(1)
789
790
791
792
793
794 (priority, prioqueue) = self.__find_first_pending_queue()
795 if prioqueue:
796
797 cond = self.__pending_shared.pop(priority, None)
798 if cond:
799 assert cond.shared
800 assert cond in prioqueue
801
802
803 if len(prioqueue) > 1:
804 prioqueue.remove(cond)
805 prioqueue.insert(0, cond)
806
807
808 cond.notifyAll()
809
810 assert not self.__is_exclusive()
811 assert self.__is_sharer()
812
813 return True
814 finally:
815 self.__lock.release()
816
818 """Release a Shared Lock.
819
820 You must have acquired the lock, either in shared or in exclusive mode,
821 before calling this function.
822
823 """
824 self.__lock.acquire()
825 try:
826 assert self.__is_exclusive() or self.__is_sharer(), \
827 "Cannot release non-owned lock"
828
829
830 if self.__is_exclusive():
831 self.__exc = None
832 notify = True
833 else:
834 self.__shr.remove(threading.currentThread())
835 notify = not self.__shr
836
837
838
839 if notify:
840 self.__notify_topmost()
841 finally:
842 self.__lock.release()
843
845 """Notifies topmost condition in queue of pending acquires.
846
847 """
848 (priority, prioqueue) = self.__find_first_pending_queue()
849 if prioqueue:
850 cond = prioqueue[0]
851 cond.notifyAll()
852 if cond.shared:
853
854
855 self.__pending_shared.pop(priority, None)
856
858 """Exported version of L{__notify_topmost}.
859
860 """
861 self.__lock.acquire()
862 try:
863 return self.__notify_topmost()
864 finally:
865 self.__lock.release()
866
def delete(self, timeout=None, priority=None):
    """Mark the lock as deleted.

    Unless the current thread already holds the lock exclusively, it is first
    acquired in exclusive mode. On success the lock is flagged as deleted,
    ownership is dropped and every pending condition is notified.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock; C{None} selects the
        module's default priority
    @rtype: bool
    @return: Whether the lock was obtained (and therefore deleted)

    """
    if priority is None:
        priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
        assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

        self.__check_deleted()

        # An exclusive holder may delete directly; everybody else has to
        # acquire the lock exclusively first
        acquired = (self.__is_exclusive() or
                    self.__acquire_unlocked(0, timeout, priority))

        if not acquired:
            return acquired

        assert self.__is_exclusive() and not self.__is_sharer(), \
            "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Wake up every pending acquire so it can observe the deleted flag
        for (_, prioqueue) in self.__pending:
            for cond in prioqueue:
                cond.notifyAll()

        assert self.__deleted

        return acquired
    finally:
        self.__lock.release()
914
919
922
923
924
925
926 ALL_SET = None
930 """Returns the number zero.
931
932 """
933 return 0
934
937 """Determines modes and timeouts for L{LockSet.acquire}.
938
939 @type want_all: boolean
940 @param want_all: Whether all locks in set should be acquired
941 @param timeout: Timeout in seconds or C{None}
942 @param opportunistic: Whether locks should be acquired opportunistically
943 @rtype: tuple
944 @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
945 (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
946 acquiring the lockset-internal lock (might be C{None}) and a function to
947 calculate the timeout for acquiring individual locks
948
949 """
950
951 if opportunistic and not want_all:
952 assert timeout is None, "Got timeout for an opportunistic acquisition"
953 return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
954
955
956
957 running_timeout = utils.RunningTimeout(timeout, False)
958
959 if want_all:
960 mode = _LS_ACQUIRE_ALL
961 ls_timeout_fn = running_timeout.Remaining
962 else:
963 mode = _LS_ACQUIRE_EXACT
964 ls_timeout_fn = None
965
966 if opportunistic:
967 mode = _LS_ACQUIRE_OPPORTUNISTIC
968 timeout_fn = _TimeoutZero
969 else:
970 timeout_fn = running_timeout.Remaining
971
972 return (mode, ls_timeout_fn, timeout_fn)
973
976 """Internal exception to abort an acquire on a timeout.
977
978 """
979
982 """Implements a set of locks.
983
984 This abstraction implements a set of shared locks for the same resource type,
985 distinguished by name. The user can lock a subset of the resources and the
986 LockSet will take care of acquiring the locks always in the same order, thus
987 preventing deadlock.
988
989 All the locks needed in the same set must be acquired together, though.
990
991 @type name: string
992 @ivar name: the name of the lockset
993
994 """
def __init__(self, members, name, monitor=None):
    """Create a lock set with one L{SharedLock} per initial member.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: the name of the lockset; used as prefix for member lock names
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"

    self.name = name

    # Kept so that locks added later are registered with the same monitor
    self.__monitor = monitor

    # Maps each owning thread to the set of lock names it holds
    self.__owners = {}

    # Set-level lock
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # Member locks, indexed by member name
    self.__lockdict = member_locks = {}
    for mname in members:
        member_locks[mname] = SharedLock(self._GetLockName(mname),
                                         monitor=monitor)
1029
1031 """Returns the name for a member lock.
1032
1033 """
1034 return "%s/%s" % (self.name, mname)
1035
1037 """Returns the lockset-internal lock.
1038
1039 """
1040 return self.__lock
1041
1043 """Returns the lockset-internal lock dictionary.
1044
1045 Accessing this structure is only safe in single-thread usage or when the
1046 lockset-internal lock is held.
1047
1048 """
1049 return self.__lockdict
1050
1052 """Is the current thread a current level owner?
1053
1054 @note: Use L{check_owned} to check if a specific lock is held
1055
1056 """
1057 return threading.currentThread() in self.__owners
1058
1060 """Check if locks are owned in a specific mode.
1061
1062 @type names: sequence or string
1063 @param names: Lock names (or a single lock name)
1064 @param shared: See L{SharedLock.is_owned}
1065 @rtype: bool
1066 @note: Use L{is_owned} to check if the current thread holds I{any} lock and
1067 L{list_owned} to get the names of all owned locks
1068
1069 """
1070 if isinstance(names, basestring):
1071 names = [names]
1072
1073
1074 if names and self.is_owned():
1075 candidates = []
1076
1077
1078 for lname in names:
1079 try:
1080 lock = self.__lockdict[lname]
1081 except KeyError:
1082 raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
1083 " have been removed)" % (lname, self.name))
1084 else:
1085 candidates.append(lock)
1086
1087 return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1088 else:
1089 return False
1090
1092 """Checks whether current thread owns internal lock.
1093
1094 Holding the internal lock is equivalent with holding all locks in the set
1095 (the opposite does not necessarily hold as it can not be easily
1096 determined). L{add} and L{remove} require the internal lock.
1097
1098 @rtype: boolean
1099
1100 """
1101 return self.__lock.is_owned()
1102
1104 """Note the current thread owns the given lock"""
1105 if name is None:
1106 if not self.is_owned():
1107 self.__owners[threading.currentThread()] = set()
1108 else:
1109 if self.is_owned():
1110 self.__owners[threading.currentThread()].add(name)
1111 else:
1112 self.__owners[threading.currentThread()] = set([name])
1113
1115 """Note the current thread no longer owns the given lock"""
1116
1117 assert not (name is None and self.__lock.is_owned()), \
1118 "Cannot hold internal lock when deleting owner status"
1119
1120 if name is not None:
1121 self.__owners[threading.currentThread()].remove(name)
1122
1123
1124 if not (self.__lock.is_owned() or
1125 self.__owners[threading.currentThread()]):
1126 del self.__owners[threading.currentThread()]
1127
1129 """Get the set of resource names owned by the current thread"""
1130 if self.is_owned():
1131 return self.__owners[threading.currentThread()].copy()
1132 else:
1133 return set()
1134
1136 """Release and delete all resources owned by the current thread"""
1137 for lname in self.list_owned():
1138 lock = self.__lockdict[lname]
1139 if lock.is_owned():
1140 lock.release()
1141 self._del_owned(name=lname)
1142
1144 """Return the current set of names.
1145
1146 Only call this function while holding __lock and don't iterate on the
1147 result after releasing the lock.
1148
1149 """
1150 return self.__lockdict.keys()
1151
1153 """Return a copy of the current set of elements.
1154
1155 Used only for debugging purposes.
1156
1157 """
1158
1159
1160 release_lock = False
1161 if not self.__lock.is_owned():
1162 release_lock = True
1163 self.__lock.acquire(shared=1)
1164 try:
1165 result = self.__names()
1166 finally:
1167 if release_lock:
1168 self.__lock.release()
1169 return set(result)
1170
def acquire(self, names, timeout=None, shared=0, priority=None,
            opportunistic=False, test_notify=None):
    """Acquire member locks of this set.

    With C{names} set to C{None} the set-level lock is taken first and then
    all current members are acquired; otherwise only the named locks are
    acquired.

    @note: When acquiring locks opportunistically, any number of locks might
        actually be acquired, even zero.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired, or C{None}
        for the whole set
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks; for opportunistic
        acquisitions a timeout may only be given when C{names} is C{None}
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type priority: integer
    @param priority: Priority for acquiring locks; C{None} selects the
        module's default priority
    @type opportunistic: boolean
    @param opportunistic: Acquire locks opportunistically; use the return value
        to determine which locks were actually acquired
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when a lock that must be acquired is missing
        or has been deleted

    """
    assert timeout is None or timeout >= 0.0

    # A thread may hold locks of a given set only once
    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
                                 " (lockset %s)" % self.name)

    if priority is None:
        priority = _DEFAULT_PRIORITY

    try:
        if names is None:
            # Whole-set acquisition: hold the set-level lock, then take every
            # member that currently exists
            (mode, ls_timeout_fn, timeout_fn) = \
                _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)

            got_set_lock = self.__lock.acquire(shared=shared,
                                               priority=priority,
                                               timeout=ls_timeout_fn())
            if not got_set_lock:
                raise _AcquireTimeout()

            try:
                # Record ownership of the set-level lock
                self._add_owned()

                return self.__acquire_inner(self.__names(), mode, shared,
                                            priority, timeout_fn, test_notify)
            except:
                # Whatever went wrong, give the set-level lock back before
                # passing the exception on
                self.__lock.release()
                self._del_owned()
                raise

        assert timeout is None or not opportunistic, \
            ("Opportunistic acquisitions can only use a timeout if no"
             " names are given; see docstring for details")

        # A single name is accepted in place of a list
        if isinstance(names, basestring):
            names = [names]

        (mode, _, timeout_fn) = \
            _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)

        return self.__acquire_inner(names, mode, shared, priority,
                                    timeout_fn, test_notify)

    except _AcquireTimeout:
        return None
1263
def __acquire_inner(self, names, mode, shared, priority,
                    timeout_fn, test_notify):
    """Look up the named member locks and acquire them in sorted order.

    How failures are treated depends on C{mode}:

      - C{_LS_ACQUIRE_EXACT}: a missing or deleted lock raises
        L{errors.LockError}; a failed acquisition aborts the operation
      - C{_LS_ACQUIRE_ALL}: missing or deleted locks are skipped; a failed
        acquisition aborts the operation
      - C{_LS_ACQUIRE_OPPORTUNISTIC}: missing or deleted locks and failed
        acquisitions are all skipped

    If an exception escapes, all locks the current thread owns in this set
    are released before it is re-raised.

    @param names: Names of the locks to be acquired
    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
    @param shared: Whether to acquire in shared mode
    @param priority: Priority for acquiring locks
    @param timeout_fn: Called without arguments before each acquisition; the
        result is used as timeout for that acquisition
    @param test_notify: Special callback function for unittesting; called
        with the lock name
    @rtype: set
    @return: Names of the locks which were acquired
    @raise errors.LockError: see the mode description above
    @raise _AcquireTimeout: when a non-opportunistic acquisition with a
        timeout fails

    """
    assert mode in _LS_ACQUIRE_MODES

    # Resolve the names to lock objects first, so an unknown name is noticed
    # before anything is acquired. De-duplicating and sorting gives a fixed
    # acquisition order.
    wanted = []
    for lname in sorted(frozenset(names)):
        lock = self.__lockdict.get(lname)
        if lock is not None:
            wanted.append((lname, lock))
        elif mode == _LS_ACQUIRE_EXACT:
            # Only an exact acquisition insists on every name being present
            raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                                   " been removed)" % (lname, self.name))

    # Names of the locks actually obtained
    acquired = set()

    try:
        for (lname, lock) in wanted:
            test_notify_fn = None
            if __debug__ and callable(test_notify):
                test_notify_fn = lambda: test_notify(lname)

            timeout = timeout_fn()

            try:
                got_lock = lock.acquire(shared=shared, timeout=timeout,
                                        priority=priority,
                                        test_notify=test_notify_fn)
            except errors.LockError:
                if mode not in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
                    raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                                           " been removed)" % (lname, self.name))

                # The lock went away and this mode doesn't care
                continue

            if not got_lock:
                if mode == _LS_ACQUIRE_OPPORTUNISTIC:
                    # Failed acquisitions are ignored in this mode
                    continue

                if timeout is not None:
                    raise _AcquireTimeout()

                # Failure without any timeout being set
                raise errors.LockError("Failed to get lock %s (set %s)" %
                                       (lname, self.name))

            try:
                self._add_owned(name=lname)
                acquired.add(lname)
            except:
                # Bookkeeping failed; don't keep a lock that isn't recorded
                if lock.is_owned():
                    lock.release()
                raise

    except:
        # Roll back everything this thread owns in the set
        self._release_and_delete_owned()
        raise

    return acquired
1373
1375 """Downgrade a set of resource locks from exclusive to shared mode.
1376
1377 The locks must have been acquired in exclusive mode.
1378
1379 """
1380 assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1381 " lock" % self.name)
1382
1383
1384 if isinstance(names, basestring):
1385 names = [names]
1386
1387 owned = self.list_owned()
1388
1389 if names is None:
1390 names = owned
1391 else:
1392 names = set(names)
1393 assert owned.issuperset(names), \
1394 ("downgrade() on unheld resources %s (set %s)" %
1395 (names.difference(owned), self.name))
1396
1397 for lockname in names:
1398 self.__lockdict[lockname].downgrade()
1399
1400
1401 if self.__lock.is_owned(shared=0):
1402
1403 if not compat.any(lock.is_owned(shared=0)
1404 for lock in self.__lockdict.values()):
1405 self.__lock.downgrade()
1406 assert self.__lock.is_owned(shared=1)
1407
1408 return True
1409
1411 """Release a set of resource locks, at the same level.
1412
1413 You must have acquired the locks, either in shared or in exclusive mode,
1414 before releasing them.
1415
1416 @type names: list of strings, or None
1417 @param names: the names of the locks which shall be released
1418 (defaults to all the locks acquired at that level).
1419
1420 """
1421 assert self.is_owned(), ("release() on lock set %s while not owner" %
1422 self.name)
1423
1424
1425 if isinstance(names, basestring):
1426 names = [names]
1427
1428 if names is None:
1429 names = self.list_owned()
1430 else:
1431 names = set(names)
1432 assert self.list_owned().issuperset(names), (
1433 "release() on unheld resources %s (set %s)" %
1434 (names.difference(self.list_owned()), self.name))
1435
1436
1437
1438 if self.__lock.is_owned():
1439 self.__lock.release()
1440 self._del_owned()
1441
1442 for lockname in names:
1443
1444
1445 self.__lockdict[lockname].release()
1446 self._del_owned(name=lockname)
1447
def add(self, names, acquired=0, shared=0):
  """Create new locks in this set, optionally acquiring them right away.

  @type names: list of strings (a single string is accepted as well)
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @return: always C{True}
  @raise errors.LockError: if one of the names already exists in the set

  """
  # Either nothing of this set is held by the caller, or the set-level lock
  # is held with shared=0; anything in between is refused
  assert not self.is_owned() or self.__lock.is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # A bare string stands for a single lock name
  if isinstance(names, basestring):
    names = [names]

  # Take the set-level lock unless it is held already, and remember whether
  # it has to be handed back once we are done
  must_release = not self.__lock.is_owned()
  if must_release:
    self.__lock.acquire()

  try:
    duplicates = set(self.__names()).intersection(names)
    if duplicates:
      # Deliberately an explicit raise instead of an assert, so the check is
      # still performed when assertions are compiled out
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (duplicates, self.name))

    for lockname in names:
      new_lock = SharedLock(self._GetLockName(lockname),
                            monitor=self.__monitor)

      if acquired:
        # The lock was created a moment ago, hence a plain acquire
        new_lock.acquire(shared=shared)

        try:
          self._add_owned(name=lockname)
        except:
          # The lock has not been stored in the lock dictionary yet; give it
          # back as a safety measure and let the error propagate
          new_lock.release()
          raise

      self.__lockdict[lockname] = new_lock

  finally:
    # Only hand back the set-level lock if it was taken in here
    if must_release:
      self.__lock.release()

  return True
1512
1514 """Remove elements from the lock set.
1515
1516 You can either not hold anything in the lockset or already hold a superset
1517 of the elements you want to delete, exclusively.
1518
1519 @type names: list of strings
1520 @param names: names of the resource to remove.
1521
1522 @return: a list of locks which we removed; the list is always
1523 equal to the names list if we were holding all the locks
1524 exclusively
1525
1526 """
1527
1528 if isinstance(names, basestring):
1529 names = [names]
1530
1531
1532
1533
1534 assert not self.is_owned() or self.list_owned().issuperset(names), (
1535 "remove() on acquired lockset %s while not owning all elements" %
1536 self.name)
1537
1538 removed = []
1539
1540 for lname in names:
1541
1542
1543
1544
1545
1546 try:
1547 self.__lockdict[lname].delete()
1548 removed.append(lname)
1549 except (KeyError, errors.LockError):
1550
1551 assert not self.is_owned(), ("remove failed while holding lockset %s" %
1552 self.name)
1553 else:
1554
1555
1556
1557
1558
1559
1560
1561 del self.__lockdict[lname]
1562
1563 if self.is_owned():
1564 self._del_owned(name=lname)
1565
1566 return removed
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
# Locking levels, numbered consecutively starting at zero. The lock manager
# refuses to acquire locks at a level while the caller owns locks at a level
# with a greater number.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODE_ALLOC,
 LEVEL_NODEGROUP,
 LEVEL_NODE,
 LEVEL_NODE_RES,
 LEVEL_NETWORK) = range(7)

# All levels, listed in increasing numeric order
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODE_ALLOC,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]

# Levels accepted by the lock manager's add() and remove() methods
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ])

# Name of each level; the lock manager asserts that the lock sets it creates
# carry exactly these names
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE_ALLOC: "node-alloc",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Name of the only lock at the cluster level (the "Big Ganeti Lock")
BGL = "BGL"

# Name of the only lock at the node allocation level
NAL = "NAL"
1631 """The Ganeti Locking Library
1632
1633 The purpose of this small library is to manage locking for ganeti clusters
1634 in a central place, while at the same time doing dynamic checks against
1635 possible deadlocks. It will also make it easier to transition to a different
1636 lock type should we migrate away from python threads.
1637
1638 """
1639 _instance = None
1640
def __init__(self, node_uuids, nodegroups, instance_names, networks):
  """Sets up the one and only GanetiLockManager object.

  Only a single GanetiLockManager may exist at any time; creating a second
  one trips an assertion.

  @param node_uuids: list of node UUIDs
  @param nodegroups: list of nodegroup uuids
  @param instance_names: list of instance names
  @param networks: list of names for the locks at the network level

  """
  cls = self.__class__
  assert cls._instance is None, "double GanetiLockManager instance"
  cls._instance = self

  monitor = LockMonitor()
  self._monitor = monitor

  # One lock set per level, all sharing the same monitor. The list keeps the
  # lock sets being constructed in a fixed order.
  lockset_specs = [
    (LEVEL_CLUSTER, [BGL], "cluster"),
    (LEVEL_NODE, node_uuids, "node"),
    (LEVEL_NODE_RES, node_uuids, "node-res"),
    (LEVEL_NODEGROUP, nodegroups, "nodegroup"),
    (LEVEL_INSTANCE, instance_names, "instance"),
    (LEVEL_NETWORK, networks, "network"),
    (LEVEL_NODE_ALLOC, [NAL], "node-alloc"),
    ]

  self.__keyring = dict((level, LockSet(members, setname, monitor=monitor))
                        for (level, members, setname) in lockset_specs)

  # Lock set names have to agree with the published level names
  assert compat.all(lockset.name == LEVEL_NAMES[level]
                    for (level, lockset) in self.__keyring.items()), \
    "Keyring name mismatch"
1675
1677 """Registers a new lock with the monitor.
1678
1679 See L{LockMonitor.RegisterLock}.
1680
1681 """
1682 return self._monitor.RegisterLock(provider)
1683
1685 """Queries information from all locks.
1686
1687 See L{LockMonitor.QueryLocks}.
1688
1689 """
1690 return self._monitor.QueryLocks(fields)
1691
1693 """List the lock names at the given level.
1694
1695 This can be used for debugging/testing purposes.
1696
1697 @param level: the level whose list of locks to get
1698
1699 """
1700 assert level in LEVELS, "Invalid locking level %s" % level
1701 return self.__keyring[level]._names()
1702
1704 """Check whether we are owning locks at the given level
1705
1706 """
1707 return self.__keyring[level].is_owned()
1708
1710 """Get the set of owned locks at the given level
1711
1712 """
1713 return self.__keyring[level].list_owned()
1714
1716 """Check if locks at a certain level are owned in a specific mode.
1717
1718 @see: L{LockSet.check_owned}
1719
1720 """
1721 return self.__keyring[level].check_owned(names, shared=shared)
1722
1724 """Checks whether current thread owns all locks at a certain level.
1725
1726 @see: L{LockSet.owning_all}
1727
1728 """
1729 return self.__keyring[level].owning_all()
1730
1732 """Check that we don't own any lock at a level greater than the given one.
1733
1734 """
1735
1736
1737 return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1738
1740 """Check if the current thread owns the BGL.
1741
1742 Both an exclusive or a shared acquisition work.
1743
1744 """
1745 return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1746
1747 @staticmethod
1749 """Check if the level contains the BGL.
1750
1751 Check if acting on the given level and set of names will change
1752 the status of the Big Ganeti Lock.
1753
1754 """
1755 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1756
def acquire(self, level, names, timeout=None, shared=0, priority=None,
            opportunistic=False):
  """Acquire a set of resource locks, all belonging to one level.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be acquired
  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names)
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default
      an exclusive lock will be acquired
  @type timeout: float
  @param timeout: Maximum time to acquire all locks
  @type priority: integer
  @param priority: Priority for acquiring lock
  @type opportunistic: boolean
  @param opportunistic: Acquire locks opportunistically; use the return value
      to determine which locks were actually acquired
  @return: whatever the lock set of the given level returns from its own
      C{acquire}

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # Unless this very request is about the BGL, the caller has to hold the
  # BGL already
  owns_or_requests_bgl = (self._contains_BGL(level, names) or
                          self._BGL_owned())
  assert owns_or_requests_bgl, (
    "You must own the Big Ganeti Lock before acquiring any other")

  # Nothing may be held at a greater level while acquiring at this one
  assert not self._upper_owned(level), (
    "Cannot acquire locks at a level while owning some at a greater one")

  # Hand the actual work to the lock set responsible for the level
  lockset = self.__keyring[level]
  return lockset.acquire(names,
                         shared=shared,
                         timeout=timeout,
                         priority=priority,
                         opportunistic=opportunistic)
1797
1799 """Downgrade a set of resource locks from exclusive to shared mode.
1800
1801 You must have acquired the locks in exclusive mode.
1802
1803 @type level: member of locking.LEVELS
1804 @param level: the level at which the locks shall be downgraded
1805 @type names: list of strings, or None
1806 @param names: the names of the locks which shall be downgraded
1807 (defaults to all the locks acquired at the level)
1808
1809 """
1810 assert level in LEVELS, "Invalid locking level %s" % level
1811
1812 return self.__keyring[level].downgrade(names=names)
1813
def release(self, level, names=None):
  """Release a set of resource locks, all belonging to one level.

  The locks must have been acquired beforehand, no matter whether in shared
  or in exclusive mode.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be released
  @type names: list of strings, or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level)
  @return: whatever the lock set of the given level returns from its own
      C{release}

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # The BGL may only be given up once nothing is held above the cluster level
  assert not (self._contains_BGL(level, names) and
              self._upper_owned(LEVEL_CLUSTER)), (
    "Cannot release the Big Ganeti Lock while holding something"
    " at upper levels (%r)" %
    (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
                      for i in self.__keyring]), ))

  # Hand the actual work to the lock set responsible for the level
  lockset = self.__keyring[level]
  return lockset.release(names)
1837
def add(self, level, names, acquired=0, shared=0):
  """Add new locks to the lock set of the given level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be added
  @type names: list of strings
  @param names: names of the locks to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: whether to acquire the newly added locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether the acquisition will be shared
  @return: whatever the lock set of the given level returns from its own
      C{add}

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level

  # The BGL has to be held for any modification of the lock sets
  assert self._BGL_owned(), (
    "You must own the BGL before performing other operations")

  # Nothing may be held at a greater level while adding at this one
  assert not self._upper_owned(level), (
    "Cannot add locks at a level while owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.add(names, acquired=acquired, shared=shared)
1857
def remove(self, level, names):
  """Remove locks from the lock set of the given level.

  The caller must either already own locks at this level or not own any
  lock at a greater level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be removed
  @type names: list of strings
  @param names: the names of the locks which shall be removed
      (special lock names, or instance/node names)
  @return: whatever the lock set of the given level returns from its own
      C{remove}

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level

  # The BGL has to be held for any modification of the lock sets
  assert self._BGL_owned(), (
    "You must own the BGL before performing other operations")

  # Owning something at this level is sufficient; otherwise nothing may be
  # held at a greater level
  assert self.is_owned(level) or not self._upper_owned(level), (
    "Cannot remove locks at a level while not owning it or"
    " owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.remove(names)
1881
1884 """Sorting key function.
1885
1886 Sort by name, registration order and then order of information. This provides
1887 a stable sort order over different providers, even if they return the same
1888 name.
1889
1890 """
1891 (name, _, _, _) = item
1892
1893 return (utils.NiceSortKey(name), num, idx)
1894
1897 _LOCK_ATTR = "_lock"
1898
1900 """Initializes this class.
1901
1902 """
1903 self._lock = SharedLock("LockMonitor")
1904
1905
1906 self._counter = itertools.count(0)
1907
1908
1909
1910 self._locks = weakref.WeakKeyDictionary()
1911
1912 @ssynchronized(_LOCK_ATTR)
1914 """Registers a new lock.
1915
1916 @param provider: Object with a callable method named C{GetLockInfo}, taking
1917 a single C{set} containing the requested information items
1918 @note: It would be nicer to only receive the function generating the
1919 requested information but, as it turns out, weak references to bound
1920 methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1921 workarounds, but none of the ones I found works properly in combination
1922 with a standard C{WeakKeyDictionary}
1923
1924 """
1925 assert provider not in self._locks, "Duplicate registration"
1926
1927
1928
1929
1930
1931
1932
1933 self._locks[provider] = self._counter.next()
1934
1936 """Get information from all locks.
1937
1938 """
1939
1940 self._lock.acquire(shared=1)
1941 try:
1942 items = self._locks.items()
1943 finally:
1944 self._lock.release()
1945
1946 return [(info, idx, num)
1947 for (provider, num) in items
1948 for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1949
1966
1968 """Queries information from all locks.
1969
1970 @type fields: list of strings
1971 @param fields: List of fields to return
1972
1973 """
1974 (qobj, ctx) = self._Query(fields)
1975
1976
1977 return query.GetQueryResponse(qobj, ctx)
1978