1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module implementing the Ganeti locking code."""
22
23
24
25
26
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import itertools
36 import time
37
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
42
43
# Textual lock modes reported when querying lock information
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority applied when a caller passes C{priority=None}
_DEFAULT_PRIORITY = 0

# A pending acquire is only queued if its timeout is at least this long;
# with a shorter timeout the acquire gives up immediately instead of waiting
_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)

# Acquisition modes understood by the lock set's inner acquire logic
(_LS_ACQUIRE_EXACT,
 _LS_ACQUIRE_ALL,
 _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)

# All valid acquisition modes, used for sanity-checking a passed mode
_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
  _LS_ACQUIRE_EXACT,
  _LS_ACQUIRE_ALL,
  _LS_ACQUIRE_OPPORTUNISTIC,
  ])
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics).

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire
  @type shared: integer (0/1) used as a boolean
  @param shared: value passed as C{shared} to the lock's C{acquire}
  @return: decorator; the decorated function runs with the lock held and the
    lock is released again even if the function raises

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      if isinstance(mylock, basestring):
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"; the lock attribute is looked up at call time
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()

    # Keep the wrapped function's identity so tracebacks, logs and generated
    # documentation do not show every synchronized function as
    # "sync_function". Done by hand to avoid pulling in a new module.
    for attr in ("__module__", "__name__", "__doc__"):
      try:
        setattr(sync_function, attr, getattr(fn, attr))
      except (AttributeError, TypeError):
        # Not every callable carries (or allows copying) these attributes
        pass

    return sync_function
  return wrap
92
95 """Helper class for SingleNotifyPipeCondition
96
97 """
98 __slots__ = [
99 "_fd",
100 ]
101
103 """Constructor for _SingleNotifyPipeConditionWaiter
104
105 @type fd: int
106 @param fd: File descriptor to wait for
107
108 """
109 object.__init__(self)
110 self._fd = fd
111
113 """Wait for something to happen on the pipe.
114
115 @type timeout: float or None
116 @param timeout: Timeout for waiting (can be None)
117
118 """
119 running_timeout = utils.RunningTimeout(timeout, True)
120 poller = select.poll()
121 poller.register(self._fd, select.POLLHUP)
122
123 while True:
124 remaining_time = running_timeout.Remaining()
125
126 if remaining_time is not None:
127 if remaining_time < 0.0:
128 break
129
130
131 remaining_time *= 1000
132
133 try:
134 result = poller.poll(remaining_time)
135 except EnvironmentError, err:
136 if err.errno != errno.EINTR:
137 raise
138 result = None
139
140
141 if result and result[0][0] == self._fd:
142 break
143
146 """Base class containing common code for conditions.
147
148 Some of this code is taken from python's threading module.
149
150 """
151 __slots__ = [
152 "_lock",
153 "acquire",
154 "release",
155 "_is_owned",
156 "_acquire_restore",
157 "_release_save",
158 ]
159
187
189 """Check whether lock is owned by current thread.
190
191 """
192 if self._lock.acquire(0):
193 self._lock.release()
194 return False
195 return True
196
199
202
204 """Raise an exception if the current thread doesn't own the lock.
205
206 """
207 if not self._is_owned():
208 raise RuntimeError("cannot work with un-aquired lock")
209
212 """Condition which can only be notified once.
213
214 This condition class uses pipes and poll, internally, to be able to wait for
215 notification with a timeout, without resorting to polling. It is almost
216 compatible with Python's threading.Condition, with the following differences:
217 - notifyAll can only be called once, and no wait can happen after that
218 - notify is not supported, only notifyAll
219
220 """
221
222 __slots__ = [
223 "_read_fd",
224 "_write_fd",
225 "_nwaiters",
226 "_notified",
227 ]
228
229 _waiter_class = _SingleNotifyPipeConditionWaiter
230
232 """Constructor for SingleNotifyPipeCondition
233
234 """
235 _BaseCondition.__init__(self, lock)
236 self._nwaiters = 0
237 self._notified = False
238 self._read_fd = None
239 self._write_fd = None
240
242 """Throws an exception if already notified.
243
244 """
245 if self._notified:
246 raise RuntimeError("cannot use already notified condition")
247
249 """Cleanup open file descriptors, if any.
250
251 """
252 if self._read_fd is not None:
253 os.close(self._read_fd)
254 self._read_fd = None
255
256 if self._write_fd is not None:
257 os.close(self._write_fd)
258 self._write_fd = None
259
def wait(self, timeout):
  """Block until the condition is notified or the timeout expires.

  The condition's lock must be held by the caller and the condition must not
  have been notified yet. The lock is given up while waiting and taken again
  before returning. The pipe used for signalling is created by the first
  waiter and cleaned up once the last waiter leaves.

  @type timeout: float or None
  @param timeout: Waiting timeout (can be None)

  """
  self._check_owned()
  self._check_unnotified()

  self._nwaiters = self._nwaiters + 1
  try:
    if self._read_fd is None:
      pipe_ends = os.pipe()
      self._read_fd = pipe_ends[0]
      self._write_fd = pipe_ends[1]

    waiter = self._waiter_class(self._read_fd)
    saved_state = self._release_save()
    try:
      # The lock is not held while blocked here
      waiter(timeout)
    finally:
      # Take the lock back, whether the wait ended normally or not
      self._acquire_restore(saved_state)
  finally:
    self._nwaiters = self._nwaiters - 1
    if not self._nwaiters:
      self._Cleanup()
287
289 """Close the writing side of the pipe to notify all waiters.
290
291 """
292 self._check_owned()
293 self._check_unnotified()
294 self._notified = True
295 if self._write_fd is not None:
296 os.close(self._write_fd)
297 self._write_fd = None
298
301 """Group-only non-polling condition with counters.
302
303 This condition class uses pipes and poll, internally, to be able to wait for
304 notification with a timeout, without resorting to polling. It is almost
305 compatible with Python's threading.Condition, but only supports notifyAll and
306 non-recursive locks. As an additional features it's able to report whether
307 there are any waiting threads.
308
309 """
310 __slots__ = [
311 "_waiters",
312 "_single_condition",
313 ]
314
315 _single_condition_class = SingleNotifyPipeCondition
316
318 """Initializes this class.
319
320 """
321 _BaseCondition.__init__(self, lock)
322 self._waiters = set()
323 self._single_condition = self._single_condition_class(self._lock)
324
def wait(self, timeout):
  """Wait for the next notification of this condition.

  The calling thread is recorded as a waiter for the duration of the wait.

  @type timeout: float or None
  @param timeout: Waiting timeout (can be None)

  """
  self._check_owned()

  # Hold on to the current single-use condition: notifyAll installs a fresh
  # one, so the attribute may point elsewhere by the time the wait is over
  current_cond = self._single_condition
  me = threading.currentThread()

  self._waiters.add(me)
  try:
    current_cond.wait(timeout)
  finally:
    self._check_owned()
    self._waiters.remove(me)
344
346 """Notify all currently waiting threads.
347
348 """
349 self._check_owned()
350 self._single_condition.notifyAll()
351 self._single_condition = self._single_condition_class(self._lock)
352
354 """Returns a list of all waiting threads.
355
356 """
357 self._check_owned()
358
359 return self._waiters
360
362 """Returns whether there are active waiters.
363
364 """
365 self._check_owned()
366
367 return bool(self._waiters)
368
370 return ("<%s.%s waiters=%s at %#x>" %
371 (self.__class__.__module__, self.__class__.__name__,
372 self._waiters, id(self)))
373
376 __slots__ = [
377 "shared",
378 ]
379
381 """Initializes this class.
382
383 """
384 self.shared = shared
385 PipeCondition.__init__(self, lock)
386
389 """Implements a shared lock.
390
391 Multiple threads can acquire the lock in a shared way by calling
392 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
393 threads can call C{acquire(shared=0)}.
394
395 Notes on data structures: C{__pending} contains a priority queue (heapq) of
396 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
397 ...]}. Each per-priority queue contains a normal in-order list of conditions
398 to be notified when the lock can be acquired. Shared locks are grouped
399 together by priority and the condition for them is stored in
400 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
401 references for the per-priority queues indexed by priority for faster access.
402
403 @type name: string
404 @ivar name: the name of the lock
405
406 """
407 __slots__ = [
408 "__weakref__",
409 "__deleted",
410 "__exc",
411 "__lock",
412 "__pending",
413 "__pending_by_prio",
414 "__pending_shared",
415 "__shr",
416 "__time_fn",
417 "name",
418 ]
419
420 __condition_class = _PipeConditionWithMode
421
def __init__(self, name, monitor=None, _time_fn=time.time):
  """Create a lock that is neither owned nor deleted.

  @param name: the name of the lock
  @type monitor: L{LockMonitor}
  @param monitor: Lock monitor with which to register
  @param _time_fn: function returning the current time, used to detect
    expired acquire timeouts

  """
  object.__init__(self)

  self.name = name

  # Ownership state: nobody holds the lock and it has not been deleted
  self.__exc = None
  self.__shr = set()
  self.__deleted = False

  # Bookkeeping for pending acquires
  self.__pending = []
  self.__pending_by_prio = {}
  self.__pending_shared = {}

  # Internal lock protecting all of the state above
  self.__lock = threading.Lock()

  # Time source for timeout handling
  self.__time_fn = _time_fn

  if not monitor:
    return

  # Register with the lock monitor
  logging.debug("Adding lock %s to monitor", name)
  monitor.RegisterLock(self)
456
458 return ("<%s.%s name=%s at %#x>" %
459 (self.__class__.__module__, self.__class__.__name__,
460 self.name, id(self)))
461
463 """Retrieves information for querying locks.
464
465 @type requested: set
466 @param requested: Requested information, see C{query.LQ_*}
467
468 """
469 self.__lock.acquire()
470 try:
471
472
473
474 mode = None
475 owner_names = None
476
477 if query.LQ_MODE in requested:
478 if self.__deleted:
479 mode = _DELETED_TEXT
480 assert not (self.__exc or self.__shr)
481 elif self.__exc:
482 mode = _EXCLUSIVE_TEXT
483 elif self.__shr:
484 mode = _SHARED_TEXT
485
486
487 if query.LQ_OWNER in requested:
488 if self.__exc:
489 owner = [self.__exc]
490 else:
491 owner = self.__shr
492
493 if owner:
494 assert not self.__deleted
495 owner_names = [i.getName() for i in owner]
496
497
498 if query.LQ_PENDING in requested:
499 pending = []
500
501
502 for (_, prioqueue) in sorted(self.__pending):
503 for cond in prioqueue:
504 if cond.shared:
505 pendmode = _SHARED_TEXT
506 else:
507 pendmode = _EXCLUSIVE_TEXT
508
509
510 pending.append((pendmode, [i.getName()
511 for i in cond.get_waiting()]))
512 else:
513 pending = None
514
515 return [(self.name, mode, owner_names, pending)]
516 finally:
517 self.__lock.release()
518
520 """Raises an exception if the lock has been deleted.
521
522 """
523 if self.__deleted:
524 raise errors.LockError("Deleted lock %s" % self.name)
525
527 """Is the current thread sharing the lock at this time?
528
529 """
530 return threading.currentThread() in self.__shr
531
533 """Is the current thread holding the lock exclusively at this time?
534
535 """
536 return threading.currentThread() == self.__exc
537
539 """Is the current thread somehow owning the lock at this time?
540
541 This is a private version of the function, which presumes you're holding
542 the internal lock.
543
544 """
545 if shared < 0:
546 return self.__is_sharer() or self.__is_exclusive()
547 elif shared:
548 return self.__is_sharer()
549 else:
550 return self.__is_exclusive()
551
553 """Is the current thread somehow owning the lock at this time?
554
555 @param shared:
556 - < 0: check for any type of ownership (default)
557 - 0: check for exclusive ownership
558 - > 0: check for shared ownership
559
560 """
561 self.__lock.acquire()
562 try:
563 return self.__is_owned(shared=shared)
564 finally:
565 self.__lock.release()
566
567
568
569 _is_owned = is_owned
570
572 """Returns the number of pending acquires.
573
574 @rtype: int
575
576 """
577 self.__lock.acquire()
578 try:
579 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
580 finally:
581 self.__lock.release()
582
584 """Checks whether there are any pending acquires.
585
586 @rtype: bool
587
588 """
589 self.__lock.acquire()
590 try:
591
592 (_, prioqueue) = self.__find_first_pending_queue()
593
594 return not (prioqueue or
595 self.__pending or
596 self.__pending_by_prio or
597 self.__pending_shared)
598 finally:
599 self.__lock.release()
600
602 """Actually acquire the lock.
603
604 """
605 if shared:
606 self.__shr.add(threading.currentThread())
607 else:
608 self.__exc = threading.currentThread()
609
611 """Determine whether lock can be acquired.
612
613 """
614 if shared:
615 return self.__exc is None
616 else:
617 return len(self.__shr) == 0 and self.__exc is None
618
def __find_first_pending_queue(self):
  """Tries to find the topmost queued entry with pending acquires.

  Removes empty entries while going through the list.

  @rtype: tuple
  @return: C{(priority, prioqueue)} of the first non-empty per-priority
    queue, or C{(None, None)} if nothing is pending

  """
  while self.__pending:
    (priority, prioqueue) = self.__pending[0]

    if prioqueue:
      return (priority, prioqueue)

    # The topmost queue is empty: drop it from the heap and from the
    # per-priority index before looking at the next one
    heapq.heappop(self.__pending)
    del self.__pending_by_prio[priority]
    assert priority not in self.__pending_shared

  return (None, None)
637
639 """Checks whether the passed condition is on top of the queue.
640
641 The caller must make sure the queue isn't empty.
642
643 """
644 (_, prioqueue) = self.__find_first_pending_queue()
645
646 return cond == prioqueue[0]
647
649 """Acquire a shared lock.
650
651 @param shared: whether to acquire in shared mode; by default an
652 exclusive lock will be acquired
653 @param timeout: maximum waiting time before giving up
654 @type priority: integer
655 @param priority: Priority for acquiring lock
656
657 """
658 self.__check_deleted()
659
660
661 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
662 " %s" % self.name)
663
664
665 self.__find_first_pending_queue()
666
667
668 if not self.__pending and self.__can_acquire(shared):
669
670 self.__do_acquire(shared)
671 return True
672
673
674
675
676 if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
677 return False
678
679 prioqueue = self.__pending_by_prio.get(priority, None)
680
681 if shared:
682
683 wait_condition = self.__pending_shared.get(priority, None)
684 assert (wait_condition is None or
685 (wait_condition.shared and wait_condition in prioqueue))
686 else:
687 wait_condition = None
688
689 if wait_condition is None:
690 if prioqueue is None:
691 assert priority not in self.__pending_by_prio
692
693 prioqueue = []
694 heapq.heappush(self.__pending, (priority, prioqueue))
695 self.__pending_by_prio[priority] = prioqueue
696
697 wait_condition = self.__condition_class(self.__lock, shared)
698 prioqueue.append(wait_condition)
699
700 if shared:
701
702
703 assert priority not in self.__pending_shared
704 self.__pending_shared[priority] = wait_condition
705
706 wait_start = self.__time_fn()
707 acquired = False
708
709 try:
710
711
712 while True:
713 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
714 self.__do_acquire(shared)
715 acquired = True
716 break
717
718
719
720 if (timeout is not None and
721 utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
722 break
723
724
725 wait_condition.wait(timeout)
726 self.__check_deleted()
727 finally:
728
729 if not wait_condition.has_waiting():
730 prioqueue.remove(wait_condition)
731 if wait_condition.shared:
732
733
734 self.__pending_shared.pop(priority, None)
735
736 return acquired
737
def acquire(self, shared=0, timeout=None, priority=None,
            test_notify=None):
  """Acquire this lock, in shared or exclusive mode.

  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
    exclusive lock will be acquired
  @type timeout: float
  @param timeout: maximum waiting time before giving up
  @type priority: integer
  @param priority: Priority for acquiring lock
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting
  @return: whatever the internal acquire returns (true if the lock was
    obtained, false if not)

  """
  if priority is None:
    priority = _DEFAULT_PRIORITY

  self.__lock.acquire()
  try:
    # The internal lock is held at this point; tell the unittest hook
    if __debug__ and callable(test_notify):
      test_notify()

    result = self.__acquire_unlocked(shared, timeout, priority)
  finally:
    self.__lock.release()

  return result
765
767 """Changes the lock mode from exclusive to shared.
768
769 Pending acquires in shared mode on the same priority will go ahead.
770
771 """
772 self.__lock.acquire()
773 try:
774 assert self.__is_owned(), "Lock must be owned"
775
776 if self.__is_exclusive():
777
778 self.__exc = None
779 self.__do_acquire(1)
780
781
782
783
784
785 (priority, prioqueue) = self.__find_first_pending_queue()
786 if prioqueue:
787
788 cond = self.__pending_shared.pop(priority, None)
789 if cond:
790 assert cond.shared
791 assert cond in prioqueue
792
793
794 if len(prioqueue) > 1:
795 prioqueue.remove(cond)
796 prioqueue.insert(0, cond)
797
798
799 cond.notifyAll()
800
801 assert not self.__is_exclusive()
802 assert self.__is_sharer()
803
804 return True
805 finally:
806 self.__lock.release()
807
809 """Release a Shared Lock.
810
811 You must have acquired the lock, either in shared or in exclusive mode,
812 before calling this function.
813
814 """
815 self.__lock.acquire()
816 try:
817 assert self.__is_exclusive() or self.__is_sharer(), \
818 "Cannot release non-owned lock"
819
820
821 if self.__is_exclusive():
822 self.__exc = None
823 notify = True
824 else:
825 self.__shr.remove(threading.currentThread())
826 notify = not self.__shr
827
828
829
830 if notify:
831 self.__notify_topmost()
832 finally:
833 self.__lock.release()
834
def __notify_topmost(self):
  """Notifies topmost condition in queue of pending acquires.

  Does nothing if no acquire is pending.

  """
  (priority, prioqueue) = self.__find_first_pending_queue()
  if prioqueue:
    cond = prioqueue[0]
    cond.notifyAll()
    if cond.shared:
      # Forget the shared condition for this priority so that shared
      # acquires arriving from now on create a new one instead of joining
      # the waiters that were just notified
      self.__pending_shared.pop(priority, None)
847
849 """Exported version of L{__notify_topmost}.
850
851 """
852 self.__lock.acquire()
853 try:
854 return self.__notify_topmost()
855 finally:
856 self.__lock.release()
857
def delete(self, timeout=None, priority=None):
  """Mark this lock as deleted.

  The lock is first taken in exclusive mode unless the caller already holds
  it that way. Once that succeeds the lock is flagged as deleted, its owner
  is cleared and every pending acquire is woken up.

  @type timeout: float
  @param timeout: maximum waiting time before giving up
  @type priority: integer
  @param priority: Priority for acquiring lock
  @return: true value if the lock was obtained and deleted, false value if
    the exclusive acquire did not succeed

  """
  if priority is None:
    priority = _DEFAULT_PRIORITY

  self.__lock.acquire()
  try:
    assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

    self.__check_deleted()

    # A caller that already owns the lock exclusively needs no new acquire
    got_lock = (self.__is_exclusive() or
                self.__acquire_unlocked(0, timeout, priority))

    if not got_lock:
      return got_lock

    assert self.__is_exclusive() and not self.__is_sharer(), \
      "Lock wasn't acquired in exclusive mode"

    self.__deleted = True
    self.__exc = None

    assert not (self.__exc or self.__shr), "Found owner during deletion"

    # Wake every pending acquire; each re-checks the deleted flag when it
    # resumes
    for entry in self.__pending:
      for cond in entry[1]:
        cond.notifyAll()

    assert self.__deleted

    return got_lock
  finally:
    self.__lock.release()
905
910
913
914
915
916
917 ALL_SET = None
def _TimeoutZero():
  """Returns the number zero.

  Used as the per-lock timeout function for opportunistic acquisitions.

  @rtype: int

  """
  return 0
925
def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
  """Determines modes and timeouts for L{LockSet.acquire}.

  @type want_all: boolean
  @param want_all: Whether all locks in set should be acquired
  @param timeout: Timeout in seconds or C{None}
  @param opportunistic: Whether locks should be acquired opportunistically
  @rtype: tuple
  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
    acquiring the lockset-internal lock (might be C{None}) and a function to
    calculate the timeout for acquiring individual locks

  """
  # Opportunistic acquisition of named locks never waits, so no running
  # timeout is needed
  if opportunistic and not want_all:
    assert timeout is None, "Got timeout for an opportunistic acquisition"
    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)

  # One running timeout is shared by all acquisitions, so that the time
  # spent on earlier locks reduces what is left for later ones
  running_timeout = utils.RunningTimeout(timeout, False)

  if want_all:
    mode = _LS_ACQUIRE_ALL
    ls_timeout_fn = running_timeout.Remaining
  else:
    mode = _LS_ACQUIRE_EXACT
    ls_timeout_fn = None

  if opportunistic:
    # Only reachable with want_all set: the timeout then applies to the
    # lockset-internal lock only, individual locks are not waited for
    mode = _LS_ACQUIRE_OPPORTUNISTIC
    timeout_fn = _TimeoutZero
  else:
    timeout_fn = running_timeout.Remaining

  return (mode, ls_timeout_fn, timeout_fn)
964
967 """Internal exception to abort an acquire on a timeout.
968
969 """
970
973 """Implements a set of locks.
974
975 This abstraction implements a set of shared locks for the same resource type,
976 distinguished by name. The user can lock a subset of the resources and the
977 LockSet will take care of acquiring the locks always in the same order, thus
978 preventing deadlock.
979
980 All the locks needed in the same set must be acquired together, though.
981
982 @type name: string
983 @ivar name: the name of the lockset
984
985 """
def __init__(self, members, name, monitor=None):
  """Build a lock set holding one shared lock per initial member.

  @type members: list of strings
  @param members: initial members of the set
  @param name: the name of the lockset
  @type monitor: L{LockMonitor}
  @param monitor: Lock monitor with which to register member locks

  """
  assert members is not None, "members parameter is not a list"
  self.name = name

  # Remembered so that locks added later can use the same monitor
  self.__monitor = monitor

  # Lock protecting the set itself
  self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

  # Member locks, indexed by member name
  member_locks = self.__lockdict = {}
  for member in members:
    member_lock = SharedLock(self._GetLockName(member), monitor=monitor)
    member_locks[member] = member_lock

  # Maps each owning thread to the set of lock names it holds
  self.__owners = {}
1020
1022 """Returns the name for a member lock.
1023
1024 """
1025 return "%s/%s" % (self.name, mname)
1026
1028 """Returns the lockset-internal lock.
1029
1030 """
1031 return self.__lock
1032
1034 """Returns the lockset-internal lock dictionary.
1035
1036 Accessing this structure is only safe in single-thread usage or when the
1037 lockset-internal lock is held.
1038
1039 """
1040 return self.__lockdict
1041
1043 """Is the current thread a current level owner?
1044
1045 @note: Use L{check_owned} to check if a specific lock is held
1046
1047 """
1048 return threading.currentThread() in self.__owners
1049
1051 """Check if locks are owned in a specific mode.
1052
1053 @type names: sequence or string
1054 @param names: Lock names (or a single lock name)
1055 @param shared: See L{SharedLock.is_owned}
1056 @rtype: bool
1057 @note: Use L{is_owned} to check if the current thread holds I{any} lock and
1058 L{list_owned} to get the names of all owned locks
1059
1060 """
1061 if isinstance(names, basestring):
1062 names = [names]
1063
1064
1065 if names and self.is_owned():
1066 candidates = []
1067
1068
1069 for lname in names:
1070 try:
1071 lock = self.__lockdict[lname]
1072 except KeyError:
1073 raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
1074 " have been removed)" % (lname, self.name))
1075 else:
1076 candidates.append(lock)
1077
1078 return compat.all(lock.is_owned(shared=shared) for lock in candidates)
1079 else:
1080 return False
1081
1083 """Checks whether current thread owns internal lock.
1084
1085 Holding the internal lock is equivalent with holding all locks in the set
1086 (the opposite does not necessarily hold as it can not be easily
1087 determined). L{add} and L{remove} require the internal lock.
1088
1089 @rtype: boolean
1090
1091 """
1092 return self.__lock.is_owned()
1093
def _add_owned(self, name=None):
  """Note the current thread owns the given lock.

  @param name: name of the member lock now owned; with C{None} the thread
    is only registered as an owner, without any member lock

  """
  me = threading.currentThread()

  if name is None:
    # Register the thread, keeping any names it already owns
    if not self.is_owned():
      self.__owners[me] = set()
  else:
    if self.is_owned():
      self.__owners[me].add(name)
    else:
      self.__owners[me] = set([name])
1104
def _del_owned(self, name=None):
  """Note the current thread no longer owns the given lock.

  @param name: name of the member lock that was given up; with C{None} no
    name is removed and only the thread's owner entry is re-evaluated, which
    must happen after the lockset-internal lock was released

  """
  assert not (name is None and self.__lock.is_owned()), \
    "Cannot hold internal lock when deleting owner status"

  me = threading.currentThread()

  if name is not None:
    self.__owners[me].remove(name)

  # Drop the thread's entry only once it holds neither the lockset-internal
  # lock nor any member lock
  if not (self.__lock.is_owned() or self.__owners[me]):
    del self.__owners[me]
1118
1120 """Get the set of resource names owned by the current thread"""
1121 if self.is_owned():
1122 return self.__owners[threading.currentThread()].copy()
1123 else:
1124 return set()
1125
1127 """Release and delete all resources owned by the current thread"""
1128 for lname in self.list_owned():
1129 lock = self.__lockdict[lname]
1130 if lock.is_owned():
1131 lock.release()
1132 self._del_owned(name=lname)
1133
1135 """Return the current set of names.
1136
1137 Only call this function while holding __lock and don't iterate on the
1138 result after releasing the lock.
1139
1140 """
1141 return self.__lockdict.keys()
1142
1144 """Return a copy of the current set of elements.
1145
1146 Used only for debugging purposes.
1147
1148 """
1149
1150
1151 release_lock = False
1152 if not self.__lock.is_owned():
1153 release_lock = True
1154 self.__lock.acquire(shared=1)
1155 try:
1156 result = self.__names()
1157 finally:
1158 if release_lock:
1159 self.__lock.release()
1160 return set(result)
1161
def acquire(self, names, timeout=None, shared=0, priority=None,
            opportunistic=False, test_notify=None):
  """Acquire a set of resource locks.

  With C{names} given, exactly those locks are requested. With C{names} set
  to C{None} the lockset-internal lock is taken first and then every lock
  currently in the set is requested; the internal lock stays held on
  success.

  @note: When acquiring locks opportunistically, any number of locks might
    actually be acquired, even zero.

  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
    (special lock names, or instance/node names)
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
    exclusive lock will be acquired
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks; for opportunistic
    acquisitions, a timeout can only be given when C{names} is C{None}, in
    which case it is exclusively used for acquiring the L{LockSet}-internal
    lock; opportunistic acquisitions don't use a timeout for acquiring
    individual locks
  @type priority: integer
  @param priority: Priority for acquiring locks
  @type opportunistic: boolean
  @param opportunistic: Acquire locks opportunistically; use the return value
    to determine which locks were actually acquired
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting

  @return: Set of all locks successfully acquired or None in case of timeout

  @raise errors.LockError: when any lock we try to acquire has
    been deleted before we succeed. In this case none of the
    locks requested will be acquired.

  """
  assert timeout is None or timeout >= 0.0

  # A thread may not acquire from the same set twice
  assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
                               " (lockset %s)" % self.name)

  if priority is None:
    priority = _DEFAULT_PRIORITY

  try:
    if names is not None:
      assert timeout is None or not opportunistic, \
        ("Opportunistic acquisitions can only use a timeout if no"
         " names are given; see docstring for details")

      # Accept a single lock name as well as a list of names
      if isinstance(names, basestring):
        names = [names]

      # The lockset-internal lock is not taken here, hence its timeout
      # function is ignored
      (mode, _, timeout_fn) = \
        _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)

      return self.__acquire_inner(names, mode, shared, priority,
                                  timeout_fn, test_notify)

    else:
      (mode, ls_timeout_fn, timeout_fn) = \
        _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)

      # Whole-set acquisition: take the lockset-internal lock first, in the
      # same mode as requested for the member locks. Failing to get it
      # within the timeout aborts the whole acquisition.
      if not self.__lock.acquire(shared=shared, priority=priority,
                                 timeout=ls_timeout_fn()):
        raise _AcquireTimeout()

      try:
        # Record that this thread now owns the lockset-internal lock
        self._add_owned()

        return self.__acquire_inner(self.__names(), mode, shared,
                                    priority, timeout_fn, test_notify)
      except:
        # Acquiring the member locks failed: give the internal lock back
        # and undo the ownership record before passing the error on. The
        # bare except is deliberate, the exception is always re-raised.
        self.__lock.release()
        self._del_owned()
        raise

  except _AcquireTimeout:
    # A timeout is reported to the caller as None, not as an exception
    return None
1254
def __acquire_inner(self, names, mode, shared, priority,
                    timeout_fn, test_notify):
  """Inner logic for acquiring a number of locks.

  Acquisition modes:

    - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
      deleted locks can be ignored as the whole set is being acquired with
      its internal lock held
    - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
      timeouts and deleted locks are fatal
    - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
      all within the set) which should be acquired opportunistically, that is
      failures are ignored

  @param names: Names of the locks to be acquired
  @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
  @param shared: Whether to acquire in shared mode
  @param timeout_fn: Function returning the timeout to use for the next lock;
    it is called once per lock and must therefore always be callable (for
    opportunistic acquisitions it returns zero)
  @param priority: Priority for acquiring locks
  @param test_notify: Special callback function for unittesting
  @return: Set with the names of the locks actually acquired
  @raise errors.LockError: if a lock is missing, deleted or could not be
    obtained and the mode does not allow ignoring that
  @raise _AcquireTimeout: if a lock could not be obtained within a timeout
    in a non-opportunistic mode

  """
  assert mode in _LS_ACQUIRE_MODES

  acquire_list = []

  # Look up all lock objects first. Names are de-duplicated and sorted so
  # that every caller acquires member locks in the same order.
  for lname in sorted(frozenset(names)):
    try:
      lock = self.__lockdict[lname]
    except KeyError:
      # A vanished lock is only an error if exactly these names were asked
      # for; in the other modes it is silently skipped
      if mode == _LS_ACQUIRE_EXACT:
        raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                               " been removed)" % (lname, self.name))
    else:
      acquire_list.append((lname, lock))

  # Names of the locks effectively acquired
  acquired = set()

  try:
    # Acquire the locks one by one, in sorted order. If anything fails, all
    # locks obtained so far are released again by the handler at the end.
    for (lname, lock) in acquire_list:
      if __debug__ and callable(test_notify):
        # The callback is invoked during the acquire call below, while
        # lname still refers to this iteration's lock
        test_notify_fn = lambda: test_notify(lname)
      else:
        test_notify_fn = None

      timeout = timeout_fn()

      try:
        # Raises LockError if the lock has been deleted in the meantime
        acq_success = lock.acquire(shared=shared, timeout=timeout,
                                   priority=priority,
                                   test_notify=test_notify_fn)
      except errors.LockError:
        if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
          # Deleted locks are ignored in these modes
          continue

        raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                               " been removed)" % (lname, self.name))

      if not acq_success:
        # Could not get the lock
        if mode == _LS_ACQUIRE_OPPORTUNISTIC:
          # Opportunistic mode simply leaves this lock out
          continue

        if timeout is None:
          # Without a timeout the acquire should have blocked until it
          # succeeded, so a failure here is unexpected
          raise errors.LockError("Failed to get lock %s (set %s)" %
                                 (lname, self.name))

        raise _AcquireTimeout()

      try:
        # Record ownership of the lock just obtained
        self._add_owned(name=lname)
        acquired.add(lname)

      except:
        # The lock was obtained but could not be recorded; release it here,
        # as the cleanup below only knows about recorded locks
        if lock.is_owned():
          lock.release()
        raise

  except:
    # Release everything recorded so far before passing the error on. The
    # bare except is deliberate, the exception is always re-raised.
    self._release_and_delete_owned()
    raise

  return acquired
1364
1366 """Downgrade a set of resource locks from exclusive to shared mode.
1367
1368 The locks must have been acquired in exclusive mode.
1369
1370 """
1371 assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1372 " lock" % self.name)
1373
1374
1375 if isinstance(names, basestring):
1376 names = [names]
1377
1378 owned = self.list_owned()
1379
1380 if names is None:
1381 names = owned
1382 else:
1383 names = set(names)
1384 assert owned.issuperset(names), \
1385 ("downgrade() on unheld resources %s (set %s)" %
1386 (names.difference(owned), self.name))
1387
1388 for lockname in names:
1389 self.__lockdict[lockname].downgrade()
1390
1391
1392 if self.__lock.is_owned(shared=0):
1393
1394 if not compat.any(lock.is_owned(shared=0)
1395 for lock in self.__lockdict.values()):
1396 self.__lock.downgrade()
1397 assert self.__lock.is_owned(shared=1)
1398
1399 return True
1400
1402 """Release a set of resource locks, at the same level.
1403
1404 You must have acquired the locks, either in shared or in exclusive mode,
1405 before releasing them.
1406
1407 @type names: list of strings, or None
1408 @param names: the names of the locks which shall be released
1409 (defaults to all the locks acquired at that level).
1410
1411 """
1412 assert self.is_owned(), ("release() on lock set %s while not owner" %
1413 self.name)
1414
1415
1416 if isinstance(names, basestring):
1417 names = [names]
1418
1419 if names is None:
1420 names = self.list_owned()
1421 else:
1422 names = set(names)
1423 assert self.list_owned().issuperset(names), (
1424 "release() on unheld resources %s (set %s)" %
1425 (names.difference(self.list_owned()), self.name))
1426
1427
1428
1429 if self.__lock.is_owned():
1430 self.__lock.release()
1431 self._del_owned()
1432
1433 for lockname in names:
1434
1435
1436 self.__lockdict[lockname].release()
1437 self._del_owned(name=lockname)
1438
def add(self, names, acquired=0, shared=0):
  """Create new locks in this set, optionally acquiring them at once.

  The set-level lock is taken for the duration of the call unless the
  calling thread already owns it, and is released again on exit in that
  case only.

  @type names: list of strings (a single string is accepted too)
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @rtype: boolean
  @return: always C{True}
  @raise errors.LockError: if one of the names is already part of the set

  """
  # Either nothing is owned at this level, or the set-level lock is owned
  # with shared=0; a partially or shared-owned set must not be extended.
  assert not (self.is_owned() and not self.__lock.is_owned(shared=0)), (
    "Cannot add locks if the set %s is only partially owned, or shared" %
    self.name)

  # A bare string means "one lock with that name"
  if isinstance(names, basestring):
    names = [names]

  # Remember whether the set-level lock was taken here, so that only this
  # call's own acquisition is undone in the "finally" clause below.
  must_unlock = not self.__lock.is_owned()
  if must_unlock:
    self.__lock.acquire()

  try:
    clashing = set(self.__names()).intersection(names)
    if clashing:
      # Explicit raise rather than assert: the check has to stay active
      # even when assertions are compiled out.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (clashing, self.name))

    for new_name in names:
      new_lock = SharedLock(self._GetLockName(new_name),
                            monitor=self.__monitor)

      if acquired:
        new_lock.acquire(shared=shared)
        try:
          self._add_owned(name=new_name)
        except:
          # The lock is not yet in the dictionary; release it again so it
          # is not left held, then propagate the original error.
          new_lock.release()
          raise

      # Publish the lock only after the optional acquisition succeeded
      self.__lockdict[new_name] = new_lock
  finally:
    if must_unlock:
      self.__lock.release()

  return True
1503
1505 """Remove elements from the lock set.
1506
1507 You can either not hold anything in the lockset or already hold a superset
1508 of the elements you want to delete, exclusively.
1509
1510 @type names: list of strings
1511 @param names: names of the resource to remove.
1512
1513 @return: a list of locks which we removed; the list is always
1514 equal to the names list if we were holding all the locks
1515 exclusively
1516
1517 """
1518
1519 if isinstance(names, basestring):
1520 names = [names]
1521
1522
1523
1524
1525 assert not self.is_owned() or self.list_owned().issuperset(names), (
1526 "remove() on acquired lockset %s while not owning all elements" %
1527 self.name)
1528
1529 removed = []
1530
1531 for lname in names:
1532
1533
1534
1535
1536
1537 try:
1538 self.__lockdict[lname].delete()
1539 removed.append(lname)
1540 except (KeyError, errors.LockError):
1541
1542 assert not self.is_owned(), ("remove failed while holding lockset %s" %
1543 self.name)
1544 else:
1545
1546
1547
1548
1549
1550
1551
1552 del self.__lockdict[lname]
1553
1554 if self.is_owned():
1555 self._del_owned(name=lname)
1556
1557 return removed
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
# Locking levels. Each constant is numbered by its position in the tuple
# below, which is the same order used for LEVELS, so LEVELS[n] == n. Code
# that slices LEVELS by level number (e.g. LEVELS[level + 1:]) depends on
# this.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODE_ALLOC,
 LEVEL_NODEGROUP,
 LEVEL_NODE,
 LEVEL_NODE_RES,
 LEVEL_NETWORK) = range(0, 7)

# All levels, in ascending order. Keep in sync with the tuple above.
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODE_ALLOC,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]

# Levels at which locks may be added or removed at runtime; the lock
# manager's add() and remove() assert membership in this set. The cluster
# and node-alloc levels are deliberately absent.
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_NODE_RES,
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

# Human-readable name per level. The lock manager asserts that every lock
# set's name equals the entry listed here for its level.
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE_ALLOC: "node-alloc",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Name of the Big Ganeti Lock, the only initial member of the cluster-level
# lock set
BGL = "BGL"

# Name of the only initial member of the node-alloc level lock set
# (presumably "node allocation lock" -- confirm)
NAL = "NAL"
1622 """The Ganeti Locking Library
1623
1624 The purpose of this small library is to manage locking for ganeti clusters
1625 in a central place, while at the same time doing dynamic checks against
1626 possible deadlocks. It will also make it easier to transition to a different
1627 lock type should we migrate away from python threads.
1628
1629 """
1630 _instance = None
1631
def __init__(self, node_uuids, nodegroups, instance_names, networks):
  """Set up the lock manager with one lock set per locking level.

  Only one GanetiLockManager may exist at any time; creating a second one
  trips an assertion on the class-level C{_instance} attribute.

  @param node_uuids: list of node UUIDs (used for both the node and the
      node-res level)
  @param nodegroups: list of nodegroup uuids
  @param instance_names: list of instance names
  @param networks: initial lock names for the network level

  """
  cls = self.__class__
  assert cls._instance is None, "double GanetiLockManager instance"
  cls._instance = self

  monitor = LockMonitor()
  self._monitor = monitor

  # (level, initial lock names, lock set name); the lock sets are created
  # in exactly this order
  layout = [
    (LEVEL_CLUSTER, [BGL], "cluster"),
    (LEVEL_NODE, node_uuids, "node"),
    (LEVEL_NODE_RES, node_uuids, "node-res"),
    (LEVEL_NODEGROUP, nodegroups, "nodegroup"),
    (LEVEL_INSTANCE, instance_names, "instance"),
    (LEVEL_NETWORK, networks, "network"),
    (LEVEL_NODE_ALLOC, [NAL], "node-alloc"),
    ]

  keyring = {}
  for (level, members, set_name) in layout:
    keyring[level] = LockSet(members, set_name, monitor=monitor)

  self.__keyring = keyring

  # Guard against the table above drifting away from LEVEL_NAMES
  assert compat.all(ls.name == LEVEL_NAMES[level]
                    for (level, ls) in self.__keyring.items()), \
    "Keyring name mismatch"
1666
1668 """Registers a new lock with the monitor.
1669
1670 See L{LockMonitor.RegisterLock}.
1671
1672 """
1673 return self._monitor.RegisterLock(provider)
1674
1676 """Queries information from all locks.
1677
1678 See L{LockMonitor.QueryLocks}.
1679
1680 """
1681 return self._monitor.QueryLocks(fields)
1682
1684 """List the lock names at the given level.
1685
1686 This can be used for debugging/testing purposes.
1687
1688 @param level: the level whose list of locks to get
1689
1690 """
1691 assert level in LEVELS, "Invalid locking level %s" % level
1692 return self.__keyring[level]._names()
1693
1695 """Check whether we are owning locks at the given level
1696
1697 """
1698 return self.__keyring[level].is_owned()
1699
1701 """Get the set of owned locks at the given level
1702
1703 """
1704 return self.__keyring[level].list_owned()
1705
1707 """Check if locks at a certain level are owned in a specific mode.
1708
1709 @see: L{LockSet.check_owned}
1710
1711 """
1712 return self.__keyring[level].check_owned(names, shared=shared)
1713
1715 """Checks whether current thread owns all locks at a certain level.
1716
1717 @see: L{LockSet.owning_all}
1718
1719 """
1720 return self.__keyring[level].owning_all()
1721
1723 """Check that we don't own any lock at a level greater than the given one.
1724
1725 """
1726
1727
1728 return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1729
1731 """Check if the current thread owns the BGL.
1732
1733 Both an exclusive or a shared acquisition work.
1734
1735 """
1736 return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1737
1738 @staticmethod
1740 """Check if the level contains the BGL.
1741
1742 Check if acting on the given level and set of names will change
1743 the status of the Big Ganeti Lock.
1744
1745 """
1746 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1747
def acquire(self, level, names, timeout=None, shared=0, priority=None,
            opportunistic=False):
  """Acquire locks at one level after checking the ordering rules.

  The request is handed to the lock set of the given level once three
  assertions have passed: the level is valid, the Big Ganeti Lock is
  either part of this request or already owned, and nothing is owned at a
  greater level.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be acquired
  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names)
  @type timeout: float
  @param timeout: Maximum time to acquire all locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default
      an exclusive lock will be acquired
  @type priority: integer
  @param priority: Priority for acquiring lock
  @type opportunistic: boolean
  @param opportunistic: Acquire locks opportunistically; use the return value
      to determine which locks were actually acquired
  @return: whatever the lock set's C{acquire} returns for this request

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # The BGL comes first: it must be part of this very request or already
  # be held by the calling thread.
  assert (self._contains_BGL(level, names) or self._BGL_owned()), \
    "You must own the Big Ganeti Lock before acquiring any other"

  # Levels have to be taken in ascending order
  assert not self._upper_owned(level), \
    "Cannot acquire locks at a level while owning some at a greater one"

  lockset = self.__keyring[level]
  return lockset.acquire(names, timeout=timeout, shared=shared,
                         priority=priority, opportunistic=opportunistic)
