1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module implementing the Ganeti locking code."""
22
23
24
25
26
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33 import weakref
34 import logging
35
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39
40
41 _EXCLUSIVE_TEXT = "exclusive"
42 _SHARED_TEXT = "shared"
46 """Shared Synchronization decorator.
47
48 Calls the function holding the given lock, either in exclusive or shared
49 mode. It requires the passed lock to be a SharedLock (or support its
50 semantics).
51
52 @type mylock: lockable object or string
53 @param mylock: lock to acquire or class member name of the lock to acquire
54
55 """
56 def wrap(fn):
57 def sync_function(*args, **kwargs):
58 if isinstance(mylock, basestring):
59 assert args, "cannot ssynchronize on non-class method: self not found"
60
61 lock = getattr(args[0], mylock)
62 else:
63 lock = mylock
64 lock.acquire(shared=shared)
65 try:
66 return fn(*args, **kwargs)
67 finally:
68 lock.release()
69 return sync_function
70 return wrap
71
74 """Class to calculate remaining timeout when doing several operations.
75
76 """
77 __slots__ = [
78 "_allow_negative",
79 "_start_time",
80 "_time_fn",
81 "_timeout",
82 ]
83
def __init__(self, timeout, allow_negative, _time_fn=time.time):
  """Prepares a tracker for a timeout shared by several operations.

  @type timeout: float or None
  @param timeout: Overall timeout duration; C{None} means no timeout
  @type allow_negative: bool
  @param allow_negative: Whether values below zero may be reported once the
      timeout has been exceeded
  @param _time_fn: Function returning the current time (replaceable in
      unittests)
  @raise ValueError: If a negative timeout is passed

  """
  object.__init__(self)

  is_negative = timeout is not None and timeout < 0.0
  if is_negative:
    raise ValueError("Timeout must not be negative")

  # No start time is recorded at construction
  self._start_time = None

  self._time_fn = _time_fn
  self._allow_negative = allow_negative
  self._timeout = timeout
104
def Remaining(self):
  """Computes how much of the timeout is left.

  The first call records the start time; later calls measure against it.

  @rtype: float or None
  @return: C{None} if no timeout was configured, otherwise the remaining
      time (clamped to zero unless negative values were allowed)

  """
  total = self._timeout
  if total is None:
    return None

  # Start the clock on first use
  if self._start_time is None:
    self._start_time = self._time_fn()

  left = self._start_time + total - self._time_fn()

  if self._allow_negative:
    return left

  # Callers not prepared for negative values get zero once time is up
  return max(0.0, left)
124
127 """Helper class for SingleNotifyPipeCondition
128
129 """
130 __slots__ = [
131 "_fd",
132 "_poller",
133 ]
134
136 """Constructor for _SingleNotifyPipeConditionWaiter
137
138 @type poller: select.poll
139 @param poller: Poller object
140 @type fd: int
141 @param fd: File descriptor to wait for
142
143 """
144 object.__init__(self)
145 self._poller = poller
146 self._fd = fd
147
149 """Wait for something to happen on the pipe.
150
151 @type timeout: float or None
152 @param timeout: Timeout for waiting (can be None)
153
154 """
155 running_timeout = RunningTimeout(timeout, True)
156
157 while True:
158 remaining_time = running_timeout.Remaining()
159
160 if remaining_time is not None:
161 if remaining_time < 0.0:
162 break
163
164
165 remaining_time *= 1000
166
167 try:
168 result = self._poller.poll(remaining_time)
169 except EnvironmentError, err:
170 if err.errno != errno.EINTR:
171 raise
172 result = None
173
174
175 if result and result[0][0] == self._fd:
176 break
177
180 """Base class containing common code for conditions.
181
182 Some of this code is taken from python's threading module.
183
184 """
185 __slots__ = [
186 "_lock",
187 "acquire",
188 "release",
189 "_is_owned",
190 "_acquire_restore",
191 "_release_save",
192 ]
193
221
223 """Check whether lock is owned by current thread.
224
225 """
226 if self._lock.acquire(0):
227 self._lock.release()
228 return False
229 return True
230
233
236
238 """Raise an exception if the current thread doesn't own the lock.
239
240 """
241 if not self._is_owned():
242 raise RuntimeError("cannot work with un-aquired lock")
243
246 """Condition which can only be notified once.
247
248 This condition class uses pipes and poll, internally, to be able to wait for
249 notification with a timeout, without resorting to polling. It is almost
250 compatible with Python's threading.Condition, with the following differences:
251 - notifyAll can only be called once, and no wait can happen after that
252 - notify is not supported, only notifyAll
253
254 """
255
256 __slots__ = [
257 "_poller",
258 "_read_fd",
259 "_write_fd",
260 "_nwaiters",
261 "_notified",
262 ]
263
264 _waiter_class = _SingleNotifyPipeConditionWaiter
265
267 """Constructor for SingleNotifyPipeCondition
268
269 """
270 _BaseCondition.__init__(self, lock)
271 self._nwaiters = 0
272 self._notified = False
273 self._read_fd = None
274 self._write_fd = None
275 self._poller = None
276
278 """Throws an exception if already notified.
279
280 """
281 if self._notified:
282 raise RuntimeError("cannot use already notified condition")
283
285 """Cleanup open file descriptors, if any.
286
287 """
288 if self._read_fd is not None:
289 os.close(self._read_fd)
290 self._read_fd = None
291
292 if self._write_fd is not None:
293 os.close(self._write_fd)
294 self._write_fd = None
295 self._poller = None
296
def wait(self, timeout=None):
  """Wait for a notification.

  The caller must own the underlying lock; it is released while waiting and
  re-acquired before this method returns. Waiting on a condition that has
  already been notified raises an error via C{_check_unnotified}.

  @type timeout: float or None
  @param timeout: Waiting timeout (can be None)

  """
  self._check_owned()
  self._check_unnotified()

  # Count this thread as a waiter; the last waiter to leave cleans up
  self._nwaiters += 1
  try:
    # The pipe and poller are created lazily by the first waiter
    if self._poller is None:
      (self._read_fd, self._write_fd) = os.pipe()
      self._poller = select.poll()
      # Only hang-up events on the read side are of interest
      self._poller.register(self._read_fd, select.POLLHUP)

    wait_fn = self._waiter_class(self._poller, self._read_fd)
    # Give up the lock while blocked so other threads can proceed
    state = self._release_save()
    try:
      # Wait for notification
      wait_fn(timeout)
    finally:
      # Re-acquire lock, even if waiting raised an exception
      self._acquire_restore(state)
  finally:
    self._nwaiters -= 1
    # Last waiter gone: close the file descriptors and drop the poller
    if self._nwaiters == 0:
      self._Cleanup()
326
328 """Close the writing side of the pipe to notify all waiters.
329
330 """
331 self._check_owned()
332 self._check_unnotified()
333 self._notified = True
334 if self._write_fd is not None:
335 os.close(self._write_fd)
336 self._write_fd = None
337
340 """Group-only non-polling condition with counters.
341
342 This condition class uses pipes and poll, internally, to be able to wait for
343 notification with a timeout, without resorting to polling. It is almost
344 compatible with Python's threading.Condition, but only supports notifyAll and
non-recursive locks. As an additional feature, it is able to report whether
346 there are any waiting threads.
347
348 """
349 __slots__ = [
350 "_waiters",
351 "_single_condition",
352 ]
353
354 _single_condition_class = SingleNotifyPipeCondition
355
357 """Initializes this class.
358
359 """
360 _BaseCondition.__init__(self, lock)
361 self._waiters = set()
362 self._single_condition = self._single_condition_class(self._lock)
363
def wait(self, timeout=None):
  """Blocks until the next notification or until the timeout expires.

  @type timeout: float or None
  @param timeout: Waiting timeout (can be None)

  """
  self._check_owned()

  me = threading.currentThread()

  # Keep a reference to the condition that is current on entry; the wait
  # below uses this reference rather than re-reading the attribute.
  current = self._single_condition

  self._waiters.add(me)
  try:
    current.wait(timeout)
  finally:
    # Check lock ownership again before removing this thread from the set
    self._check_owned()
    self._waiters.remove(me)
383
385 """Notify all currently waiting threads.
386
387 """
388 self._check_owned()
389 self._single_condition.notifyAll()
390 self._single_condition = self._single_condition_class(self._lock)
391
"""Returns the set of all waiting threads.
394
395 """
396 self._check_owned()
397
398 return self._waiters
399
401 """Returns whether there are active waiters.
402
403 """
404 self._check_owned()
405
406 return bool(self._waiters)
407
410 """Implements a shared lock.
411
412 Multiple threads can acquire the lock in a shared way by calling
413 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
414 threads can call C{acquire(shared=0)}.
415
416 The lock prevents starvation but does not guarantee that threads will acquire
417 the shared lock in the order they queued for it, just that they will
418 eventually do so.
419
420 @type name: string
421 @ivar name: the name of the lock
422
423 """
424 __slots__ = [
425 "__weakref__",
426 "__active_shr_c",
427 "__inactive_shr_c",
428 "__deleted",
429 "__exc",
430 "__lock",
431 "__pending",
432 "__shr",
433 "name",
434 ]
435
436 __condition_class = PipeCondition
437
def __init__(self, name, monitor=None):
  """Builds a new, unowned SharedLock.

  @param name: the name of the lock
  @type monitor: L{LockMonitor}
  @param monitor: Lock monitor with which to register

  """
  object.__init__(self)

  self.name = name

  # Internal lock protecting all of the state below
  internal_lock = threading.Lock()
  self.__lock = internal_lock

  # Current owners: set of sharers and the exclusive holder
  self.__shr = set()
  self.__exc = None

  # Is this lock in the deleted state?
  self.__deleted = False

  # Queue containing waiting acquires
  self.__pending = []

  # Active and inactive conditions for shared locks
  new_condition = self.__condition_class
  self.__active_shr_c = new_condition(internal_lock)
  self.__inactive_shr_c = new_condition(internal_lock)

  # Register with lock monitor
  if monitor:
    monitor.RegisterLock(self)
470
472 """Retrieves information for querying locks.
473
474 @type fields: list of strings
475 @param fields: List of fields to return
476
477 """
478 self.__lock.acquire()
479 try:
480 info = []
481
482
483
484
485 for fname in fields:
486 if fname == "name":
487 info.append(self.name)
488 elif fname == "mode":
489 if self.__deleted:
490 info.append("deleted")
491 assert not (self.__exc or self.__shr)
492 elif self.__exc:
493 info.append(_EXCLUSIVE_TEXT)
494 elif self.__shr:
495 info.append(_SHARED_TEXT)
496 else:
497 info.append(None)
498 elif fname == "owner":
499 if self.__exc:
500 owner = [self.__exc]
501 else:
502 owner = self.__shr
503
504 if owner:
505 assert not self.__deleted
506 info.append([i.getName() for i in owner])
507 else:
508 info.append(None)
509 elif fname == "pending":
510 data = []
511
512 for cond in self.__pending:
513 if cond in (self.__active_shr_c, self.__inactive_shr_c):
514 mode = _SHARED_TEXT
515 else:
516 mode = _EXCLUSIVE_TEXT
517
518
519
520 data.append((mode, sorted([i.getName()
521 for i in cond.get_waiting()])))
522
523 info.append(data)
524 else:
525 raise errors.OpExecError("Invalid query field '%s'" % fname)
526
527 return info
528 finally:
529 self.__lock.release()
530
532 """Raises an exception if the lock has been deleted.
533
534 """
535 if self.__deleted:
536 raise errors.LockError("Deleted lock %s" % self.name)
537
539 """Is the current thread sharing the lock at this time?
540
541 """
542 return threading.currentThread() in self.__shr
543
545 """Is the current thread holding the lock exclusively at this time?
546
547 """
548 return threading.currentThread() == self.__exc
549
551 """Is the current thread somehow owning the lock at this time?
552
553 This is a private version of the function, which presumes you're holding
554 the internal lock.
555
556 """
557 if shared < 0:
558 return self.__is_sharer() or self.__is_exclusive()
559 elif shared:
560 return self.__is_sharer()
561 else:
562 return self.__is_exclusive()
563
565 """Is the current thread somehow owning the lock at this time?
566
567 @param shared:
568 - < 0: check for any type of ownership (default)
569 - 0: check for exclusive ownership
570 - > 0: check for shared ownership
571
572 """
573 self.__lock.acquire()
574 try:
575 return self.__is_owned(shared=shared)
576 finally:
577 self.__lock.release()
578
580 """Returns the number of pending acquires.
581
582 @rtype: int
583
584 """
585 self.__lock.acquire()
586 try:
587 return len(self.__pending)
588 finally:
589 self.__lock.release()
590
592 """Actually acquire the lock.
593
594 """
595 if shared:
596 self.__shr.add(threading.currentThread())
597 else:
598 self.__exc = threading.currentThread()
599
601 """Determine whether lock can be acquired.
602
603 """
604 if shared:
605 return self.__exc is None
606 else:
607 return len(self.__shr) == 0 and self.__exc is None
608
610 """Checks whether the passed condition is on top of the queue.
611
612 The caller must make sure the queue isn't empty.
613
614 """
615 return self.__pending[0] == cond
616
618 """Acquire a shared lock.
619
620 @param shared: whether to acquire in shared mode; by default an
621 exclusive lock will be acquired
622 @param timeout: maximum waiting time before giving up
623
624 """
625 self.__check_deleted()
626
627
628 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
629 " %s" % self.name)
630
631
632 if not self.__pending and self.__can_acquire(shared):
633
634 self.__do_acquire(shared)
635 return True
636
637 if shared:
638 wait_condition = self.__active_shr_c
639
640
641 if wait_condition not in self.__pending:
642 self.__pending.append(wait_condition)
643 else:
644 wait_condition = self.__condition_class(self.__lock)
645
646 self.__pending.append(wait_condition)
647
648 try:
649
650
651 while not (self.__is_on_top(wait_condition) and
652 self.__can_acquire(shared)):
653
654 wait_condition.wait(timeout)
655 self.__check_deleted()
656
657
658
659 if timeout is not None:
660 break
661
662 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
663 self.__do_acquire(shared)
664 return True
665 finally:
666
667 if not wait_condition.has_waiting() and not self.__deleted:
668 self.__pending.remove(wait_condition)
669
670 return False
671
def acquire(self, shared=0, timeout=None, test_notify=None):
  """Acquires this lock in shared or exclusive mode.

  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
      exclusive lock will be acquired
  @type timeout: float
  @param timeout: maximum waiting time before giving up
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting
  @return: whatever the internal acquire logic reports

  """
  guard = self.__lock
  guard.acquire()
  try:
    # We already got the internal lock, notify now (unittests only)
    if __debug__:
      if callable(test_notify):
        test_notify()

    outcome = self.__acquire_unlocked(shared, timeout)
  finally:
    guard.release()

  return outcome
693
695 """Release a Shared Lock.
696
697 You must have acquired the lock, either in shared or in exclusive mode,
698 before calling this function.
699
700 """
701 self.__lock.acquire()
702 try:
703 assert self.__is_exclusive() or self.__is_sharer(), \
704 "Cannot release non-owned lock"
705
706
707 if self.__is_exclusive():
708 self.__exc = None
709 else:
710 self.__shr.remove(threading.currentThread())
711
712
713 if self.__pending:
714 first_condition = self.__pending[0]
715 first_condition.notifyAll()
716
717 if first_condition == self.__active_shr_c:
718 self.__active_shr_c = self.__inactive_shr_c
719 self.__inactive_shr_c = first_condition
720
721 finally:
722 self.__lock.release()
723
def delete(self, timeout=None):
  """Delete a Shared Lock.

  This operation will declare the lock for removal. First the lock will be
  acquired in exclusive mode if you don't already own it, then the lock
  will be put in a state where any future and pending acquire() fail.

  @type timeout: float
  @param timeout: maximum waiting time before giving up
  @rtype: bool
  @return: True if the lock was acquired and marked as deleted, False if
      the exclusive acquire did not succeed (e.g. the timeout expired), in
      which case the lock is left untouched

  """
  self.__lock.acquire()
  try:
    assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

    self.__check_deleted()

    # The caller is allowed to hold the lock before calling this function
    acquired = self.__is_exclusive()

    if not acquired:
      acquired = self.__acquire_unlocked(0, timeout)

    if acquired:
      # Only check ownership after a successful acquire; a timed-out
      # acquire legitimately leaves the lock unowned by this thread and
      # must result in a False return value, not an assertion failure.
      assert self.__is_exclusive() and not self.__is_sharer(), \
        "Lock wasn't acquired in exclusive mode"

      self.__deleted = True
      self.__exc = None

      assert not (self.__exc or self.__shr), "Found owner during deletion"

      # Wake up every queued acquire so it can notice the deletion
      while self.__pending:
        self.__pending.pop().notifyAll()

    return acquired
  finally:
    self.__lock.release()
763
768
771
772
773
774
775 ALL_SET = None
779 """Internal exception to abort an acquire on a timeout.
780
781 """
782
785 """Implements a set of locks.
786
787 This abstraction implements a set of shared locks for the same resource type,
788 distinguished by name. The user can lock a subset of the resources and the
789 LockSet will take care of acquiring the locks always in the same order, thus
790 preventing deadlock.
791
792 All the locks needed in the same set must be acquired together, though.
793
794 @type name: string
795 @ivar name: the name of the lockset
796
797 """
def __init__(self, members, name, monitor=None):
  """Builds a LockSet holding one SharedLock per initial member.

  @type members: list of strings
  @param members: initial members of the set
  @param name: the name of the lockset
  @type monitor: L{LockMonitor}
  @param monitor: Lock monitor with which to register member locks

  """
  assert members is not None, "members parameter is not a list"
  self.name = name

  # Lock monitor, also used for member locks created later
  self.__monitor = monitor

  # Per-thread bookkeeping: thread -> set of lock names it owns
  self.__owners = {}

  # Lock for the set as a whole
  self.__lock = SharedLock(name)

  # Member name -> lock; every member lock is registered with the monitor
  self.__lockdict = dict(
    (member, SharedLock(self._GetLockName(member), monitor=monitor))
    for member in members)
832
834 """Returns the name for a member lock.
835
836 """
837 return "%s/%s" % (self.name, mname)
838
840 """Is the current thread a current level owner?"""
841 return threading.currentThread() in self.__owners
842
844 """Note the current thread owns the given lock"""
845 if name is None:
846 if not self._is_owned():
847 self.__owners[threading.currentThread()] = set()
848 else:
849 if self._is_owned():
850 self.__owners[threading.currentThread()].add(name)
851 else:
852 self.__owners[threading.currentThread()] = set([name])
853
"""Note the current thread no longer owns the given lock"""
856
857 assert not (name is None and self.__lock._is_owned()), \
858 "Cannot hold internal lock when deleting owner status"
859
860 if name is not None:
861 self.__owners[threading.currentThread()].remove(name)
862
863
864 if (not self.__lock._is_owned() and
865 not self.__owners[threading.currentThread()]):
866 del self.__owners[threading.currentThread()]
867
869 """Get the set of resource names owned by the current thread"""
870 if self._is_owned():
871 return self.__owners[threading.currentThread()].copy()
872 else:
873 return set()
874
876 """Release and delete all resources owned by the current thread"""
877 for lname in self._list_owned():
878 lock = self.__lockdict[lname]
879 if lock._is_owned():
880 lock.release()
881 self._del_owned(name=lname)
882
884 """Return the current set of names.
885
886 Only call this function while holding __lock and don't iterate on the
887 result after releasing the lock.
888
889 """
890 return self.__lockdict.keys()
891
893 """Return a copy of the current set of elements.
894
895 Used only for debugging purposes.
896
897 """
898
899
900 release_lock = False
901 if not self.__lock._is_owned():
902 release_lock = True
903 self.__lock.acquire(shared=1)
904 try:
905 result = self.__names()
906 finally:
907 if release_lock:
908 self.__lock.release()
909 return set(result)
910
def acquire(self, names, timeout=None, shared=0, test_notify=None):
  """Acquire a set of resource locks.

  When C{names} is C{None} the whole set is acquired: the set-level lock is
  taken first and kept, then every member known at that moment is acquired.

  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names)
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default an
      exclusive lock will be acquired
  @type timeout: float or None
  @param timeout: Maximum time to acquire all locks
  @type test_notify: callable or None
  @param test_notify: Special callback function for unittesting

  @return: Set of all locks successfully acquired or None in case of timeout

  @raise errors.LockError: when any lock we try to acquire has
      been deleted before we succeed. In this case none of the
      locks requested will be acquired.

  """
  assert timeout is None or timeout >= 0.0

  # Check we don't already own locks at this level
  assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                " (lockset %s)" % self.name)

  # The timeout covers all acquires together, so the remaining time is
  # recomputed before each individual acquire.
  running_timeout = RunningTimeout(timeout, False)

  try:
    if names is not None:
      # Support passing in a single resource to acquire rather than many
      if isinstance(names, basestring):
        names = [names]

      return self.__acquire_inner(names, False, shared,
                                  running_timeout.Remaining, test_notify)

    else:
      # No names given: acquire the whole set. The set-level lock is taken
      # in the requested mode before the member names are listed; members
      # found to be gone afterwards are skipped by the inner logic
      # (want_all=True).
      if not self.__lock.acquire(shared=shared,
                                 timeout=running_timeout.Remaining()):
        raise _AcquireTimeout()
      try:
        # note we own the set-lock
        self._add_owned()

        return self.__acquire_inner(self.__names(), True, shared,
                                    running_timeout.Remaining, test_notify)
      except:
        # Anything going wrong (including a timeout) must not leave the
        # set-level lock held: release it, drop the ownership record and
        # re-raise.
        self.__lock.release()
        self._del_owned()
        raise

  except _AcquireTimeout:
    # Timeouts are reported to the caller as None instead of an exception
    return None
980
def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
  """Inner logic for acquiring a number of locks.

  @param names: Names of the locks to be acquired
  @param want_all: Whether all locks in the set should be acquired
  @param shared: Whether to acquire in shared mode
  @param timeout_fn: Function returning remaining timeout
  @param test_notify: Special callback function for unittesting
  @return: Set with the names of the locks actually acquired
  @raise errors.LockError: If a requested lock does not exist (anymore) and
      C{want_all} is not set, or if a blocking acquire fails
  @raise _AcquireTimeout: If a lock could not be acquired within the
      remaining timeout

  """
  acquire_list = []

  # Look all locks up first so that a wrong name fails before anything is
  # acquired. Names are de-duplicated and sorted, so locks are always
  # taken in the same order.
  for lname in sorted(utils.UniqueSequence(names)):
    try:
      lock = self.__lockdict[lname]
    except KeyError:
      if want_all:
        # We are acquiring the whole set, it doesn't matter if this
        # particular element is not there anymore.
        continue

      raise errors.LockError("Non-existing lock %s in set %s" %
                             (lname, self.name))

    acquire_list.append((lname, lock))

  # This will hold the lock names we effectively acquired
  acquired = set()

  try:
    # Acquire the locks one by one, in sorted order. A lock may have been
    # deleted since the lookup above; in that case its acquire() raises
    # LockError, which is handled below.
    for (lname, lock) in acquire_list:
      if __debug__ and callable(test_notify):
        test_notify_fn = lambda: test_notify(lname)
      else:
        test_notify_fn = None

      # Remaining time out of the overall timeout (None means blocking)
      timeout = timeout_fn()

      try:
        # raises LockError if the lock was deleted
        acq_success = lock.acquire(shared=shared, timeout=timeout,
                                   test_notify=test_notify_fn)
      except errors.LockError:
        if want_all:
          # We are acquiring the whole set, it doesn't matter if this
          # particular element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock %s in set %s" %
                               (lname, self.name))

      if not acq_success:
        # Couldn't get lock or timeout occurred
        if timeout is None:
          # Without a timeout the acquire is expected to block until it
          # succeeds, so a failure here is reported as an error.
          raise errors.LockError("Failed to get lock %s (set %s)" %
                                 (lname, self.name))

        raise _AcquireTimeout()

      try:
        # now the lock cannot be deleted, we have it!
        self._add_owned(name=lname)
        acquired.add(lname)

      except:
        # Recording the ownership failed; release this lock (if held)
        # before re-raising so it is not left dangling.
        if lock._is_owned():
          lock.release()
        raise

  except:
    # Roll back: release all locks owned so far, then re-raise
    self._release_and_delete_owned()
    raise

  return acquired
1070
1072 """Release a set of resource locks, at the same level.
1073
1074 You must have acquired the locks, either in shared or in exclusive mode,
1075 before releasing them.
1076
1077 @type names: list of strings, or None
1078 @param names: the names of the locks which shall be released
1079 (defaults to all the locks acquired at that level).
1080
1081 """
1082 assert self._is_owned(), ("release() on lock set %s while not owner" %
1083 self.name)
1084
1085
1086 if isinstance(names, basestring):
1087 names = [names]
1088
1089 if names is None:
1090 names = self._list_owned()
1091 else:
1092 names = set(names)
1093 assert self._list_owned().issuperset(names), (
1094 "release() on unheld resources %s (set %s)" %
1095 (names.difference(self._list_owned()), self.name))
1096
1097
1098
1099 if self.__lock._is_owned():
1100 self.__lock.release()
1101 self._del_owned()
1102
1103 for lockname in names:
1104
1105
1106 self.__lockdict[lockname].release()
1107 self._del_owned(name=lockname)
1108
def add(self, names, acquired=0, shared=0):
  """Add a new set of elements to the set

  @type names: list of strings
  @param names: names of the new elements to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: pre-acquire the new resource?
  @type shared: integer (0/1) used as a boolean
  @param shared: is the pre-acquisition shared?
  @return: True
  @raise errors.LockError: If any of the names is already in the set

  """
  # Check we don't already own locks at this level
  assert not self._is_owned() or self.__lock._is_owned(shared=0), \
    ("Cannot add locks if the set %s is only partially owned, or shared" %
     self.name)

  # Support passing in a single resource to add rather than many
  if isinstance(names, basestring):
    names = [names]

  # If we don't already hold the set-level lock, take it exclusively and
  # remember that it has to be released again at the end.
  release_lock = False
  if not self.__lock._is_owned():
    release_lock = True
    self.__lock.acquire()

  try:
    invalid_names = set(self.__names()).intersection(names)
    if invalid_names:
      # Explicit raise instead of an assert, so the check is not stripped
      # when running with optimization enabled.
      raise errors.LockError("duplicate add(%s) on lockset %s" %
                             (invalid_names, self.name))

    for lockname in names:
      lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

      if acquired:
        lock.acquire(shared=shared)
        # now the lock cannot be deleted, we have it!
        try:
          self._add_owned(name=lockname)
        except:
          # Recording the ownership failed. The lock has not been stored
          # in __lockdict yet (that only happens below); release it before
          # re-raising.
          lock.release()
          raise

      self.__lockdict[lockname] = lock

  finally:
    # Only release __lock if we were not holding it previously
    if release_lock:
      self.__lock.release()

  return True
1171
1173 """Remove elements from the lock set.
1174
1175 You can either not hold anything in the lockset or already hold a superset
1176 of the elements you want to delete, exclusively.
1177
1178 @type names: list of strings
1179 @param names: names of the resource to remove.
1180
1181 @return: a list of locks which we removed; the list is always
1182 equal to the names list if we were holding all the locks
1183 exclusively
1184
1185 """
1186
1187 if isinstance(names, basestring):
1188 names = [names]
1189
1190
1191
1192
1193 assert not self._is_owned() or self._list_owned().issuperset(names), (
1194 "remove() on acquired lockset %s while not owning all elements" %
1195 self.name)
1196
1197 removed = []
1198
1199 for lname in names:
1200
1201
1202
1203
1204
1205 try:
1206 self.__lockdict[lname].delete()
1207 removed.append(lname)
1208 except (KeyError, errors.LockError):
1209
1210 assert not self._is_owned(), ("remove failed while holding lockset %s"
1211 % self.name)
1212 else:
1213
1214
1215
1216
1217
1218
1219
1220 del self.__lockdict[lname]
1221
1222 if self._is_owned():
1223 self._del_owned(name=lname)
1224
1225 return removed
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237 LEVEL_CLUSTER = 0
1238 LEVEL_INSTANCE = 1
1239 LEVEL_NODE = 2
1240
1241 LEVELS = [LEVEL_CLUSTER,
1242 LEVEL_INSTANCE,
1243 LEVEL_NODE]
1244
1245
1246 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1247
1248 LEVEL_NAMES = {
1249 LEVEL_CLUSTER: "cluster",
1250 LEVEL_INSTANCE: "instance",
1251 LEVEL_NODE: "node",
1252 }
1253
1254
1255 BGL = 'BGL'
1259 """The Ganeti Locking Library
1260
1261 The purpose of this small library is to manage locking for ganeti clusters
1262 in a central place, while at the same time doing dynamic checks against
1263 possible deadlocks. It will also make it easier to transition to a different
1264 lock type should we migrate away from python threads.
1265
1266 """
1267 _instance = None
1268
def __init__(self, nodes=None, instances=None):
  """Creates the (single) GanetiLockManager object.

  Only one GanetiLockManager may exist at any time; creating a second one
  trips an assertion.

  @param nodes: list of node names
  @param instances: list of instance names

  """
  cls = self.__class__
  assert cls._instance is None, "double GanetiLockManager instance"
  cls._instance = self

  monitor = LockMonitor()
  self._monitor = monitor

  # One LockSet per locking level; the cluster level contains just the BGL
  keyring = {}
  keyring[LEVEL_CLUSTER] = LockSet([BGL], "BGL", monitor=monitor)
  keyring[LEVEL_NODE] = LockSet(nodes, "nodes", monitor=monitor)
  keyring[LEVEL_INSTANCE] = LockSet(instances, "instances", monitor=monitor)
  self.__keyring = keyring
1294
1296 """Queries information from all locks.
1297
1298 See L{LockMonitor.QueryLocks}.
1299
1300 """
1301 return self._monitor.QueryLocks(fields, sync)
1302
1304 """List the lock names at the given level.
1305
1306 This can be used for debugging/testing purposes.
1307
1308 @param level: the level whose list of locks to get
1309
1310 """
1311 assert level in LEVELS, "Invalid locking level %s" % level
1312 return self.__keyring[level]._names()
1313
1315 """Check whether we are owning locks at the given level
1316
1317 """
1318 return self.__keyring[level]._is_owned()
1319
1320 is_owned = _is_owned
1321
1323 """Get the set of owned locks at the given level
1324
1325 """
1326 return self.__keyring[level]._list_owned()
1327
"""Check whether we own any lock at a level greater than the given one.
1330
1331 """
1332
1333
1334 return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1335
1337 """Check if the current thread owns the BGL.
1338
1339 Both an exclusive or a shared acquisition work.
1340
1341 """
1342 return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1343
1344 @staticmethod
1346 """Check if the level contains the BGL.
1347
1348 Check if acting on the given level and set of names will change
1349 the status of the Big Ganeti Lock.
1350
1351 """
1352 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1353
def acquire(self, level, names, timeout=None, shared=0):
  """Acquires locks at one level, after checking the level ordering rules.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be acquired
  @type names: list of strings (or string)
  @param names: the names of the locks which shall be acquired
      (special lock names, or instance/node names)
  @type shared: integer (0/1) used as a boolean
  @param shared: whether to acquire in shared mode; by default
      an exclusive lock will be acquired
  @type timeout: float
  @param timeout: Maximum time to acquire all locks
  @return: result of the level's L{LockSet.acquire}

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # Unless this request is for the BGL itself, the BGL must be held already
  assert (self._contains_BGL(level, names) or self._BGL_owned()), (
    "You must own the Big Ganeti Lock before acquiring any other")

  # Check we don't own locks at the same or upper levels
  assert not self._upper_owned(level), (
    "Cannot acquire locks at a level while owning some at a greater one")

  # Acquire the locks in the set
  lockset = self.__keyring[level]
  return lockset.acquire(names, shared=shared, timeout=timeout)
