1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module implementing the Ganeti locking code."""
22
23
24
25
26
27
28 import os
29 import select
30 import threading
31 import time
32 import errno
33 import weakref
34 import logging
35 import heapq
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import compat
40
41
42 _EXCLUSIVE_TEXT = "exclusive"
43 _SHARED_TEXT = "shared"
44
45 _DEFAULT_PRIORITY = 0
49 """Shared Synchronization decorator.
50
51 Calls the function holding the given lock, either in exclusive or shared
52 mode. It requires the passed lock to be a SharedLock (or support its
53 semantics).
54
55 @type mylock: lockable object or string
56 @param mylock: lock to acquire or class member name of the lock to acquire
57
58 """
59 def wrap(fn):
60 def sync_function(*args, **kwargs):
61 if isinstance(mylock, basestring):
62 assert args, "cannot ssynchronize on non-class method: self not found"
63
64 lock = getattr(args[0], mylock)
65 else:
66 lock = mylock
67 lock.acquire(shared=shared)
68 try:
69 return fn(*args, **kwargs)
70 finally:
71 lock.release()
72 return sync_function
73 return wrap
74
77 """Class to calculate remaining timeout when doing several operations.
78
79 """
80 __slots__ = [
81 "_allow_negative",
82 "_start_time",
83 "_time_fn",
84 "_timeout",
85 ]
86
def __init__(self, timeout, allow_negative, _time_fn=time.time):
    """Prepares a timeout tracker.

    The clock is not started here; the start time is recorded by the first
    call to L{Remaining} that has a timeout to track.

    @type timeout: float or None
    @param timeout: Timeout duration; with C{None} L{Remaining} always
        returns C{None}
    @type allow_negative: bool
    @param allow_negative: Whether L{Remaining} may return values below zero
    @param _time_fn: Time function for unittests
    @raise ValueError: If C{timeout} is negative

    """
    object.__init__(self)

    if timeout is not None:
        if timeout < 0.0:
            raise ValueError("Timeout must not be negative")

    # Filled in lazily by the first Remaining() call
    self._start_time = None

    self._time_fn = _time_fn
    self._allow_negative = allow_negative
    self._timeout = timeout
107
def Remaining(self):
    """Computes how much of the timeout is left.

    The first call with a timeout configured records the start time; later
    calls measure against that start time.

    @rtype: float or None
    @return: C{None} if no timeout was configured, otherwise the remaining
        time; the value is clamped to C{0.0} unless negative values were
        allowed at construction time

    """
    limit = self._timeout
    if limit is None:
        return None

    # Start the clock on first use
    started = self._start_time
    if started is None:
        started = self._start_time = self._time_fn()

    left = started + limit - self._time_fn()

    if self._allow_negative:
        return left

    # Callers not expecting negative values get zero once time is up
    return max(0.0, left)
127
130 """Helper class for SingleNotifyPipeCondition
131
132 """
133 __slots__ = [
134 "_fd",
135 "_poller",
136 ]
137
139 """Constructor for _SingleNotifyPipeConditionWaiter
140
141 @type poller: select.poll
142 @param poller: Poller object
143 @type fd: int
144 @param fd: File descriptor to wait for
145
146 """
147 object.__init__(self)
148 self._poller = poller
149 self._fd = fd
150
152 """Wait for something to happen on the pipe.
153
154 @type timeout: float or None
155 @param timeout: Timeout for waiting (can be None)
156
157 """
158 running_timeout = RunningTimeout(timeout, True)
159
160 while True:
161 remaining_time = running_timeout.Remaining()
162
163 if remaining_time is not None:
164 if remaining_time < 0.0:
165 break
166
167
168 remaining_time *= 1000
169
170 try:
171 result = self._poller.poll(remaining_time)
172 except EnvironmentError, err:
173 if err.errno != errno.EINTR:
174 raise
175 result = None
176
177
178 if result and result[0][0] == self._fd:
179 break
180
183 """Base class containing common code for conditions.
184
185 Some of this code is taken from python's threading module.
186
187 """
188 __slots__ = [
189 "_lock",
190 "acquire",
191 "release",
192 "_is_owned",
193 "_acquire_restore",
194 "_release_save",
195 ]
196
224
226 """Check whether lock is owned by current thread.
227
228 """
229 if self._lock.acquire(0):
230 self._lock.release()
231 return False
232 return True
233
236
239
241 """Raise an exception if the current thread doesn't own the lock.
242
243 """
244 if not self._is_owned():
245 raise RuntimeError("cannot work with un-aquired lock")
246
249 """Condition which can only be notified once.
250
251 This condition class uses pipes and poll, internally, to be able to wait for
252 notification with a timeout, without resorting to polling. It is almost
253 compatible with Python's threading.Condition, with the following differences:
254 - notifyAll can only be called once, and no wait can happen after that
255 - notify is not supported, only notifyAll
256
257 """
258
259 __slots__ = [
260 "_poller",
261 "_read_fd",
262 "_write_fd",
263 "_nwaiters",
264 "_notified",
265 ]
266
267 _waiter_class = _SingleNotifyPipeConditionWaiter
268
270 """Constructor for SingleNotifyPipeCondition
271
272 """
273 _BaseCondition.__init__(self, lock)
274 self._nwaiters = 0
275 self._notified = False
276 self._read_fd = None
277 self._write_fd = None
278 self._poller = None
279
281 """Throws an exception if already notified.
282
283 """
284 if self._notified:
285 raise RuntimeError("cannot use already notified condition")
286
288 """Cleanup open file descriptors, if any.
289
290 """
291 if self._read_fd is not None:
292 os.close(self._read_fd)
293 self._read_fd = None
294
295 if self._write_fd is not None:
296 os.close(self._write_fd)
297 self._write_fd = None
298 self._poller = None
299
def wait(self, timeout=None):
    """Wait for a notification.

    The caller must own the lock and the condition must not have been
    notified yet; both are checked up front.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    # Count this waiter so the file descriptors are only cleaned up once the
    # last waiter has left
    self._nwaiters += 1
    try:
        # The pipe and the poller are created lazily by the first waiter
        if self._poller is None:
            (self._read_fd, self._write_fd) = os.pipe()
            self._poller = select.poll()
            # notifyAll closes the write side of the pipe, hence POLLHUP
            self._poller.register(self._read_fd, select.POLLHUP)

        wait_fn = self._waiter_class(self._poller, self._read_fd)
        # NOTE(review): presumably releases the underlying lock while
        # waiting, as in threading.Condition -- helper not visible here
        state = self._release_save()
        try:
            # Wait for notification
            wait_fn(timeout)
        finally:
            # Re-acquire lock
            self._acquire_restore(state)
    finally:
        self._nwaiters -= 1
        # Last waiter out closes whatever descriptors are still open
        if self._nwaiters == 0:
            self._Cleanup()
