1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module implementing the Ganeti locking code."""
22
23
24
25
26
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import itertools
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
40 from ganeti import query
41
42
43 _EXCLUSIVE_TEXT = "exclusive"
44 _SHARED_TEXT = "shared"
45 _DELETED_TEXT = "deleted"
46
47 _DEFAULT_PRIORITY = 0
51 """Shared Synchronization decorator.
52
53 Calls the function holding the given lock, either in exclusive or shared
54 mode. It requires the passed lock to be a SharedLock (or support its
55 semantics).
56
57 @type mylock: lockable object or string
58 @param mylock: lock to acquire or class member name of the lock to acquire
59
60 """
61 def wrap(fn):
62 def sync_function(*args, **kwargs):
63 if isinstance(mylock, basestring):
64 assert args, "cannot ssynchronize on non-class method: self not found"
65
66 lock = getattr(args[0], mylock)
67 else:
68 lock = mylock
69 lock.acquire(shared=shared)
70 try:
71 return fn(*args, **kwargs)
72 finally:
73 lock.release()
74 return sync_function
75 return wrap
76
79 """Helper class for SingleNotifyPipeCondition
80
81 """
82 __slots__ = [
83 "_fd",
84 "_poller",
85 ]
86
88 """Constructor for _SingleNotifyPipeConditionWaiter
89
90 @type poller: select.poll
91 @param poller: Poller object
92 @type fd: int
93 @param fd: File descriptor to wait for
94
95 """
96 object.__init__(self)
97 self._poller = poller
98 self._fd = fd
99
101 """Wait for something to happen on the pipe.
102
103 @type timeout: float or None
104 @param timeout: Timeout for waiting (can be None)
105
106 """
107 running_timeout = utils.RunningTimeout(timeout, True)
108
109 while True:
110 remaining_time = running_timeout.Remaining()
111
112 if remaining_time is not None:
113 if remaining_time < 0.0:
114 break
115
116
117 remaining_time *= 1000
118
119 try:
120 result = self._poller.poll(remaining_time)
121 except EnvironmentError, err:
122 if err.errno != errno.EINTR:
123 raise
124 result = None
125
126
127 if result and result[0][0] == self._fd:
128 break
129
132 """Base class containing common code for conditions.
133
134 Some of this code is taken from python's threading module.
135
136 """
137 __slots__ = [
138 "_lock",
139 "acquire",
140 "release",
141 "_is_owned",
142 "_acquire_restore",
143 "_release_save",
144 ]
145
173
175 """Check whether lock is owned by current thread.
176
177 """
178 if self._lock.acquire(0):
179 self._lock.release()
180 return False
181 return True
182
185
188
190 """Raise an exception if the current thread doesn't own the lock.
191
192 """
193 if not self._is_owned():
194 raise RuntimeError("cannot work with un-aquired lock")
195
198 """Condition which can only be notified once.
199
200 This condition class uses pipes and poll, internally, to be able to wait for
201 notification with a timeout, without resorting to polling. It is almost
202 compatible with Python's threading.Condition, with the following differences:
203 - notifyAll can only be called once, and no wait can happen after that
204 - notify is not supported, only notifyAll
205
206 """
207
208 __slots__ = [
209 "_poller",
210 "_read_fd",
211 "_write_fd",
212 "_nwaiters",
213 "_notified",
214 ]
215
216 _waiter_class = _SingleNotifyPipeConditionWaiter
217
219 """Constructor for SingleNotifyPipeCondition
220
221 """
222 _BaseCondition.__init__(self, lock)
223 self._nwaiters = 0
224 self._notified = False
225 self._read_fd = None
226 self._write_fd = None
227 self._poller = None
228
230 """Throws an exception if already notified.
231
232 """
233 if self._notified:
234 raise RuntimeError("cannot use already notified condition")
235
237 """Cleanup open file descriptors, if any.
238
239 """
240 if self._read_fd is not None:
241 os.close(self._read_fd)
242 self._read_fd = None
243
244 if self._write_fd is not None:
245 os.close(self._write_fd)
246 self._write_fd = None
247 self._poller = None
248
def wait(self, timeout):
    """Block until this condition is notified or the timeout expires.

    The pipe and the poller are created lazily by the first waiter; the
    last waiter to leave triggers the cleanup.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
        if self._poller is None:
            # First waiter: set up the pipe and watch its read end for hangup
            pipe_fds = os.pipe()
            (self._read_fd, self._write_fd) = pipe_fds
            self._poller = select.poll()
            self._poller.register(self._read_fd, select.POLLHUP)

        waiter = self._waiter_class(self._poller, self._read_fd)
        saved_state = self._release_save()
        try:
            # The lock is released while we wait for the notification
            waiter(timeout)
        finally:
            # Always take the lock back, even if waiting raised
            self._acquire_restore(saved_state)
    finally:
        self._nwaiters -= 1
        if not self._nwaiters:
            self._Cleanup()