1788
1790 """Downgrade a set of resource locks from exclusive to shared mode.
1791
1792 You must have acquired the locks in exclusive mode.
1793
1794 @type level: member of locking.LEVELS
1795 @param level: the level at which the locks shall be downgraded
1796 @type names: list of strings, or None
1797 @param names: the names of the locks which shall be downgraded
1798 (defaults to all the locks acquired at the level)
1799
1800 """
1801 assert level in LEVELS, "Invalid locking level %s" % level
1802
1803 return self.__keyring[level].downgrade(names=names)
1804
def release(self, level, names=None):
  """Release locks held at one level.

  The locks must be held by the caller, in shared or exclusive mode. The
  Big Ganeti Lock cannot be released while anything is still owned at a
  level above the cluster level.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be released
  @type names: list of strings, or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level)
  @return: the result of the lock set's C{release}

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # Refuse only when both hold: the BGL is affected by this request and
  # something is still owned above the cluster level. The message lists
  # what is owned at every level to help track the offender down.
  assert not (self._contains_BGL(level, names) and
              self._upper_owned(LEVEL_CLUSTER)), (
    "Cannot release the Big Ganeti Lock while holding something"
    " at upper levels (%r)" %
    (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl], self.list_owned(lvl))
                      for lvl in self.__keyring]), ))

  lockset = self.__keyring[level]
  return lockset.release(names)
1828
def add(self, level, names, acquired=0, shared=0):
  """Create new locks at a modifiable level.

  Asserts that the level allows modification, that the caller owns the
  BGL and that nothing is owned at a greater level, then delegates to the
  lock set of that level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be added
  @type names: list of strings
  @param names: names of the locks to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: whether to acquire the newly added locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether the acquisition will be shared
  @return: the result of the lock set's C{add}

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
  assert self._BGL_owned(), \
    "You must own the BGL before performing other operations"
  assert not self._upper_owned(level), \
    "Cannot add locks at a level while owning some at a greater one"

  lockset = self.__keyring[level]
  return lockset.add(names, acquired=acquired, shared=shared)