329
331 """Close the writing side of the pipe to notify all waiters.
332
333 """
334 self._check_owned()
335 self._check_unnotified()
336 self._notified = True
337 if self._write_fd is not None:
338 os.close(self._write_fd)
339 self._write_fd = None
340
343 """Group-only non-polling condition with counters.
344
345 This condition class uses pipes and poll, internally, to be able to wait for
346 notification with a timeout, without resorting to polling. It is almost
347 compatible with Python's threading.Condition, but only supports notifyAll and
348 non-recursive locks. As an additional feature it's able to report whether
349 there are any waiting threads.
350
351 """
352 __slots__ = [
353 "_waiters",
354 "_single_condition",
355 ]
356
357 _single_condition_class = SingleNotifyPipeCondition
358
360 """Initializes this class.
361
362 """
363 _BaseCondition.__init__(self, lock)
364 self._waiters = set()
365 self._single_condition = self._single_condition_class(self._lock)
366
def wait(self, timeout=None):
    """Waits on the current single-use condition.

    The calling thread is listed among the waiters for the duration of the
    wait. The lock must be owned on entry and is checked again on exit.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # Hold on to the condition in use right now; notifyAll() installs a new
    # one on the instance after notifying
    pipe_cond = self._single_condition

    me = threading.currentThread()
    self._waiters.add(me)
    try:
        pipe_cond.wait(timeout)
    finally:
        self._check_owned()
        self._waiters.remove(me)
386
388 """Notify all currently waiting threads.
389
390 """
391 self._check_owned()
392 self._single_condition.notifyAll()
393 self._single_condition = self._single_condition_class(self._lock)
394
396 """Returns a list of all waiting threads.
397
398 """
399 self._check_owned()
400
401 return self._waiters
402
404 """Returns whether there are active waiters.
405
406 """
407 self._check_owned()
408
409 return bool(self._waiters)
410
413 __slots__ = [
414 "shared",
415 ]
416
418 """Initializes this class.
419
420 """
421 self.shared = shared
422 PipeCondition.__init__(self, lock)
423
426 """Implements a shared lock.
427
428 Multiple threads can acquire the lock in a shared way by calling
429 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
430 threads can call C{acquire(shared=0)}.
431
432 Notes on data structures: C{__pending} contains a priority queue (heapq) of
433 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434 ...]}. Each per-priority queue contains a normal in-order list of conditions
435 to be notified when the lock can be acquired. Shared locks are grouped
436 together by priority and the condition for them is stored in
437 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438 references for the per-priority queues indexed by priority for faster access.
439
440 @type name: string
441 @ivar name: the name of the lock
442
443 """
444 __slots__ = [
445 "__weakref__",
446 "__deleted",
447 "__exc",
448 "__lock",
449 "__pending",
450 "__pending_by_prio",
451 "__pending_shared",
452 "__shr",
453 "name",
454 ]
455
456 __condition_class = _PipeConditionWithMode
457
def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    The lock starts out unowned, without pending acquires and not deleted.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Not deleted, no exclusive owner, no sharers
    self.__deleted = False
    self.__exc = None
    self.__shr = set()

    # Pending acquires: heap of per-priority queues plus lookup tables
    self.__pending_shared = {}
    self.__pending_by_prio = {}
    self.__pending = []

    # Internal lock protecting the state above
    self.__lock = threading.Lock()

    # Registration happens last, once the lock is fully set up
    if monitor:
        monitor.RegisterLock(self)
488
490 """Retrieves information for querying locks.
491
492 @type fields: list of strings
493 @param fields: List of fields to return
494
495 """
496 self.__lock.acquire()
497 try:
498 info = []
499
500
501
502
503 for fname in fields:
504 if fname == "name":
505 info.append(self.name)
506 elif fname == "mode":
507 if self.__deleted:
508 info.append("deleted")
509 assert not (self.__exc or self.__shr)
510 elif self.__exc:
511 info.append(_EXCLUSIVE_TEXT)
512 elif self.__shr:
513 info.append(_SHARED_TEXT)
514 else:
515 info.append(None)
516 elif fname == "owner":
517 if self.__exc:
518 owner = [self.__exc]
519 else:
520 owner = self.__shr
521
522 if owner:
523 assert not self.__deleted
524 info.append([i.getName() for i in owner])
525 else:
526 info.append(None)
527 elif fname == "pending":
528 data = []
529
530
531 for (_, prioqueue) in sorted(self.__pending):
532 for cond in prioqueue:
533 if cond.shared:
534 mode = _SHARED_TEXT
535 else:
536 mode = _EXCLUSIVE_TEXT
537
538
539
540 data.append((mode, sorted(i.getName()
541 for i in cond.get_waiting())))
542
543 info.append(data)
544 else:
545 raise errors.OpExecError("Invalid query field '%s'" % fname)
546
547 return info
548 finally:
549 self.__lock.release()
550
552 """Raises an exception if the lock has been deleted.
553
554 """
555 if self.__deleted:
556 raise errors.LockError("Deleted lock %s" % self.name)
557
559 """Is the current thread sharing the lock at this time?
560
561 """
562 return threading.currentThread() in self.__shr
563
565 """Is the current thread holding the lock exclusively at this time?
566
567 """
568 return threading.currentThread() == self.__exc
569
571 """Is the current thread somehow owning the lock at this time?
572
573 This is a private version of the function, which presumes you're holding
574 the internal lock.
575
576 """
577 if shared < 0:
578 return self.__is_sharer() or self.__is_exclusive()
579 elif shared:
580 return self.__is_sharer()
581 else:
582 return self.__is_exclusive()
583
585 """Is the current thread somehow owning the lock at this time?
586
587 @param shared:
588 - < 0: check for any type of ownership (default)
589 - 0: check for exclusive ownership
590 - > 0: check for shared ownership
591
592 """
593 self.__lock.acquire()
594 try:
595 return self.__is_owned(shared=shared)
596 finally:
597 self.__lock.release()
598
600 """Returns the number of pending acquires.
601
602 @rtype: int
603
604 """
605 self.__lock.acquire()
606 try:
607 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
608 finally:
609 self.__lock.release()
610
612 """Checks whether there are any pending acquires.
613
614 @rtype: bool
615
616 """
617 self.__lock.acquire()
618 try:
619
620 return not (self.__find_first_pending_queue() or
621 self.__pending or
622 self.__pending_by_prio or
623 self.__pending_shared)
624 finally:
625 self.__lock.release()
626
628 """Actually acquire the lock.
629
630 """
631 if shared:
632 self.__shr.add(threading.currentThread())
633 else:
634 self.__exc = threading.currentThread()
635
637 """Determine whether lock can be acquired.
638
639 """
640 if shared:
641 return self.__exc is None
642 else:
643 return len(self.__shr) == 0 and self.__exc is None
644
646 """Tries to find the topmost queued entry with pending acquires.
647
648 Removes empty entries while going through the list.
649
650 """
651 while self.__pending:
652 (priority, prioqueue) = self.__pending[0]
653
654 if not prioqueue:
655 heapq.heappop(self.__pending)
656 del self.__pending_by_prio[priority]
657 assert priority not in self.__pending_shared
658 continue
659
660 if prioqueue:
661 return prioqueue
662
663 return None
664
666 """Checks whether the passed condition is on top of the queue.
667
668 The caller must make sure the queue isn't empty.
669
670 """
671 return cond == self.__find_first_pending_queue()[0]
672
674 """Acquire a shared lock.
675
676 @param shared: whether to acquire in shared mode; by default an
677 exclusive lock will be acquired
678 @param timeout: maximum waiting time before giving up
679 @type priority: integer
680 @param priority: Priority for acquiring lock
681
682 """
683 self.__check_deleted()
684
685
686 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
687 " %s" % self.name)
688
689
690 self.__find_first_pending_queue()
691
692
693 if not self.__pending and self.__can_acquire(shared):
694
695 self.__do_acquire(shared)
696 return True
697
698 prioqueue = self.__pending_by_prio.get(priority, None)
699
700 if shared:
701
702 wait_condition = self.__pending_shared.get(priority, None)
703 assert (wait_condition is None or
704 (wait_condition.shared and wait_condition in prioqueue))
705 else:
706 wait_condition = None
707
708 if wait_condition is None:
709 if prioqueue is None:
710 assert priority not in self.__pending_by_prio
711
712 prioqueue = []
713 heapq.heappush(self.__pending, (priority, prioqueue))
714 self.__pending_by_prio[priority] = prioqueue
715
716 wait_condition = self.__condition_class(self.__lock, shared)
717 prioqueue.append(wait_condition)
718
719 if shared:
720
721
722 assert priority not in self.__pending_shared
723 self.__pending_shared[priority] = wait_condition
724
725 try:
726
727
728
729 while not (self.__is_on_top(wait_condition) and
730 self.__can_acquire(shared)):
731
732 wait_condition.wait(timeout)
733 self.__check_deleted()
734
735
736
737 if timeout is not None:
738 break
739
740 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
741 self.__do_acquire(shared)
742 return True
743 finally:
744
745 if not wait_condition.has_waiting():
746 prioqueue.remove(wait_condition)
747 if wait_condition.shared:
748 del self.__pending_shared[priority]
749
750 return False
751
def acquire(self, shared=0, timeout=None, priority=None,
            test_notify=None):
    """Acquire a shared lock.

    Takes the internal lock and delegates to the unlocked implementation.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock; C{None} selects the
        module's default priority
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @return: Whatever the unlocked acquire implementation returns

    """
    if priority is None:
        priority = _DEFAULT_PRIORITY

    internal = self.__lock
    internal.acquire()
    try:
        # The internal lock is held at this point, tell the unittest hook
        if __debug__:
            if callable(test_notify):
                test_notify()

        return self.__acquire_unlocked(shared, timeout, priority)
    finally:
        internal.release()
779
781 """Release a Shared Lock.
782
783 You must have acquired the lock, either in shared or in exclusive mode,
784 before calling this function.
785
786 """
787 self.__lock.acquire()
788 try:
789 assert self.__is_exclusive() or self.__is_sharer(), \
790 "Cannot release non-owned lock"
791
792
793 if self.__is_exclusive():
794 self.__exc = None
795 else:
796 self.__shr.remove(threading.currentThread())
797
798
799 prioqueue = self.__find_first_pending_queue()
800 if prioqueue:
801 prioqueue[0].notifyAll()
802
803 finally:
804 self.__lock.release()
805
def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock; C{None} selects the
        module's default priority
    @return: Whether the lock was acquired exclusively and hence marked as
        deleted; a false value means the acquire gave up (e.g. on timeout)
        and the lock was left untouched
    @raise errors.LockError: If the lock has already been deleted

    """
    if priority is None:
        priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
        assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

        self.__check_deleted()

        # The caller is allowed to hold the lock exclusively already
        acquired = self.__is_exclusive()

        if not acquired:
            acquired = self.__acquire_unlocked(0, timeout, priority)

        if acquired:
            # Ownership can only be verified after a successful acquire. If
            # the acquire timed out nobody promised us the lock, and the
            # caller must get a plain False instead of an AssertionError.
            assert self.__is_exclusive() and not self.__is_sharer(), \
                "Lock wasn't acquired in exclusive mode"

            self.__deleted = True
            self.__exc = None

            assert not (self.__exc or self.__shr), "Found owner during deletion"

            # Wake up all pending acquires; they re-check the deleted flag
            # after waking up and fail with a LockError
            for (_, prioqueue) in self.__pending:
                for cond in prioqueue:
                    cond.notifyAll()

            assert self.__deleted

        return acquired
    finally:
        self.__lock.release()