278
280 """Close the writing side of the pipe to notify all waiters.
281
282 """
283 self._check_owned()
284 self._check_unnotified()
285 self._notified = True
286 if self._write_fd is not None:
287 os.close(self._write_fd)
288 self._write_fd = None
289
292 """Group-only non-polling condition with counters.
293
294 This condition class uses pipes and poll, internally, to be able to wait for
295 notification with a timeout, without resorting to polling. It is almost
296 compatible with Python's threading.Condition, but only supports notifyAll and
297 non-recursive locks. As an additional feature it's able to report whether
298 there are any waiting threads.
299
300 """
301 __slots__ = [
302 "_waiters",
303 "_single_condition",
304 ]
305
306 _single_condition_class = SingleNotifyPipeCondition
307
309 """Initializes this class.
310
311 """
312 _BaseCondition.__init__(self, lock)
313 self._waiters = set()
314 self._single_condition = self._single_condition_class(self._lock)
315
def wait(self, timeout):
    """Wait until notified, registering the current thread as a waiter.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Hold on to the condition object we start waiting on; notifyAll()
    # installs a fresh one in self._single_condition afterwards.
    single = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
        single.wait(timeout)
    finally:
        # The lock must be held again before touching the waiter set
        self._check_owned()
        self._waiters.remove(me)
335
337 """Notify all currently waiting threads.
338
339 """
340 self._check_owned()
341 self._single_condition.notifyAll()
342 self._single_condition = self._single_condition_class(self._lock)
343
345 """Returns a list of all waiting threads.
346
347 """
348 self._check_owned()
349
350 return self._waiters
351
353 """Returns whether there are active waiters.
354
355 """
356 self._check_owned()
357
358 return bool(self._waiters)
359
362 __slots__ = [
363 "shared",
364 ]
365
367 """Initializes this class.
368
369 """
370 self.shared = shared
371 PipeCondition.__init__(self, lock)
372
375 """Implements a shared lock.
376
377 Multiple threads can acquire the lock in a shared way by calling
378 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
379 threads can call C{acquire(shared=0)}.
380
381 Notes on data structures: C{__pending} contains a priority queue (heapq) of
382 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
383 ...]}. Each per-priority queue contains a normal in-order list of conditions
384 to be notified when the lock can be acquired. Shared locks are grouped
385 together by priority and the condition for them is stored in
386 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
387 references for the per-priority queues indexed by priority for faster access.
388
389 @type name: string
390 @ivar name: the name of the lock
391
392 """
393 __slots__ = [
394 "__weakref__",
395 "__deleted",
396 "__exc",
397 "__lock",
398 "__pending",
399 "__pending_by_prio",
400 "__pending_shared",
401 "__shr",
402 "name",
403 ]
404
405 __condition_class = _PipeConditionWithMode
406
def __init__(self, name, monitor=None):
    """Set up an unowned, undeleted SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Nobody owns the lock initially and it has not been deleted
    self.__deleted = False
    self.__exc = None
    self.__shr = set()

    # Bookkeeping for pending acquires (see the class docstring)
    self.__pending_shared = {}
    self.__pending_by_prio = {}
    self.__pending = []

    # Internal lock protecting all of the state above
    self.__lock = threading.Lock()

    # Register only once the lock is fully initialized
    if monitor:
        logging.debug("Adding lock %s to monitor", name)
        monitor.RegisterLock(self)
