1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module implementing the Ganeti locking code."""
22
23
24
25
26
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33
34 from ganeti import errors
35 from ganeti import utils
36 from ganeti import compat
40 """Shared Synchronization decorator.
41
42 Calls the function holding the given lock, either in exclusive or shared
43 mode. It requires the passed lock to be a SharedLock (or support its
44 semantics).
45
46 """
47 def wrap(fn):
48 def sync_function(*args, **kwargs):
49 lock.acquire(shared=shared)
50 try:
51 return fn(*args, **kwargs)
52 finally:
53 lock.release()
54 return sync_function
55 return wrap
56
class RunningTimeout(object):
    """Class to calculate remaining timeout when doing several operations.

    The clock is started lazily: the start time is taken on the first call
    to L{Remaining}, not when the object is constructed.

    """
    __slots__ = [
        "_allow_negative",
        "_start_time",
        "_time_fn",
        "_timeout",
        ]

    def __init__(self, timeout, allow_negative, _time_fn=time.time):
        """Initializes this class.

        @type timeout: float
        @param timeout: Timeout duration (None means "no timeout")
        @type allow_negative: bool
        @param allow_negative: Whether to return values below zero
        @param _time_fn: Time function for unittests
        @raise ValueError: if C{timeout} is a negative number

        """
        object.__init__(self)

        if timeout is not None and timeout < 0.0:
            raise ValueError("Timeout must not be negative")

        self._timeout = timeout
        self._allow_negative = allow_negative
        self._time_fn = _time_fn

        # Filled in by the first call to Remaining()
        self._start_time = None

    def Remaining(self):
        """Returns the remaining timeout.

        @rtype: float or None
        @return: None if no timeout was configured; otherwise the number of
            seconds left, clamped to 0.0 unless negative values were allowed

        """
        if self._timeout is None:
            return None

        # Get start time on first calculation
        if self._start_time is None:
            self._start_time = self._time_fn()

        # Calculate remaining time
        remaining_timeout = (self._start_time + self._timeout -
                             self._time_fn())

        if not self._allow_negative:
            # Ensure timeout is always >= 0
            return max(0.0, remaining_timeout)

        return remaining_timeout
109
class _SingleNotifyPipeConditionWaiter(object):
    """Helper class for SingleNotifyPipeCondition

    Instances are callable; calling one blocks on the poller until the
    watched file descriptor reports an event or the timeout runs out.

    """
    __slots__ = [
        "_fd",
        "_poller",
        ]

    def __init__(self, poller, fd):
        """Constructor for _SingleNotifyPipeConditionWaiter

        @type poller: select.poll
        @param poller: Poller object
        @type fd: int
        @param fd: File descriptor to wait for

        """
        object.__init__(self)
        self._poller = poller
        self._fd = fd

    def __call__(self, timeout):
        """Wait for something to happen on the pipe.

        @type timeout: float or None
        @param timeout: Timeout for waiting (can be None)

        """
        # Negative values are allowed so that an expired timeout can be
        # told apart from "exactly zero left"
        running_timeout = RunningTimeout(timeout, True)

        while True:
            remaining_time = running_timeout.Remaining()

            if remaining_time is not None:
                if remaining_time < 0.0:
                    break

                # Our calculation uses seconds, poll() wants milliseconds
                remaining_time *= 1000

            try:
                result = self._poller.poll(remaining_time)
            except EnvironmentError as err:
                # An interrupted system call is not an error; loop again
                # with whatever time is left
                if err.errno != errno.EINTR:
                    raise
                result = None

            # Check whether we were notified
            if result and result[0][0] == self._fd:
                break
162
165 """Base class containing common code for conditions.
166
167 Some of this code is taken from python's threading module.
168
169 """
170 __slots__ = [
171 "_lock",
172 "acquire",
173 "release",
174 ]
175
177 """Constructor for _BaseCondition.
178
179 @type lock: threading.Lock
180 @param lock: condition base lock
181
182 """
183 object.__init__(self)
184
185
186 assert not hasattr(lock, "_acquire_restore")
187 assert not hasattr(lock, "_release_save")
188
189 self._lock = lock
190
191
192 self.acquire = lock.acquire
193 self.release = lock.release
194
196 """Check whether lock is owned by current thread.
197
198 """
199 if self._lock.acquire(0):
200 self._lock.release()
201 return False
202
203 return True
204
206 """Raise an exception if the current thread doesn't own the lock.
207
208 """
209 if not self._is_owned():
210 raise RuntimeError("cannot work with un-aquired lock")
211
class SingleNotifyPipeCondition(_BaseCondition):
    """Condition which can only be notified once.

    This condition class uses pipes and poll, internally, to be able to wait
    for notification with a timeout, without resorting to polling. It is
    almost compatible with Python's threading.Condition, with the following
    differences:
      - notifyAll can only be called once, and no wait can happen after that
      - notify is not supported, only notifyAll

    """

    __slots__ = [
        "_poller",
        "_read_fd",
        "_write_fd",
        "_nwaiters",
        "_notified",
        ]

    _waiter_class = _SingleNotifyPipeConditionWaiter

    def __init__(self, lock):
        """Constructor for SingleNotifyPipeCondition

        @type lock: threading.Lock
        @param lock: condition base lock

        """
        _BaseCondition.__init__(self, lock)
        self._nwaiters = 0
        self._notified = False
        # The pipe and the poller are created by the first waiter
        self._read_fd = None
        self._write_fd = None
        self._poller = None

    def _check_unnotified(self):
        """Throws an exception if already notified.

        @raise RuntimeError: if notifyAll() has been called before

        """
        if self._notified:
            raise RuntimeError("cannot use already notified condition")

    def _Cleanup(self):
        """Cleanup open file descriptors, if any.

        """
        if self._read_fd is not None:
            os.close(self._read_fd)
            self._read_fd = None

        if self._write_fd is not None:
            os.close(self._write_fd)
            self._write_fd = None
        self._poller = None

    def wait(self, timeout=None):
        """Wait for a notification.

        The base lock must be held by the caller; it is released while
        waiting and re-acquired before returning.

        @type timeout: float or None
        @param timeout: Waiting timeout (can be None)
        @raise RuntimeError: if the lock is not held or the condition was
            already notified

        """
        self._check_owned()
        self._check_unnotified()

        self._nwaiters += 1
        try:
            if self._poller is None:
                (self._read_fd, self._write_fd) = os.pipe()
                self._poller = select.poll()
                # Closing the write end (see notifyAll) shows up as a
                # hang-up on the read end
                self._poller.register(self._read_fd, select.POLLHUP)

            wait_fn = self._waiter_class(self._poller, self._read_fd)
            self.release()
            try:
                # Wait for notification
                wait_fn(timeout)
            finally:
                # Re-acquire lock
                self.acquire()
        finally:
            self._nwaiters -= 1
            # The last waiter to leave closes the pipe
            if self._nwaiters == 0:
                self._Cleanup()

    def notifyAll(self):
        """Close the writing side of the pipe to notify all waiters.

        @raise RuntimeError: if the lock is not held or the condition was
            already notified

        """
        self._check_owned()
        self._check_unnotified()
        self._notified = True
        if self._write_fd is not None:
            os.close(self._write_fd)
            self._write_fd = None
305
class PipeCondition(_BaseCondition):
    """Group-only non-polling condition with counters.

    This condition class uses pipes and poll, internally, to be able to wait
    for notification with a timeout, without resorting to polling. It is
    almost compatible with Python's threading.Condition, but only supports
    notifyAll and non-recursive locks. As an additional feature it's able to
    report whether there are any waiting threads.

    """
    __slots__ = [
        "_nwaiters",
        "_single_condition",
        ]

    _single_condition_class = SingleNotifyPipeCondition

    def __init__(self, lock):
        """Initializes this class.

        @type lock: threading.Lock
        @param lock: condition base lock

        """
        _BaseCondition.__init__(self, lock)
        self._nwaiters = 0
        self._single_condition = self._single_condition_class(self._lock)

    def wait(self, timeout=None):
        """Wait for a notification.

        @type timeout: float or None
        @param timeout: Waiting timeout (can be None)
        @raise RuntimeError: if the base lock is not held

        """
        self._check_owned()

        # Keep a local reference to the single-use condition: notifyAll()
        # replaces self._single_condition, possibly while we are waiting
        my_condition = self._single_condition

        assert self._nwaiters >= 0
        self._nwaiters += 1
        try:
            my_condition.wait(timeout)
        finally:
            assert self._nwaiters > 0
            self._nwaiters -= 1

    def notifyAll(self):
        """Notify all currently waiting threads.

        The current single-use condition is notified and then swapped for a
        fresh one, so later waiters can be notified again.

        @raise RuntimeError: if the base lock is not held

        """
        self._check_owned()
        self._single_condition.notifyAll()
        self._single_condition = self._single_condition_class(self._lock)

    def has_waiting(self):
        """Returns whether there are active waiters.

        @rtype: bool
        @raise RuntimeError: if the base lock is not held

        """
        self._check_owned()

        return bool(self._nwaiters)
368
371 """Wrapper for Python's built-in threading.Condition class.
372
373 This wrapper keeps a count of active waiters. We can't access the internal
374 "__waiters" attribute of threading.Condition because it's not thread-safe.
375
376 """
377 __slots__ = [
378 "_cond",
379 "_nwaiters",
380 ]
381
383 """Initializes this class.
384
385 """
386 object.__init__(self)
387 self._cond = threading.Condition(lock=lock)
388 self._nwaiters = 0
389
391 """Notifies the condition.
392
393 """
394 return self._cond.notifyAll()
395
396 - def wait(self, timeout=None):
397 """Waits for the condition to be notified.
398
399 @type timeout: float or None
400 @param timeout: Waiting timeout (can be None)
401
402 """
403 assert self._nwaiters >= 0
404
405 self._nwaiters += 1
406 try:
407 return self._cond.wait(timeout=timeout)
408 finally:
409 self._nwaiters -= 1
410
412 """Returns whether there are active waiters.
413
414 """
415 return bool(self._nwaiters)
416
419 """Implements a shared lock.
420
421 Multiple threads can acquire the lock in a shared way, calling
422 acquire_shared(). In order to acquire the lock in an exclusive way threads
423 can call acquire_exclusive().
424
425 The lock prevents starvation but does not guarantee that threads will acquire
426 the shared lock in the order they queued for it, just that they will
427 eventually do so.
428
429 """
430 __slots__ = [
431 "__active_shr_c",
432 "__inactive_shr_c",
433 "__deleted",
434 "__exc",
435 "__lock",
436 "__pending",
437 "__shr",
438 ]
439
440 __condition_class = PipeCondition
441
443 """Construct a new SharedLock.
444
445 """
446 object.__init__(self)
447
448
449 self.__lock = threading.Lock()
450
451
452 self.__pending = []
453
454
455 self.__active_shr_c = self.__condition_class(self.__lock)
456 self.__inactive_shr_c = self.__condition_class(self.__lock)
457
458
459 self.__shr = set()
460 self.__exc = None
461
462
463 self.__deleted = False
464
466 """Raises an exception if the lock has been deleted.
467
468 """
469 if self.__deleted:
470 raise errors.LockError("Deleted lock")
471
473 """Is the current thread sharing the lock at this time?
474
475 """
476 return threading.currentThread() in self.__shr
477
479 """Is the current thread holding the lock exclusively at this time?
480
481 """
482 return threading.currentThread() == self.__exc
483
485 """Is the current thread somehow owning the lock at this time?
486
487 This is a private version of the function, which presumes you're holding
488 the internal lock.
489
490 """
491 if shared < 0:
492 return self.__is_sharer() or self.__is_exclusive()
493 elif shared:
494 return self.__is_sharer()
495 else:
496 return self.__is_exclusive()
497
499 """Is the current thread somehow owning the lock at this time?
500
501 @param shared:
502 - < 0: check for any type of ownership (default)
503 - 0: check for exclusive ownership
504 - > 0: check for shared ownership
505
506 """
507 self.__lock.acquire()
508 try:
509 return self.__is_owned(shared=shared)
510 finally:
511 self.__lock.release()
512
514 """Returns the number of pending acquires.
515
516 @rtype: int
517
518 """
519 self.__lock.acquire()
520 try:
521 return len(self.__pending)
522 finally:
523 self.__lock.release()
524
526 """Actually acquire the lock.
527
528 """
529 if shared:
530 self.__shr.add(threading.currentThread())
531 else:
532 self.__exc = threading.currentThread()
533
535 """Determine whether lock can be acquired.
536
537 """
538 if shared:
539 return self.__exc is None
540 else:
541 return len(self.__shr) == 0 and self.__exc is None
542
544 """Checks whether the passed condition is on top of the queue.
545
546 The caller must make sure the queue isn't empty.
547
548 """
549 return self.__pending[0] == cond
550
552 """Acquire a shared lock.
553
554 @param shared: whether to acquire in shared mode; by default an
555 exclusive lock will be acquired
556 @param timeout: maximum waiting time before giving up
557
558 """
559 self.__check_deleted()
560
561
562 assert not self.__is_owned(), "double acquire() on a non-recursive lock"
563
564
565 if not self.__pending and self.__can_acquire(shared):
566
567 self.__do_acquire(shared)
568 return True
569
570 if shared:
571 wait_condition = self.__active_shr_c
572
573
574 if wait_condition not in self.__pending:
575 self.__pending.append(wait_condition)
576 else:
577 wait_condition = self.__condition_class(self.__lock)
578
579 self.__pending.append(wait_condition)
580
581 try:
582
583
584 while not (self.__is_on_top(wait_condition) and
585 self.__can_acquire(shared)):
586
587 wait_condition.wait(timeout)
588 self.__check_deleted()
589
590
591
592 if timeout is not None:
593 break
594
595 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
596 self.__do_acquire(shared)
597 return True
598 finally:
599
600 if not wait_condition.has_waiting() and not self.__deleted:
601 self.__pending.remove(wait_condition)
602
603 return False
604
def acquire(self, shared=0, timeout=None, test_notify=None):
    """Acquire a shared lock.

    Takes the internal lock, runs the actual acquisition logic and releases
    the internal lock again, whatever the outcome.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @return: whatever the unlocked acquisition helper returns (True when
        the lock was obtained, False otherwise)

    """
    internal = self.__lock
    internal.acquire()
    try:
        # The internal lock is ours at this point; tell the test hook
        if __debug__:
            if callable(test_notify):
                test_notify()

        outcome = self.__acquire_unlocked(shared, timeout)
    finally:
        internal.release()

    return outcome
626
628 """Release a Shared Lock.
629
630 You must have acquired the lock, either in shared or in exclusive mode,
631 before calling this function.
632
633 """
634 self.__lock.acquire()
635 try:
636 assert self.__is_exclusive() or self.__is_sharer(), \
637 "Cannot release non-owned lock"
638
639
640 if self.__is_exclusive():
641 self.__exc = None
642 else:
643 self.__shr.remove(threading.currentThread())
644
645
646 if self.__pending:
647 first_condition = self.__pending[0]
648 first_condition.notifyAll()
649
650 if first_condition == self.__active_shr_c:
651 self.__active_shr_c = self.__inactive_shr_c
652 self.__inactive_shr_c = first_condition
653
654 finally:
655 self.__lock.release()
656
def delete(self, timeout=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will
    be acquired in exclusive mode if you don't already own it, then the
    lock will be put in a state where any future and pending acquire()
    fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @rtype: bool
    @return: True if the lock was deleted, False if it could not be
        acquired exclusively within the timeout
    @raise errors.LockError: if the lock has already been deleted

    """
    self.__lock.acquire()
    try:
        assert not self.__is_sharer(), \
            "Cannot delete() a lock while sharing it"

        self.__check_deleted()

        # The caller is allowed to hold the lock exclusively already
        acquired = self.__is_exclusive()

        if not acquired:
            acquired = self.__acquire_unlocked(0, timeout)

        if acquired:
            # Only check ownership once the acquisition succeeded; after a
            # timeout we legitimately do not own the lock and must simply
            # report failure instead of tripping the assertion
            assert self.__is_exclusive() and not self.__is_sharer(), \
                "Lock wasn't acquired in exclusive mode"

            self.__deleted = True
            self.__exc = None

            # Wake up every pending acquire; each re-checks the deleted
            # flag after waking and fails with an error
            while self.__pending:
                self.__pending.pop().notifyAll()

        return acquired
    finally:
        self.__lock.release()