853
858
861
862
863
864
865 ALL_SET = None
869 """Internal exception to abort an acquire on a timeout.
870
871 """
872
875 """Implements a set of locks.
876
877 This abstraction implements a set of shared locks for the same resource type,
878 distinguished by name. The user can lock a subset of the resources and the
879 LockSet will take care of acquiring the locks always in the same order, thus
880 preventing deadlock.
881
882 All the locks needed in the same set must be acquired together, though.
883
884 @type name: string
885 @ivar name: the name of the lockset
886
887 """
def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @param name: the name of the lockset; member lock names are derived
        from it
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"

    self.name = name
    self.__monitor = monitor

    # Set-level lock; note that it is created without a monitor
    self.__lock = SharedLock(name)

    # Member name -> lock; every member lock is handed the monitor
    self.__lockdict = dict((mname,
                            SharedLock(self._GetLockName(mname),
                                       monitor=monitor))
                           for mname in members)

    # Thread -> set of names that thread currently owns in this set
    self.__owners = {}
922
924 """Returns the name for a member lock.
925
926 """
927 return "%s/%s" % (self.name, mname)
928
930 """Is the current thread a current level owner?"""
931 return threading.currentThread() in self.__owners
932
934 """Note the current thread owns the given lock"""
935 if name is None:
936 if not self._is_owned():
937 self.__owners[threading.currentThread()] = set()
938 else:
939 if self._is_owned():
940 self.__owners[threading.currentThread()].add(name)
941 else:
942 self.__owners[threading.currentThread()] = set([name])
943
945 """Note the current thread no longer owns the given lock"""
946
947 assert not (name is None and self.__lock._is_owned()), \
948 "Cannot hold internal lock when deleting owner status"
949
950 if name is not None:
951 self.__owners[threading.currentThread()].remove(name)
952
953
954 if (not self.__lock._is_owned() and
955 not self.__owners[threading.currentThread()]):
956 del self.__owners[threading.currentThread()]
957
959 """Get the set of resource names owned by the current thread"""
960 if self._is_owned():
961 return self.__owners[threading.currentThread()].copy()
962 else:
963 return set()
964
966 """Release and delete all resources owned by the current thread"""
967 for lname in self._list_owned():
968 lock = self.__lockdict[lname]
969 if lock._is_owned():
970 lock.release()
971 self._del_owned(name=lname)
972
974 """Return the current set of names.
975
976 Only call this function while holding __lock and don't iterate on the
977 result after releasing the lock.
978
979 """
980 return self.__lockdict.keys()
981
983 """Return a copy of the current set of elements.
984
985 Used only for debugging purposes.
986
987 """
988
989
990 release_lock = False
991 if not self.__lock._is_owned():
992 release_lock = True
993 self.__lock.acquire(shared=1)
994 try:
995 result = self.__names()
996 finally:
997 if release_lock:
998 self.__lock.release()
999 return set(result)
1000
def acquire(self, names, timeout=None, shared=0, priority=None,
            test_notify=None):
    """Acquire a set of resource locks.

    With C{names} set to C{None} the whole set is acquired: the set-level
    lock is taken first and, on success, stays held together with all
    member locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
        priority = _DEFAULT_PRIORITY

    # The timeout covers all lock acquires made by this call, so a single
    # running timeout is shared between them
    running_timeout = RunningTimeout(timeout, False)

    try:
        if names is not None:
            # Support passing in a single resource to acquire rather than many
            if isinstance(names, basestring):
                names = [names]

            return self.__acquire_inner(names, False, shared, priority,
                                        running_timeout.Remaining, test_notify)

        else:
            # No names given: acquire the whole set. Taking the set-level lock
            # first is what add() contends with, see its use of self.__lock.
            if not self.__lock.acquire(shared=shared, priority=priority,
                                       timeout=running_timeout.Remaining()):
                raise _AcquireTimeout()
            try:
                # note we own the set-lock
                self._add_owned()

                return self.__acquire_inner(self.__names(), True, shared, priority,
                                            running_timeout.Remaining, test_notify)
            except:
                # Any failure, including the internal timeout exception, must
                # undo the set-level acquisition before being re-raised
                self.__lock.release()
                self._del_owned()
                raise

    except _AcquireTimeout:
        # Timeouts are reported to the caller as None, not as an exception
        return None
1076
def __acquire_inner(self, names, want_all, shared, priority,
                    timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting
    @return: Set of the names which were acquired
    @raise errors.LockError: If a requested lock doesn't exist (anymore) and
        C{want_all} is not set, or if a blocking acquire fails
    @raise _AcquireTimeout: If a lock couldn't be acquired within the timeout

    """
    acquire_list = []

    # Look up all locks first so that missing names are detected before
    # anything is acquired. The names are de-duplicated and sorted, giving
    # every caller the same acquisition order.
    for lname in sorted(utils.UniqueSequence(names)):
        try:
            lock = self.__lockdict[lname]
        except KeyError:
            if want_all:
                # When acquiring the whole set an element which has vanished
                # in the meantime is simply skipped
                continue

            raise errors.LockError("Non-existing lock %s in set %s" %
                                   (lname, self.name))

        acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
        # The locks may have been deleted since the lookup above; acquire()
        # on a deleted lock raises LockError, which is handled below
        for (lname, lock) in acquire_list:
            if __debug__ and callable(test_notify):
                # Only called from within lock.acquire() below, i.e. during
                # this very iteration
                test_notify_fn = lambda: test_notify(lname)
            else:
                test_notify_fn = None

            # Remaining time for this particular lock
            timeout = timeout_fn()

            try:
                # raises LockError if the lock was deleted
                acq_success = lock.acquire(shared=shared, timeout=timeout,
                                           priority=priority,
                                           test_notify=test_notify_fn)
            except errors.LockError:
                if want_all:
                    # We are acquiring the whole set, a deleted element
                    # doesn't matter
                    continue

                raise errors.LockError("Non-existing lock %s in set %s" %
                                       (lname, self.name))

            if not acq_success:
                # Couldn't get lock or timeout occurred
                if timeout is None:
                    # Without a timeout the acquire is not expected to
                    # return a false value, so treat this as an error
                    raise errors.LockError("Failed to get lock %s (set %s)" %
                                           (lname, self.name))

                raise _AcquireTimeout()

            try:
                # now the lock cannot be deleted, we have it!
                self._add_owned(name=lname)
                acquired.add(lname)

            except:
                # Bookkeeping failed: give the lock back (if we do own it)
                # and let the exception propagate
                if lock._is_owned():
                    lock.release()
                raise

    except:
        # On any failure release everything acquired so far, then re-raise
        self._release_and_delete_owned()
        raise

    return acquired