438
440 """Retrieves information for querying locks.
441
442 @type requested: set
443 @param requested: Requested information, see C{query.LQ_*}
444
445 """
446 self.__lock.acquire()
447 try:
448
449
450
451 mode = None
452 owner_names = None
453
454 if query.LQ_MODE in requested:
455 if self.__deleted:
456 mode = _DELETED_TEXT
457 assert not (self.__exc or self.__shr)
458 elif self.__exc:
459 mode = _EXCLUSIVE_TEXT
460 elif self.__shr:
461 mode = _SHARED_TEXT
462
463
464 if query.LQ_OWNER in requested:
465 if self.__exc:
466 owner = [self.__exc]
467 else:
468 owner = self.__shr
469
470 if owner:
471 assert not self.__deleted
472 owner_names = [i.getName() for i in owner]
473
474
475 if query.LQ_PENDING in requested:
476 pending = []
477
478
479 for (_, prioqueue) in sorted(self.__pending):
480 for cond in prioqueue:
481 if cond.shared:
482 pendmode = _SHARED_TEXT
483 else:
484 pendmode = _EXCLUSIVE_TEXT
485
486
487 pending.append((pendmode, [i.getName()
488 for i in cond.get_waiting()]))
489 else:
490 pending = None
491
492 return [(self.name, mode, owner_names, pending)]
493 finally:
494 self.__lock.release()
495
497 """Raises an exception if the lock has been deleted.
498
499 """
500 if self.__deleted:
501 raise errors.LockError("Deleted lock %s" % self.name)
502
504 """Is the current thread sharing the lock at this time?
505
506 """
507 return threading.currentThread() in self.__shr
508
510 """Is the current thread holding the lock exclusively at this time?
511
512 """
513 return threading.currentThread() == self.__exc
514
516 """Is the current thread somehow owning the lock at this time?
517
518 This is a private version of the function, which presumes you're holding
519 the internal lock.
520
521 """
522 if shared < 0:
523 return self.__is_sharer() or self.__is_exclusive()
524 elif shared:
525 return self.__is_sharer()
526 else:
527 return self.__is_exclusive()
528
530 """Is the current thread somehow owning the lock at this time?
531
532 @param shared:
533 - < 0: check for any type of ownership (default)
534 - 0: check for exclusive ownership
535 - > 0: check for shared ownership
536
537 """
538 self.__lock.acquire()
539 try:
540 return self.__is_owned(shared=shared)
541 finally:
542 self.__lock.release()
543
544 is_owned = _is_owned
545
547 """Returns the number of pending acquires.
548
549 @rtype: int
550
551 """
552 self.__lock.acquire()
553 try:
554 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
555 finally:
556 self.__lock.release()
557
559 """Checks whether there are any pending acquires.
560
561 @rtype: bool
562
563 """
564 self.__lock.acquire()
565 try:
566
567 (_, prioqueue) = self.__find_first_pending_queue()
568
569 return not (prioqueue or
570 self.__pending or
571 self.__pending_by_prio or
572 self.__pending_shared)
573 finally:
574 self.__lock.release()
575
577 """Actually acquire the lock.
578
579 """
580 if shared:
581 self.__shr.add(threading.currentThread())
582 else:
583 self.__exc = threading.currentThread()
584
586 """Determine whether lock can be acquired.
587
588 """
589 if shared:
590 return self.__exc is None
591 else:
592 return len(self.__shr) == 0 and self.__exc is None
593
595 """Tries to find the topmost queued entry with pending acquires.
596
597 Removes empty entries while going through the list.
598
599 """
600 while self.__pending:
601 (priority, prioqueue) = self.__pending[0]
602
603 if prioqueue:
604 return (priority, prioqueue)
605
606
607 heapq.heappop(self.__pending)
608 del self.__pending_by_prio[priority]
609 assert priority not in self.__pending_shared
610
611 return (None, None)
612
614 """Checks whether the passed condition is on top of the queue.
615
616 The caller must make sure the queue isn't empty.
617
618 """
619 (_, prioqueue) = self.__find_first_pending_queue()
620
621 return cond == prioqueue[0]
622
624 """Acquire a shared lock.
625
626 @param shared: whether to acquire in shared mode; by default an
627 exclusive lock will be acquired
628 @param timeout: maximum waiting time before giving up
629 @type priority: integer
630 @param priority: Priority for acquiring lock
631
632 """
633 self.__check_deleted()
634
635
636 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
637 " %s" % self.name)
638
639
640 self.__find_first_pending_queue()
641
642
643 if not self.__pending and self.__can_acquire(shared):
644
645 self.__do_acquire(shared)
646 return True
647
648 prioqueue = self.__pending_by_prio.get(priority, None)
649
650 if shared:
651
652 wait_condition = self.__pending_shared.get(priority, None)
653 assert (wait_condition is None or
654 (wait_condition.shared and wait_condition in prioqueue))
655 else:
656 wait_condition = None
657
658 if wait_condition is None:
659 if prioqueue is None:
660 assert priority not in self.__pending_by_prio
661
662 prioqueue = []
663 heapq.heappush(self.__pending, (priority, prioqueue))
664 self.__pending_by_prio[priority] = prioqueue
665
666 wait_condition = self.__condition_class(self.__lock, shared)
667 prioqueue.append(wait_condition)
668
669 if shared:
670
671
672 assert priority not in self.__pending_shared
673 self.__pending_shared[priority] = wait_condition
674
675 try:
676
677
678
679 while not (self.__is_on_top(wait_condition) and
680 self.__can_acquire(shared)):
681
682 wait_condition.wait(timeout)
683 self.__check_deleted()
684
685
686
687 if timeout is not None:
688 break
689
690 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
691 self.__do_acquire(shared)
692 return True
693 finally:
694
695 if not wait_condition.has_waiting():
696 prioqueue.remove(wait_condition)
697 if wait_condition.shared:
698
699
700 self.__pending_shared.pop(priority, None)
701
702 return False
703
def acquire(self, shared=0, timeout=None, priority=None,
            test_notify=None):
    """Acquire the lock in shared or exclusive mode.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    """
    if priority is None:
        priority = _DEFAULT_PRIORITY

    internal = self.__lock
    internal.acquire()
    try:
        # The internal lock is held at this point; tell the test hook
        if __debug__ and callable(test_notify):
            test_notify()

        granted = self.__acquire_unlocked(shared, timeout, priority)
        return granted
    finally:
        internal.release()