694
695
696
697
698 ALL_SET = None
702 """Internal exception to abort an acquire on a timeout.
703
704 """
705
708 """Implements a set of locks.
709
710 This abstraction implements a set of shared locks for the same resource type,
711 distinguished by name. The user can lock a subset of the resources and the
712 LockSet will take care of acquiring the locks always in the same order, thus
713 preventing deadlock.
714
715 All the locks needed in the same set must be acquired together, though.
716
717 """
719 """Constructs a new LockSet.
720
721 @type members: list of strings
722 @param members: initial members of the set
723
724 """
725
726 self.__lock = SharedLock()
727
728
729
730 self.__lockdict = {}
731
732 if members is not None:
733 for name in members:
734 self.__lockdict[name] = SharedLock()
735
736
737
738
739
740
741
742
743 self.__owners = {}
744
746 """Is the current thread a current level owner?"""
747 return threading.currentThread() in self.__owners
748
750 """Note the current thread owns the given lock"""
751 if name is None:
752 if not self._is_owned():
753 self.__owners[threading.currentThread()] = set()
754 else:
755 if self._is_owned():
756 self.__owners[threading.currentThread()].add(name)
757 else:
758 self.__owners[threading.currentThread()] = set([name])
759
761 """Note the current thread owns the given lock"""
762
763 assert not (name is None and self.__lock._is_owned()), \
764 "Cannot hold internal lock when deleting owner status"
765
766 if name is not None:
767 self.__owners[threading.currentThread()].remove(name)
768
769
770 if (not self.__lock._is_owned() and
771 not self.__owners[threading.currentThread()]):
772 del self.__owners[threading.currentThread()]
773
775 """Get the set of resource names owned by the current thread"""
776 if self._is_owned():
777 return self.__owners[threading.currentThread()].copy()
778 else:
779 return set()
780
782 """Release and delete all resources owned by the current thread"""
783 for lname in self._list_owned():
784 lock = self.__lockdict[lname]
785 if lock._is_owned():
786 lock.release()
787 self._del_owned(name=lname)
788
790 """Return the current set of names.
791
792 Only call this function while holding __lock and don't iterate on the
793 result after releasing the lock.
794
795 """
796 return self.__lockdict.keys()
797
799 """Return a copy of the current set of elements.
800
801 Used only for debugging purposes.
802
803 """
804
805
806 release_lock = False
807 if not self.__lock._is_owned():
808 release_lock = True
809 self.__lock.acquire(shared=1)
810 try:
811 result = self.__names()
812 finally:
813 if release_lock:
814 self.__lock.release()
815 return set(result)
816
def acquire(self, names, timeout=None, shared=0, test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None requests every
        lock currently in the set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of
        timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Locks of one set have to be taken in a single call
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"

    # The timeout covers the whole operation, so the time already spent is
    # tracked across the individual acquisitions
    budget = RunningTimeout(timeout, False)

    try:
        if names is None:
            # Whole-set acquisition: hold the set-level lock first, then
            # take every lock that exists at that point
            got_set_lock = self.__lock.acquire(shared=shared,
                                               timeout=budget.Remaining())
            if not got_set_lock:
                raise _AcquireTimeout()

            try:
                # Record ownership of the set-level lock
                self._add_owned()

                return self.__acquire_inner(self.__names(), True, shared,
                                            budget.Remaining, test_notify)
            except:
                # Undo the set-level acquisition before passing the
                # exception on
                self.__lock.release()
                self._del_owned()
                raise

        # A single name may be passed instead of a list
        if isinstance(names, basestring):
            names = [names]

        return self.__acquire_inner(names, False, shared,
                                    budget.Remaining, test_notify)

    except _AcquireTimeout:
        return None
885
def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: set with the names of the locks that were acquired
    @raise errors.LockError: if a requested lock does not exist (anymore)
        and C{want_all} is not set
    @raise _AcquireTimeout: if a lock could not be obtained in time

    """
    wanted = []

    # Resolve names to lock objects first. Sorting gives every caller the
    # same acquisition order; duplicates are dropped.
    for lname in sorted(utils.UniqueSequence(names)):
        try:
            wanted.append((lname, self.__lockdict[lname]))
        except KeyError:
            # When taking the whole set a vanished entry is simply skipped
            if not want_all:
                raise errors.LockError("Non-existing lock in set (%s)" %
                                       lname)

    use_notify = __debug__ and callable(test_notify)

    # Names of the locks obtained so far
    held = set()

    try:
        for (lname, lock) in wanted:
            notify_cb = None
            if use_notify:
                notify_cb = lambda: test_notify(lname)

            timeout = timeout_fn()

            try:
                # Fails with LockError if the lock was deleted meanwhile
                acq_success = lock.acquire(shared=shared, timeout=timeout,
                                           test_notify=notify_cb)
            except errors.LockError:
                if want_all:
                    # Whole-set acquisition tolerates deleted locks
                    continue

                raise errors.LockError("Non-existing lock in set (%s)" %
                                       lname)

            if not acq_success:
                if timeout is None:
                    # Not expected: without a timeout the acquire blocks
                    raise errors.LockError("Failed to get lock %s" % lname)

                raise _AcquireTimeout()

            try:
                # We hold the lock now; book-keep it
                self._add_owned(name=lname)
                held.add(lname)
            except:
                # Book-keeping failed; give the lock back and re-raise
                if lock._is_owned():
                    lock.release()
                raise

    except:
        # Roll back everything acquired so far
        self._release_and_delete_owned()
        raise

    return held
972
974 """Release a set of resource locks, at the same level.
975
976 You must have acquired the locks, either in shared or in exclusive mode,
977 before releasing them.
978
979 @type names: list of strings, or None
980 @param names: the names of the locks which shall be released
981 (defaults to all the locks acquired at that level).
982
983 """
984 assert self._is_owned(), "release() on lock set while not owner"
985
986
987 if isinstance(names, basestring):
988 names = [names]
989
990 if names is None:
991 names = self._list_owned()
992 else:
993 names = set(names)
994 assert self._list_owned().issuperset(names), (
995 "release() on unheld resources %s" %
996 names.difference(self._list_owned()))
997
998
999
1000 if self.__lock._is_owned():
1001 self.__lock.release()
1002 self._del_owned()
1003
1004 for lockname in names:
1005
1006
1007 self.__lockdict[lockname].release()
1008 self._del_owned(name=lockname)
1009
def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Either nothing is owned, or the set-level lock is held exclusively
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
        "Cannot add locks if the set is only partially owned, or shared"

    # A single name may be passed instead of a list
    if isinstance(names, basestring):
        names = [names]

    # Take the set-level lock ourselves unless the caller already has it,
    # and remember whether it has to be given back afterwards
    must_release = not self.__lock._is_owned()
    if must_release:
        self.__lock.acquire()

    try:
        invalid_names = set(self.__names()).intersection(names)
        if invalid_names:
            # Refuse the whole request when any name is already present
            raise errors.LockError("duplicate add() (%s)" % invalid_names)

        for lockname in names:
            # Not yet reachable through the set, so nobody else can hold it
            new_lock = SharedLock()

            if acquired:
                new_lock.acquire(shared=shared)

                try:
                    self._add_owned(name=lockname)
                except:
                    # Book-keeping failed; release the fresh lock again and
                    # pass the exception on
                    new_lock.release()
                    raise

            self.__lockdict[lockname] = new_lock

    finally:
        # Only release the set-level lock if it was taken here
        if must_release:
            self.__lock.release()

    return True
1070
1072 """Remove elements from the lock set.
1073
1074 You can either not hold anything in the lockset or already hold a superset
1075 of the elements you want to delete, exclusively.
1076
1077 @type names: list of strings
1078 @param names: names of the resource to remove.
1079
1080 @return: a list of locks which we removed; the list is always
1081 equal to the names list if we were holding all the locks
1082 exclusively
1083
1084 """
1085
1086 if isinstance(names, basestring):
1087 names = [names]
1088
1089
1090
1091
1092 assert not self._is_owned() or self._list_owned().issuperset(names), (
1093 "remove() on acquired lockset while not owning all elements")
1094
1095 removed = []
1096
1097 for lname in names:
1098
1099
1100
1101
1102
1103 try:
1104 self.__lockdict[lname].delete()
1105 removed.append(lname)
1106 except (KeyError, errors.LockError):
1107
1108 assert not self._is_owned(), "remove failed while holding lockset"
1109 else:
1110
1111
1112
1113
1114
1115
1116
1117 del self.__lockdict[lname]
1118
1119 if self._is_owned():
1120 self._del_owned(name=lname)
1121
1122 return removed
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
# Locking levels
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODE = 2

# All levels, in ascending order
LEVELS = [LEVEL_CLUSTER,
          LEVEL_INSTANCE,
          LEVEL_NODE]

# Levels at which locks may be added and removed
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]

# Human-readable level names
LEVEL_NAMES = {
    LEVEL_CLUSTER: "cluster",
    LEVEL_INSTANCE: "instance",
    LEVEL_NODE: "node",
    }

# Name of the Big Ganeti Lock
BGL = 'BGL'
1156 """The Ganeti Locking Library
1157
1158 The purpose of this small library is to manage locking for ganeti clusters
1159 in a central place, while at the same time doing dynamic checks against
1160 possible deadlocks. It will also make it easier to transition to a different
1161 lock type should we migrate away from python threads.
1162
1163 """
1164 _instance = None
1165
def __init__(self, nodes=None, instances=None):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param nodes: list of node names
    @param instances: list of instance names

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiLockManager instance"

    # Remember the one and only instance on the class
    cls._instance = self

    # One LockSet per locking level; the cluster level only holds the BGL
    cluster_locks = LockSet([BGL])
    node_locks = LockSet(nodes)
    instance_locks = LockSet(instances)

    self.__keyring = {
        LEVEL_CLUSTER: cluster_locks,
        LEVEL_NODE: node_locks,
        LEVEL_INSTANCE: instance_locks,
        }
1188
1190 """List the lock names at the given level.
1191
1192 This can be used for debugging/testing purposes.
1193
1194 @param level: the level whose list of locks to get
1195
1196 """
1197 assert level in LEVELS, "Invalid locking level %s" % level
1198 return self.__keyring[level]._names()
1199
1201 """Check whether we are owning locks at the given level
1202
1203 """
1204 return self.__keyring[level]._is_owned()
1205
1206 is_owned = _is_owned
1207
1209 """Get the set of owned locks at the given level
1210
1211 """
1212 return self.__keyring[level]._list_owned()
1213
1215 """Check that we don't own any lock at a level greater than the given one.
1216
1217 """
1218
1219
1220 return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1221
1223 """Check if the current thread owns the BGL.
1224
1225 Both an exclusive or a shared acquisition work.
1226
1227 """
1228 return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1229
1230 @staticmethod
1232 """Check if the level contains the BGL.
1233
1234 Check if acting on the given level and set of names will change
1235 the status of the Big Ganeti Lock.
1236
1237 """
1238 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1239
def acquire(self, level, names, timeout=None, shared=0):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @return: the result of the level's LockSet.acquire() call

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Unless this very call is the one taking the BGL, the BGL has to be
    # held already
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
        "You must own the Big Ganeti Lock before acquiring any other")

    # Levels have to be taken in ascending order
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater"
                                          " one")

    # Hand over to the LockSet responsible for this level
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout)
1272
def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: the result of the level's LockSet.release() call

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # The BGL may only go once nothing above the cluster level is held
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
        "Cannot release the Big Ganeti Lock while holding something"
        " at upper levels (%r)" %
        (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                          for i in self.__keyring.keys()]), ))

    # Hand over to the LockSet responsible for this level
    lockset = self.__keyring[level]
    return lockset.release(names)
1296
def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: the result of the level's LockSet.add() call

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing"
                               " other operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater"
                                          " one")

    # Hand over to the LockSet responsible for this level
    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1316
def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: the result of the level's LockSet.remove() call

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing"
                               " other operations")

    # Owning locks at this level is fine (the LockSet checks the details);
    # otherwise nothing at a higher level may be held
    assert self._is_owned(level) or not self._upper_owned(level), (
        "Cannot remove locks at a level while not owning it or"
        " owning some at a greater one")

    # Hand over to the LockSet responsible for this level
    lockset = self.__keyring[level]
    return lockset.remove(names)
1340