1386
def release(self, level, names=None):
  """Releases locks previously acquired at the given level.

  The locks must have been acquired before, either in shared or in
  exclusive mode.

  @type level: member of locking.LEVELS
  @param level: the level at which the locks shall be released
  @type names: list of strings, or None
  @param names: the names of the locks which shall be released
      (defaults to all the locks acquired at that level)

  """
  assert level in LEVELS, "Invalid locking level %s" % level

  # The BGL may only be dropped once nothing above the cluster level is held
  assert not (self._contains_BGL(level, names) and
              self._upper_owned(LEVEL_CLUSTER)), (
    "Cannot release the Big Ganeti Lock while holding something"
    " at upper levels (%r)" %
    (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl], self._list_owned(lvl))
                      for lvl in self.__keyring.keys()]), ))

  # Release will complain if we don't own the locks already
  lockset = self.__keyring[level]
  return lockset.release(names)
1410
def add(self, level, names, acquired=0, shared=0):
  """Adds new locks to a modifiable level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be added
  @type names: list of strings
  @param names: names of the locks to add
  @type acquired: integer (0/1) used as a boolean
  @param acquired: whether to acquire the newly added locks
  @type shared: integer (0/1) used as a boolean
  @param shared: whether the acquisition will be shared
  @return: result of the level's L{LockSet.add}

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
  assert self._BGL_owned(), (
    "You must own the BGL before performing other operations")
  assert not self._upper_owned(level), (
    "Cannot add locks at a level while owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.add(names, acquired=acquired, shared=shared)
1430
def remove(self, level, names):
  """Removes locks from a modifiable level.

  You must either already own the locks you are trying to remove
  exclusively or not own any lock at an upper level.

  @type level: member of locking.LEVELS_MOD
  @param level: the level at which the locks shall be removed
  @type names: list of strings
  @param names: the names of the locks which shall be removed
      (special lock names, or instance/node names)
  @return: result of the level's L{LockSet.remove}

  """
  assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
  assert self._BGL_owned(), (
    "You must own the BGL before performing other operations")

  # Allowed when this level is already owned, or when nothing is owned at
  # any greater level
  assert self._is_owned(level) or not self._upper_owned(level), (
    "Cannot remove locks at a level while not owning it or"
    " owning some at a greater one")

  lockset = self.__keyring[level]
  return lockset.remove(names)
1454
1457 _LOCK_ATTR = "_lock"
1458
1460 """Initializes this class.
1461
1462 """
1463 self._lock = SharedLock("LockMonitor")
1464
1465
1466
1467 self._locks = weakref.WeakKeyDictionary()
1468
1469 @ssynchronized(_LOCK_ATTR)
1471 """Registers a new lock.
1472
1473 """
1474 logging.debug("Registering lock %s", lock.name)
1475 assert lock not in self._locks, "Duplicate lock registration"
1476 assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1477 "Found duplicate lock name"
1478 self._locks[lock] = None
1479
1480 @ssynchronized(_LOCK_ATTR)
1482 """Get information from all locks while the monitor lock is held.
1483
1484 """
1485 result = {}
1486
1487 for lock in self._locks.keys():
1488 assert lock.name not in result, "Found duplicate lock name"
1489 result[lock.name] = lock.GetInfo(fields)
1490
1491 return result
1492
1494 """Queries information from all locks.
1495
1496 @type fields: list of strings
1497 @param fields: List of fields to return
1498 @type sync: boolean
1499 @param sync: Whether to operate in synchronous mode
1500
1501 """
1502 if sync:
1503 raise NotImplementedError("Synchronous queries are not implemented")
1504
1505
1506 result = self._GetLockInfo(fields)
1507
1508
1509 return [result[name] for name in utils.NiceSort(result.keys())]
1510