731
733 """Changes the lock mode from exclusive to shared.
734
735 Pending acquires in shared mode on the same priority will go ahead.
736
737 """
738 self.__lock.acquire()
739 try:
740 assert self.__is_owned(), "Lock must be owned"
741
742 if self.__is_exclusive():
743
744 self.__exc = None
745 self.__do_acquire(1)
746
747
748
749
750
751 (priority, prioqueue) = self.__find_first_pending_queue()
752 if prioqueue:
753
754 cond = self.__pending_shared.pop(priority, None)
755 if cond:
756 assert cond.shared
757 assert cond in prioqueue
758
759
760 if len(prioqueue) > 1:
761 prioqueue.remove(cond)
762 prioqueue.insert(0, cond)
763
764
765 cond.notifyAll()
766
767 assert not self.__is_exclusive()
768 assert self.__is_sharer()
769
770 return True
771 finally:
772 self.__lock.release()
773
775 """Release a Shared Lock.
776
777 You must have acquired the lock, either in shared or in exclusive mode,
778 before calling this function.
779
780 """
781 self.__lock.acquire()
782 try:
783 assert self.__is_exclusive() or self.__is_sharer(), \
784 "Cannot release non-owned lock"
785
786
787 if self.__is_exclusive():
788 self.__exc = None
789 else:
790 self.__shr.remove(threading.currentThread())
791
792
793 (priority, prioqueue) = self.__find_first_pending_queue()
794 if prioqueue:
795 cond = prioqueue[0]
796 cond.notifyAll()
797 if cond.shared:
798
799
800 self.__pending_shared.pop(priority, None)
801
802 finally:
803 self.__lock.release()
804
def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was marked as deleted, False if it could not
        be acquired (e.g. the timeout expired)
    @raise errors.LockError: if the lock has already been deleted

    """
    if priority is None:
        priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
        assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

        self.__check_deleted()

        # The caller is allowed to already hold the lock exclusively
        acquired = self.__is_exclusive()

        if not acquired:
            acquired = self.__acquire_unlocked(0, timeout, priority)

        if acquired:
            # Ownership can only be asserted on success. On a timed-out
            # acquire we hold nothing and must simply report failure.
            assert self.__is_exclusive() and not self.__is_sharer(), \
                "Lock wasn't acquired in exclusive mode"

            self.__deleted = True
            self.__exc = None

            assert not (self.__exc or self.__shr), "Found owner during deletion"

            # Wake up every pending acquire; each one re-checks the deleted
            # flag after waking and fails.
            for (_, prioqueue) in self.__pending:
                for cond in prioqueue:
                    cond.notifyAll()

            assert self.__deleted

        return acquired
    finally:
        self.__lock.release()
852
857
860
861
862
863
864 ALL_SET = None
868 """Internal exception to abort an acquire on a timeout.
869
870 """
871
874 """Implements a set of locks.
875
876 This abstraction implements a set of shared locks for the same resource type,
877 distinguished by name. The user can lock a subset of the resources and the
878 LockSet will take care of acquiring the locks always in the same order, thus
879 preventing deadlock.
880
881 All the locks needed in the same set must be acquired together, though.
882
883 @type name: string
884 @ivar name: the name of the lockset
885
886 """
def __init__(self, members, name, monitor=None):
    """Build a LockSet holding one SharedLock per initial member.

    @type members: list of strings
    @param members: initial members of the set
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Kept so that locks added later can be created with the same monitor
    self.__monitor = monitor

    # Lock guarding the set itself; created before any member lock
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # Member locks, indexed by member name and created in the order given
    self.__lockdict = dict((mname, SharedLock(self._GetLockName(mname),
                                              monitor=monitor))
                           for mname in members)

    # Maps each owning thread to the set of lock names it holds; a thread
    # is only present here while it owns something at this level.
    self.__owners = {}