1848
def remove(self, level, names):
  """Delete locks from a modifiable level.

  The caller must own the BGL. In addition it must either own something
  at the given level already, or own nothing at any greater level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be removed
  @type names: list of strings
  @param names: the names of the locks which shall be removed
      (special lock names, or instance/node names)
  @return: the result of the lock set's C{remove}

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
  assert self._BGL_owned(), \
    "You must own the BGL before performing other operations"

  # The only rejected combination: nothing owned at this level while
  # something is owned at a greater one.
  assert not (not self.is_owned(level) and self._upper_owned(level)), (
    "Cannot remove locks at a level while not owning it or"
    " owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.remove(names)
1872
1875 """Sorting key function.
1876
1877 Sort by name, registration order and then order of information. This provides
1878 a stable sort order over different providers, even if they return the same
1879 name.
1880
1881 """
1882 (name, _, _, _) = item
1883
1884 return (utils.NiceSortKey(name), num, idx)
1885
1888 _LOCK_ATTR = "_lock"
1889
1891 """Initializes this class.
1892
1893 """
1894 self._lock = SharedLock("LockMonitor")
1895
1896
1897 self._counter = itertools.count(0)
1898
1899
1900
1901 self._locks = weakref.WeakKeyDictionary()
1902
1903 @ssynchronized(_LOCK_ATTR)
1905 """Registers a new lock.
1906
1907 @param provider: Object with a callable method named C{GetLockInfo}, taking
1908 a single C{set} containing the requested information items
1909 @note: It would be nicer to only receive the function generating the
1910 requested information but, as it turns out, weak references to bound
1911 methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1912 workarounds, but none of the ones I found works properly in combination
1913 with a standard C{WeakKeyDictionary}
1914
1915 """
1916 assert provider not in self._locks, "Duplicate registration"
1917
1918
1919
1920
1921
1922
1923
1924 self._locks[provider] = self._counter.next()
1925
1927 """Get information from all locks.
1928
1929 """
1930
1931 self._lock.acquire(shared=1)
1932 try:
1933 items = self._locks.items()
1934 finally:
1935 self._lock.release()
1936
1937 return [(info, idx, num)
1938 for (provider, num) in items
1939 for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1940
1957
1959 """Queries information from all locks.
1960
1961 @type fields: list of strings
1962 @param fields: List of fields to return
1963
1964 """
1965 (qobj, ctx) = self._Query(fields)
1966
1967
1968 return query.GetQueryResponse(qobj, ctx)
1969