1169
1171 """Release a set of resource locks, at the same level.
1172
1173 You must have acquired the locks, either in shared or in exclusive mode,
1174 before releasing them.
1175
1176 @type names: list of strings, or None
1177 @param names: the names of the locks which shall be released
1178 (defaults to all the locks acquired at that level).
1179
1180 """
1181 assert self._is_owned(), ("release() on lock set %s while not owner" %
1182 self.name)
1183
1184
1185 if isinstance(names, basestring):
1186 names = [names]
1187
1188 if names is None:
1189 names = self._list_owned()
1190 else:
1191 names = set(names)
1192 assert self._list_owned().issuperset(names), (
1193 "release() on unheld resources %s (set %s)" %
1194 (names.difference(self._list_owned()), self.name))
1195
1196
1197
1198 if self.__lock._is_owned():
1199 self.__lock.release()
1200 self._del_owned()
1201
1202 for lockname in names:
1203
1204
1205 self.__lockdict[lockname].release()
1206 self._del_owned(name=lockname)
1207
def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: Always C{True}
    @raise errors.LockError: If any of the names already exists in the set

    """
    # Either nothing in this set is owned by the caller, or the set-level
    # lock is held exclusively
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
        ("Cannot add locks if the set %s is only partially owned, or shared" %
         self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
        names = [names]

    # If the set-level lock isn't owned yet, take it (exclusively, the
    # default mode) and remember to release it again afterwards
    release_lock = False
    if not self.__lock._is_owned():
        release_lock = True
        self.__lock.acquire()

    try:
        invalid_names = set(self.__names()).intersection(names)
        if invalid_names:
            # Deliberately an explicit raise and not an assert: asserts are
            # disabled when running optimized
            raise errors.LockError("duplicate add(%s) on lockset %s" %
                                   (invalid_names, self.name))

        for lockname in names:
            lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

            if acquired:
                # The lock was created just above and isn't part of the set
                # yet, hence no timeout or priority is passed
                lock.acquire(shared=shared)
                # now the lock cannot be deleted, we have it!
                try:
                    self._add_owned(name=lockname)
                except:
                    # Bookkeeping failed: release the new lock again and
                    # re-raise; the lock is not added to the set
                    lock.release()
                    raise

            self.__lockdict[lockname] = lock

    finally:
        # Only release __lock if we were not holding it previously.
        if release_lock:
            self.__lock.release()

    return True
1272
1274 """Remove elements from the lock set.
1275
1276 You can either not hold anything in the lockset or already hold a superset
1277 of the elements you want to delete, exclusively.
1278
1279 @type names: list of strings
1280 @param names: names of the resource to remove.
1281
1282 @return: a list of locks which we removed; the list is always
1283 equal to the names list if we were holding all the locks
1284 exclusively
1285
1286 """
1287
1288 if isinstance(names, basestring):
1289 names = [names]
1290
1291
1292
1293
1294 assert not self._is_owned() or self._list_owned().issuperset(names), (
1295 "remove() on acquired lockset %s while not owning all elements" %
1296 self.name)
1297
1298 removed = []
1299
1300 for lname in names:
1301
1302
1303
1304
1305
1306 try:
1307 self.__lockdict[lname].delete()
1308 removed.append(lname)
1309 except (KeyError, errors.LockError):
1310
1311 assert not self._is_owned(), ("remove failed while holding lockset %s"
1312 % self.name)
1313 else:
1314
1315
1316
1317
1318
1319
1320
1321 del self.__lockdict[lname]
1322
1323 if self._is_owned():
1324 self._del_owned(name=lname)
1325
1326 return removed
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338 LEVEL_CLUSTER = 0
1339 LEVEL_INSTANCE = 1
1340 LEVEL_NODE = 2
1341
1342 LEVELS = [LEVEL_CLUSTER,
1343 LEVEL_INSTANCE,
1344 LEVEL_NODE]
1345
1346
1347 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1348
1349 LEVEL_NAMES = {
1350 LEVEL_CLUSTER: "cluster",
1351 LEVEL_INSTANCE: "instance",
1352 LEVEL_NODE: "node",
1353 }
1354
1355
1356 BGL = 'BGL'
1360 """The Ganeti Locking Library
1361
1362 The purpose of this small library is to manage locking for ganeti clusters
1363 in a central place, while at the same time doing dynamic checks against
1364 possible deadlocks. It will also make it easier to transition to a different
1365 lock type should we migrate away from python threads.
1366
1367 """
1368 _instance = None
1369
1371 """Constructs a new GanetiLockManager object.
1372
1373 There should be only a GanetiLockManager object at any time, so this
1374 function raises an error if this is not the case.
1375
1376 @param nodes: list of node names
1377 @param instances: list of instance names
1378
1379 """
1380 assert self.__class__._instance is None, \
1381 "double GanetiLockManager instance"
1382
1383 self.__class__._instance = self
1384
1385 self._monitor = LockMonitor()
1386
1387
1388
1389 self.__keyring = {
1390 LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1391 LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1392 LEVEL_INSTANCE: LockSet(instances, "instances",
1393 monitor=self._monitor),
1394 }
1395
1397 """Queries information from all locks.
1398
1399 See L{LockMonitor.QueryLocks}.
1400
1401 """
1402 return self._monitor.QueryLocks(fields, sync)
1403
1405 """List the lock names at the given level.
1406
1407 This can be used for debugging/testing purposes.
1408
1409 @param level: the level whose list of locks to get
1410
1411 """
1412 assert level in LEVELS, "Invalid locking level %s" % level
1413 return self.__keyring[level]._names()
1414
1416 """Check whether we are owning locks at the given level
1417
1418 """
1419 return self.__keyring[level]._is_owned()
1420
1421 is_owned = _is_owned
1422
1424 """Get the set of owned locks at the given level
1425
1426 """
1427 return self.__keyring[level]._list_owned()
1428
1430 """Check that we don't own any lock at a level greater than the given one.
1431
1432 """
1433
1434
1435 return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1436
1438 """Check if the current thread owns the BGL.
1439
1440 Both an exclusive or a shared acquisition work.
1441
1442 """
1443 return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1444
1445 @staticmethod
1447 """Check if the level contains the BGL.
1448
1449 Check if acting on the given level and set of names will change
1450 the status of the Big Ganeti Lock.
1451
1452 """
1453 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1454
def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    Before delegating to the level's lock set, assertions check that the
    level is valid, that the Big Ganeti Lock is either being acquired by
    this call or already owned, and that nothing is owned at a later level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: The result of the lock set's acquire call

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Either this call acquires the BGL itself or the BGL is held already
    assert self._contains_BGL(level, names) or self._BGL_owned(), \
        "You must own the Big Ganeti Lock before acquiring any other"

    # Levels have to be acquired in order
    assert not self._upper_owned(level), \
        "Cannot acquire locks at a level while owning some at a greater one"

    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)
1490
def release(self, level, names=None):
    """Releases a group of locks which all belong to the same level.

    The locks must have been acquired before, in either shared or
    exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level the locks are released at
    @type names: list of strings, or None
    @param names: the names of the locks to release
        (defaults to all the locks acquired at that level)
    @return: the result of the C{release} call on the level's lock set

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # The Big Ganeti Lock may only go away once nothing is held above
    # the cluster level; on failure the message lists what is owned at
    # every level.
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
        "Cannot release the Big Ganeti Lock while holding something"
        " at upper levels (%r)" %
        (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl], self._list_owned(lvl))
                          for lvl in self.__keyring]), ))

    lockset = self.__keyring[level]
    return lockset.release(names)