921
923 """Returns the name for a member lock.
924
925 """
926 return "%s/%s" % (self.name, mname)
927
929 """Returns the lockset-internal lock.
930
931 """
932 return self.__lock
933
935 """Returns the lockset-internal lock dictionary.
936
937 Accessing this structure is only safe in single-thread usage or when the
938 lockset-internal lock is held.
939
940 """
941 return self.__lockdict
942
944 """Is the current thread a current level owner?"""
945 return threading.currentThread() in self.__owners
946
948 """Note the current thread owns the given lock"""
949 if name is None:
950 if not self._is_owned():
951 self.__owners[threading.currentThread()] = set()
952 else:
953 if self._is_owned():
954 self.__owners[threading.currentThread()].add(name)
955 else:
956 self.__owners[threading.currentThread()] = set([name])
957
959 """Note the current thread no longer owns the given lock"""
960
961 assert not (name is None and self.__lock._is_owned()), \
962 "Cannot hold internal lock when deleting owner status"
963
964 if name is not None:
965 self.__owners[threading.currentThread()].remove(name)
966
967
968 if (not self.__lock._is_owned() and
969 not self.__owners[threading.currentThread()]):
970 del self.__owners[threading.currentThread()]
971
973 """Get the set of resource names owned by the current thread"""
974 if self._is_owned():
975 return self.__owners[threading.currentThread()].copy()
976 else:
977 return set()
978
980 """Release and delete all resources owned by the current thread"""
981 for lname in self._list_owned():
982 lock = self.__lockdict[lname]
983 if lock._is_owned():
984 lock.release()
985 self._del_owned(name=lname)
986
988 """Return the current set of names.
989
990 Only call this function while holding __lock and don't iterate on the
991 result after releasing the lock.
992
993 """
994 return self.__lockdict.keys()
995
997 """Return a copy of the current set of elements.
998
999 Used only for debugging purposes.
1000
1001 """
1002
1003
1004 release_lock = False
1005 if not self.__lock._is_owned():
1006 release_lock = True
1007 self.__lock.acquire(shared=1)
1008 try:
1009 result = self.__names()
1010 finally:
1011 if release_lock:
1012 self.__lock.release()
1013 return set(result)
1014
def acquire(self, names, timeout=None, shared=0, priority=None,
            test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # A thread may hold locks of a given set only once
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
        priority = _DEFAULT_PRIORITY

    # One timeout budget is shared by all individual acquisitions
    running_timeout = utils.RunningTimeout(timeout, False)
    remaining = running_timeout.Remaining

    try:
        if names is None:
            # Whole-set request: take the set-internal lock first, then
            # every member lock present at that point.
            if not self.__lock.acquire(shared=shared, priority=priority,
                                       timeout=remaining()):
                raise _AcquireTimeout()
            try:
                # Record that this thread owns something at this level
                self._add_owned()

                return self.__acquire_inner(self.__names(), True, shared,
                                            priority, remaining, test_notify)
            except:
                # Deliberately catches everything: the internal lock must
                # be given back on any failure before re-raising.
                self.__lock.release()
                self._del_owned()
                raise

        # Explicit names: accept a single name as well as a list
        if isinstance(names, basestring):
            names = [names]

        return self.__acquire_inner(names, False, shared, priority,
                                    remaining, test_notify)

    except _AcquireTimeout:
        return None
1090
def __acquire_inner(self, names, want_all, shared, priority,
                    timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting

    @return: Set with the names of all locks acquired
    @raise errors.LockError: if a requested lock doesn't exist or can't be
        acquired (only when C{want_all} is false for missing locks)
    @raise _AcquireTimeout: if a lock could not be acquired within the
        remaining timeout

    """
    acquire_list = []

    # First look up all lock objects. Names are de-duplicated and sorted so
    # that locks are always taken in the same order.
    for lname in sorted(utils.UniqueSequence(names)):
        try:
            lock = self.__lockdict[lname]
        except KeyError:
            if want_all:
                # When acquiring the whole set, a lock that has vanished in
                # the meantime is simply skipped.
                continue

            raise errors.LockError("Non-existing lock %s in set %s (it may have"
                                   " been removed)" % (lname, self.name))

        acquire_list.append((lname, lock))

    # Names of the locks acquired so far
    acquired = set()

    try:
        # Now acquire the locks one by one, in the sorted order built above
        for (lname, lock) in acquire_list:
            if __debug__ and callable(test_notify):
                # Passed straight to lock.acquire() below, within this
                # iteration
                test_notify_fn = lambda: test_notify(lname)
            else:
                test_notify_fn = None

            # Remaining time budget for this particular lock
            timeout = timeout_fn()

            try:
                # raises LockError if the lock was deleted
                acq_success = lock.acquire(shared=shared, timeout=timeout,
                                           priority=priority,
                                           test_notify=test_notify_fn)
            except errors.LockError:
                if want_all:
                    # Deleted locks are skipped when acquiring the whole set
                    continue

                raise errors.LockError("Non-existing lock %s in set %s (it may"
                                       " have been removed)" % (lname, self.name))

            if not acq_success:
                # Couldn't get lock or timeout occurred
                if timeout is None:
                    # Without a timeout a failed acquire is reported as an
                    # error rather than as a timeout
                    raise errors.LockError("Failed to get lock %s (set %s)" %
                                           (lname, self.name))

                raise _AcquireTimeout()

            try:
                # now the lock cannot be deleted, we have it!
                self._add_owned(name=lname)
                acquired.add(lname)

            except:
                # The lock was taken but could not be recorded as owned, so
                # the outer cleanup would not know about it; release it here.
                if lock._is_owned():
                    lock.release()
                raise

    except:
        # Release everything recorded as owned so far, then propagate
        self._release_and_delete_owned()
        raise

    return acquired
1183
1185 """Downgrade a set of resource locks from exclusive to shared mode.
1186
1187 The locks must have been acquired in exclusive mode.
1188
1189 """
1190 assert self._is_owned(), ("downgrade on lockset %s while not owning any"
1191 " lock" % self.name)
1192
1193
1194 if isinstance(names, basestring):
1195 names = [names]
1196
1197 owned = self._list_owned()
1198
1199 if names is None:
1200 names = owned
1201 else:
1202 names = set(names)
1203 assert owned.issuperset(names), \
1204 ("downgrade() on unheld resources %s (set %s)" %
1205 (names.difference(owned), self.name))
1206
1207 for lockname in names:
1208 self.__lockdict[lockname].downgrade()
1209
1210
1211 if self.__lock._is_owned(shared=0):
1212
1213 if not compat.any(lock._is_owned(shared=0)
1214 for lock in self.__lockdict.values()):
1215 self.__lock.downgrade()
1216 assert self.__lock._is_owned(shared=1)
1217
1218 return True
1219
1221 """Release a set of resource locks, at the same level.
1222
1223 You must have acquired the locks, either in shared or in exclusive mode,
1224 before releasing them.
1225
1226 @type names: list of strings, or None
1227 @param names: the names of the locks which shall be released
1228 (defaults to all the locks acquired at that level).
1229
1230 """
1231 assert self._is_owned(), ("release() on lock set %s while not owner" %
1232 self.name)
1233
1234
1235 if isinstance(names, basestring):
1236 names = [names]
1237
1238 if names is None:
1239 names = self._list_owned()
1240 else:
1241 names = set(names)
1242 assert self._list_owned().issuperset(names), (
1243 "release() on unheld resources %s (set %s)" %
1244 (names.difference(self._list_owned()), self.name))
1245
1246
1247
1248 if self.__lock._is_owned():
1249 self.__lock.release()
1250 self._del_owned()
1251
1252 for lockname in names:
1253
1254
1255 self.__lockdict[lockname].release()
1256 self._del_owned(name=lockname)
1257
def add(self, names, acquired=0, shared=0):
    """Add new member locks to the set, optionally pre-acquired.

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?

    """
    # Either nothing is owned at this level, or the whole set is owned
    # exclusively
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
        ("Cannot add locks if the set %s is only partially owned, or shared" %
         self.name)

    # Accept a single name as well as a list
    if isinstance(names, basestring):
        names = [names]

    # Take the set-internal lock unless this thread already holds it, and
    # remember whether it has to be released again afterwards.
    internal_lock = self.__lock
    must_release = not internal_lock._is_owned()
    if must_release:
        internal_lock.acquire()

    try:
        duplicates = set(self.__names()).intersection(names)
        if duplicates:
            # Refuse the whole request if any name already exists
            raise errors.LockError("duplicate add(%s) on lockset %s" %
                                   (duplicates, self.name))

        for lockname in names:
            new_lock = SharedLock(self._GetLockName(lockname),
                                  monitor=self.__monitor)

            if acquired:
                # The lock was created just above, so no timeout or
                # priority is passed
                new_lock.acquire(shared=shared)

                try:
                    self._add_owned(name=lockname)
                except:
                    # Ownership could not be recorded: give the lock back
                    # before propagating the error.
                    new_lock.release()
                    raise

            self.__lockdict[lockname] = new_lock

    finally:
        # Only release the internal lock if it was taken here
        if must_release:
            internal_lock.release()

    return True
1322
1324 """Remove elements from the lock set.
1325
1326 You can either not hold anything in the lockset or already hold a superset
1327 of the elements you want to delete, exclusively.
1328
1329 @type names: list of strings
1330 @param names: names of the resource to remove.
1331
1332 @return: a list of locks which we removed; the list is always
1333 equal to the names list if we were holding all the locks
1334 exclusively
1335
1336 """
1337
1338 if isinstance(names, basestring):
1339 names = [names]
1340
1341
1342
1343
1344 assert not self._is_owned() or self._list_owned().issuperset(names), (
1345 "remove() on acquired lockset %s while not owning all elements" %
1346 self.name)
1347
1348 removed = []
1349
1350 for lname in names:
1351
1352
1353
1354
1355
1356 try:
1357 self.__lockdict[lname].delete()
1358 removed.append(lname)
1359 except (KeyError, errors.LockError):
1360
1361 assert not self._is_owned(), ("remove failed while holding lockset %s"
1362 % self.name)
1363 else:
1364
1365
1366
1367
1368
1369
1370
1371 del self.__lockdict[lname]
1372
1373 if self._is_owned():
1374 self._del_owned(name=lname)
1375
1376 return removed
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388 LEVEL_CLUSTER = 0
1389 LEVEL_INSTANCE = 1
1390 LEVEL_NODEGROUP = 2
1391 LEVEL_NODE = 3
1392
1393 LEVELS = [LEVEL_CLUSTER,
1394 LEVEL_INSTANCE,
1395 LEVEL_NODEGROUP,
1396 LEVEL_NODE]
1397
1398
1399 LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1400
1401 LEVEL_NAMES = {
1402 LEVEL_CLUSTER: "cluster",
1403 LEVEL_INSTANCE: "instance",
1404 LEVEL_NODEGROUP: "nodegroup",
1405 LEVEL_NODE: "node",
1406 }
1407
1408
1409 BGL = 'BGL'
1413 """The Ganeti Locking Library
1414
1415 The purpose of this small library is to manage locking for ganeti clusters
1416 in a central place, while at the same time doing dynamic checks against
1417 possible deadlocks. It will also make it easier to transition to a different
1418 lock type should we migrate away from python threads.
1419
1420 """
1421 _instance = None
1422
def __init__(self, nodes, nodegroups, instances):
    """Create the lock manager and one LockSet per locking level.

    Only one GanetiLockManager may exist at any time; creating a second
    one trips an assertion.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, \
        "double GanetiLockManager instance"

    cls._instance = self

    monitor = LockMonitor()
    self._monitor = monitor

    # One LockSet per level, all sharing the same monitor
    self.__keyring = {
        LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=monitor),
        LEVEL_NODE: LockSet(nodes, "nodes", monitor=monitor),
        LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=monitor),
        LEVEL_INSTANCE: LockSet(instances, "instances", monitor=monitor),
    }