1514
def add(self, level, names, acquired=0, shared=0):
    """Adds new locks to the given level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level the locks are added to
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: the result of the C{add} call on the level's lock set

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before"
                               " performing other operations")
    assert not self._upper_owned(level), \
        "Cannot add locks at a level while owning some at a greater one"

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1534
def remove(self, level, names):
    """Removes locks from the given level.

    The caller must either own locks at this level already or not own
    any lock at a greater level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level the locks are removed from
    @type names: list of strings
    @param names: the names of the locks to remove
        (special lock names, or instance/node names)
    @return: the result of the C{remove} call on the level's lock set

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before"
                               " performing other operations")

    # Removal is allowed while holding locks at this level, or else
    # only when nothing is held at any greater level.
    assert self._is_owned(level) or not self._upper_owned(level), (
        "Cannot remove locks at a level while not owning it or owning"
        " some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1558
1561 _LOCK_ATTR = "_lock"
1562
def __init__(self):
    """Sets up a monitor which does not track any lock yet.

    """
    # Tracked locks are the keys of a weak-key dictionary, so an entry
    # by itself does not keep its lock object alive.
    self._locks = weakref.WeakKeyDictionary()

    # Stored under the attribute name given by _LOCK_ATTR, which the
    # synchronization decorators on this class refer to.
    self._lock = SharedLock("LockMonitor")
1572
@ssynchronized(_LOCK_ATTR)
def RegisterLock(self, lock):
    """Adds a lock to the set of tracked locks.

    @param lock: the lock object to track; its C{name} attribute is
        logged and checked for uniqueness

    """
    logging.debug("Registering lock %s", lock.name)

    # Neither the object itself nor its name may be known already
    assert lock not in self._locks, "Duplicate lock registration"
    assert not compat.any(lock.name == known.name
                          for known in self._locks.keys()), \
        "Found duplicate lock name"

    # Only the key matters, the value is a placeholder
    self._locks[lock] = None
1583
@ssynchronized(_LOCK_ATTR)
def _GetLockInfo(self, fields):
    """Collects information from every tracked lock.

    Runs with the monitor lock held (see the decorator).

    @param fields: passed to each lock's C{GetInfo}
    @return: dictionary mapping each lock's name to its C{GetInfo} result

    """
    info = {}

    # keys() is kept so the loop walks a list of the tracked locks
    for tracked in self._locks.keys():
        lock_name = tracked.name
        assert lock_name not in info, "Found duplicate lock name"
        info[lock_name] = tracked.GetInfo(fields)

    return info
1596
def QueryLocks(self, fields, sync):
    """Returns information about all tracked locks, ordered by lock name.

    @type fields: list of strings
    @param fields: List of fields to return
    @type sync: boolean
    @param sync: Whether to operate in synchronous mode; only a false
        value is supported
    @return: list with one entry per lock, in the order given by
        C{utils.NiceSort} over the lock names
    @raise NotImplementedError: if C{sync} is true

    """
    if sync:
        raise NotImplementedError("Synchronous queries are not implemented")

    # Gather everything first, keyed by lock name and in no particular order
    info_by_name = self._GetLockInfo(fields)

    # Then emit the entries following the sorted names
    sorted_names = utils.NiceSort(info_by_name.keys())
    return [info_by_name[name] for name in sorted_names]
1614