1450
1452 """Registers a new lock with the monitor.
1453
1454 See L{LockMonitor.RegisterLock}.
1455
1456 """
1457 return self._monitor.RegisterLock(provider)
1458
1460 """Queries information from all locks.
1461
1462 See L{LockMonitor.QueryLocks}.
1463
1464 """
1465 return self._monitor.QueryLocks(fields)
1466
1468 """Queries information from all locks, returning old-style data.
1469
1470 See L{LockMonitor.OldStyleQueryLocks}.
1471
1472 """
1473 return self._monitor.OldStyleQueryLocks(fields)
1474
1476 """List the lock names at the given level.
1477
1478 This can be used for debugging/testing purposes.
1479
1480 @param level: the level whose list of locks to get
1481
1482 """
1483 assert level in LEVELS, "Invalid locking level %s" % level
1484 return self.__keyring[level]._names()
1485
1487 """Check whether we are owning locks at the given level
1488
1489 """
1490 return self.__keyring[level]._is_owned()
1491
1492 is_owned = _is_owned
1493
1495 """Get the set of owned locks at the given level
1496
1497 """
1498 return self.__keyring[level]._list_owned()
1499
1500 list_owned = _list_owned
1501
1503 """Check whether we own any lock at a level greater than the given one.
1504
1505 """
1506
1507
1508 return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1509
1511 """Check if the current thread owns the BGL.
1512
1513 Both an exclusive or a shared acquisition work.
1514
1515 """
1516 return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1517
1518 @staticmethod
1520 """Check if the level contains the BGL.
1521
1522 Check if acting on the given level and set of names will change
1523 the status of the Big Ganeti Lock.
1524
1525 """
1526 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1527
def acquire(self, level, names, timeout=None, shared=0, priority=None):
  """Acquires a set of locks which all belong to one level.

  The request is checked against the locking rules (see the assertions
  below) and then handed to the lock set of the given level.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be acquired
  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names)
  @type timeout: float
  @param timeout: Maximum time to acquire all locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default
      an exclusive lock will be acquired
  @type priority: integer
  @param priority: Priority for acquiring lock
  @return: whatever the C{acquire} call of the level's lock set returns

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # Rule 1: the request must either target the BGL itself or be made while
  # the BGL is already held by the caller.
  assert (self._contains_BGL(level, names) or self._BGL_owned()), (
    "You must own the Big Ganeti Lock before acquiring any other")

  # Rule 2: nothing may be held at a greater level than the requested one.
  assert not self._upper_owned(level), (
    "Cannot acquire locks at a level while owning some at a greater one")

  # All checks passed, let the level's lock set do the actual work
  lockset = self.__keyring[level]
  return lockset.acquire(names, shared=shared, timeout=timeout,
                         priority=priority)
1563
1565 """Downgrade a set of resource locks from exclusive to shared mode.
1566
1567 You must have acquired the locks in exclusive mode.
1568
1569 @type level: member of locking.LEVELS
1570 @param level: the level at which the locks shall be downgraded
1571 @type names: list of strings, or None
1572 @param names: the names of the locks which shall be downgraded
1573 (defaults to all the locks acquired at the level)
1574
1575 """
1576 assert level in LEVELS, "Invalid locking level %s" % level
1577
1578 return self.__keyring[level].downgrade(names=names)
1579
def release(self, level, names=None):
  """Releases a set of locks which all belong to one level.

  The locks must have been acquired before, in either shared or exclusive
  mode.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be released
  @type names: list of strings, or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level)
  @return: whatever the C{release} call of the level's lock set returns

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # The BGL must not be given up while anything above the cluster level is
  # still held; the failure message lists what is owned at every level.
  assert not (self._contains_BGL(level, names) and
              self._upper_owned(LEVEL_CLUSTER)), (
    "Cannot release the Big Ganeti Lock while holding something"
    " at upper levels (%r)" %
    (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl], self._list_owned(lvl))
                      for lvl in self.__keyring]), ))

  # Checks passed, delegate to the level's lock set
  lockset = self.__keyring[level]
  return lockset.release(names)
1603
def add(self, level, names, acquired=0, shared=0):
  """Adds new locks to the lock set of the given level.

  The caller must hold the BGL and must not own any lock at a level greater
  than the one being modified.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be added
  @type names: list of strings
  @param names: names of the locks to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: whether to acquire the newly added locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether the acquisition will be shared
  @return: whatever the C{add} call of the level's lock set returns

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
  assert self._BGL_owned(), (
    "You must own the BGL before performing other operations")
  assert not self._upper_owned(level), (
    "Cannot add locks at a level while owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.add(names, acquired=acquired, shared=shared)
1623
def remove(self, level, names):
  """Removes locks from the lock set of the given level.

  The caller must hold the BGL. In addition it must either own locks at the
  given level already, or not own anything at a greater level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be removed
  @type names: list of strings
  @param names: the names of the locks which shall be removed
      (special lock names, or instance/node names)
  @return: whatever the C{remove} call of the level's lock set returns

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
  assert self._BGL_owned(), (
    "You must own the BGL before performing other operations")

  # Owning something at this level makes the removal acceptable even when
  # greater levels are held; otherwise no greater level may be owned.
  assert self._is_owned(level) or not self._upper_owned(level), (
    "Cannot remove locks at a level while not owning it or"
    " owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.remove(names)
1647
1650 """Sorting key function.
1651
1652 Sort by name, registration order and then order of information. This provides
1653 a stable sort order over different providers, even if they return the same
1654 name.
1655
1656 """
1657 (name, _, _, _) = item
1658
1659 return (utils.NiceSortKey(name), num, idx)
1660
1663 _LOCK_ATTR = "_lock"
1664
1666 """Initializes this class.
1667
1668 """
1669 self._lock = SharedLock("LockMonitor")
1670
1671
1672 self._counter = itertools.count(0)
1673
1674
1675
1676 self._locks = weakref.WeakKeyDictionary()
1677
1678 @ssynchronized(_LOCK_ATTR)
1680 """Registers a new lock.
1681
1682 @param provider: Object with a callable method named C{GetLockInfo}, taking
1683 a single C{set} containing the requested information items
1684 @note: It would be nicer to only receive the function generating the
1685 requested information but, as it turns out, weak references to bound
1686 methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1687 workarounds, but none of the ones I found works properly in combination
1688 with a standard C{WeakKeyDictionary}
1689
1690 """
1691 assert provider not in self._locks, "Duplicate registration"
1692
1693
1694
1695
1696
1697
1698
1699 self._locks[provider] = self._counter.next()
1700
1702 """Get information from all locks.
1703
1704 """
1705
1706 self._lock.acquire(shared=1)
1707 try:
1708 items = self._locks.items()
1709 finally:
1710 self._lock.release()
1711
1712 return [(info, idx, num)
1713 for (provider, num) in items
1714 for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1715
1732
1734 """Queries information from all locks.
1735
1736 @type fields: list of strings
1737 @param fields: List of fields to return
1738
1739 """
1740 (qobj, ctx) = self._Query(fields)
1741
1742
1743 return query.GetQueryResponse(qobj, ctx)
1744
1746 """Queries information from all locks, returning old-style data.
1747
1748 @type fields: list of strings
1749 @param fields: List of fields to return
1750
1751 """
1752 (qobj, ctx) = self._Query(fields)
1753
1754 return qobj.OldStyleQuery(ctx)
1755