1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module implementing the Ganeti locking code."""
22
23
24
25
26
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import operator
36 import itertools
37
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
42
43
# Mode strings reported for a lock when its state is queried: held
# exclusively, held shared, or already deleted.
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority substituted whenever acquire()/delete() receive priority=None.
_DEFAULT_PRIORITY = 0
52 """Shared Synchronization decorator.
53
54 Calls the function holding the given lock, either in exclusive or shared
55 mode. It requires the passed lock to be a SharedLock (or support its
56 semantics).
57
58 @type mylock: lockable object or string
59 @param mylock: lock to acquire or class member name of the lock to acquire
60
61 """
62 def wrap(fn):
63 def sync_function(*args, **kwargs):
64 if isinstance(mylock, basestring):
65 assert args, "cannot ssynchronize on non-class method: self not found"
66
67 lock = getattr(args[0], mylock)
68 else:
69 lock = mylock
70 lock.acquire(shared=shared)
71 try:
72 return fn(*args, **kwargs)
73 finally:
74 lock.release()
75 return sync_function
76 return wrap
77
80 """Helper class for SingleNotifyPipeCondition
81
82 """
83 __slots__ = [
84 "_fd",
85 "_poller",
86 ]
87
89 """Constructor for _SingleNotifyPipeConditionWaiter
90
91 @type poller: select.poll
92 @param poller: Poller object
93 @type fd: int
94 @param fd: File descriptor to wait for
95
96 """
97 object.__init__(self)
98 self._poller = poller
99 self._fd = fd
100
102 """Wait for something to happen on the pipe.
103
104 @type timeout: float or None
105 @param timeout: Timeout for waiting (can be None)
106
107 """
108 running_timeout = utils.RunningTimeout(timeout, True)
109
110 while True:
111 remaining_time = running_timeout.Remaining()
112
113 if remaining_time is not None:
114 if remaining_time < 0.0:
115 break
116
117
118 remaining_time *= 1000
119
120 try:
121 result = self._poller.poll(remaining_time)
122 except EnvironmentError, err:
123 if err.errno != errno.EINTR:
124 raise
125 result = None
126
127
128 if result and result[0][0] == self._fd:
129 break
130
133 """Base class containing common code for conditions.
134
135 Some of this code is taken from python's threading module.
136
137 """
138 __slots__ = [
139 "_lock",
140 "acquire",
141 "release",
142 "_is_owned",
143 "_acquire_restore",
144 "_release_save",
145 ]
146
174
176 """Check whether lock is owned by current thread.
177
178 """
179 if self._lock.acquire(0):
180 self._lock.release()
181 return False
182 return True
183
186
189
191 """Raise an exception if the current thread doesn't own the lock.
192
193 """
194 if not self._is_owned():
195 raise RuntimeError("cannot work with un-aquired lock")
196
199 """Condition which can only be notified once.
200
201 This condition class uses pipes and poll, internally, to be able to wait for
202 notification with a timeout, without resorting to polling. It is almost
203 compatible with Python's threading.Condition, with the following differences:
204 - notifyAll can only be called once, and no wait can happen after that
205 - notify is not supported, only notifyAll
206
207 """
208
209 __slots__ = [
210 "_poller",
211 "_read_fd",
212 "_write_fd",
213 "_nwaiters",
214 "_notified",
215 ]
216
217 _waiter_class = _SingleNotifyPipeConditionWaiter
218
220 """Constructor for SingleNotifyPipeCondition
221
222 """
223 _BaseCondition.__init__(self, lock)
224 self._nwaiters = 0
225 self._notified = False
226 self._read_fd = None
227 self._write_fd = None
228 self._poller = None
229
231 """Throws an exception if already notified.
232
233 """
234 if self._notified:
235 raise RuntimeError("cannot use already notified condition")
236
238 """Cleanup open file descriptors, if any.
239
240 """
241 if self._read_fd is not None:
242 os.close(self._read_fd)
243 self._read_fd = None
244
245 if self._write_fd is not None:
246 os.close(self._write_fd)
247 self._write_fd = None
248 self._poller = None
249
def wait(self, timeout=None):
    """Block until notifyAll() is called or the timeout expires.

    The caller must hold the condition's lock. It is given up through
    C{_release_save()} for the duration of the wait and taken back with
    C{_acquire_restore()} before this method returns, even on error.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the lock is not held, or if notifyAll() has
        already been called on this condition

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
        if self._poller is None:
            # First waiter: create the pipe and a poller watching its read
            # end for POLLHUP, which notifyAll() causes by closing the
            # write end.
            read_fd, write_fd = os.pipe()
            self._read_fd = read_fd
            self._write_fd = write_fd
            self._poller = select.poll()
            self._poller.register(read_fd, select.POLLHUP)

        wait_for_hangup = self._waiter_class(self._poller, self._read_fd)
        lock_state = self._release_save()
        try:
            wait_for_hangup(timeout)
        finally:
            # Always take the lock back, whatever happened while waiting
            self._acquire_restore(lock_state)
    finally:
        # The last waiter to leave closes the pipe's file descriptors
        self._nwaiters -= 1
        if not self._nwaiters:
            self._Cleanup()
279
281 """Close the writing side of the pipe to notify all waiters.
282
283 """
284 self._check_owned()
285 self._check_unnotified()
286 self._notified = True
287 if self._write_fd is not None:
288 os.close(self._write_fd)
289 self._write_fd = None
290
293 """Group-only non-polling condition with counters.
294
295 This condition class uses pipes and poll, internally, to be able to wait for
296 notification with a timeout, without resorting to polling. It is almost
297 compatible with Python's threading.Condition, but only supports notifyAll and
298 non-recursive locks. As an additional features it's able to report whether
299 there are any waiting threads.
300
301 """
302 __slots__ = [
303 "_waiters",
304 "_single_condition",
305 ]
306
307 _single_condition_class = SingleNotifyPipeCondition
308
310 """Initializes this class.
311
312 """
313 _BaseCondition.__init__(self, lock)
314 self._waiters = set()
315 self._single_condition = self._single_condition_class(self._lock)
316
def wait(self, timeout=None):
    """Wait until notifyAll() is called or the timeout expires.

    The calling thread is recorded in the set of waiters for the duration
    of the wait, so other lock holders can see who is pending.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the current thread does not own the lock

    """
    self._check_owned()

    # notifyAll() swaps in a fresh single-notify condition for later
    # waiters, so grab the one that is current right now.
    current_cond = self._single_condition
    this_thread = threading.currentThread()

    self._waiters.add(this_thread)
    try:
        current_cond.wait(timeout)
    finally:
        # The single-notify condition re-acquires the lock before returning
        self._check_owned()
        self._waiters.remove(this_thread)
336
338 """Notify all currently waiting threads.
339
340 """
341 self._check_owned()
342 self._single_condition.notifyAll()
343 self._single_condition = self._single_condition_class(self._lock)
344
346 """Returns a list of all waiting threads.
347
348 """
349 self._check_owned()
350
351 return self._waiters
352
354 """Returns whether there are active waiters.
355
356 """
357 self._check_owned()
358
359 return bool(self._waiters)
360
363 __slots__ = [
364 "shared",
365 ]
366
368 """Initializes this class.
369
370 """
371 self.shared = shared
372 PipeCondition.__init__(self, lock)
373
376 """Implements a shared lock.
377
378 Multiple threads can acquire the lock in a shared way by calling
379 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
380 threads can call C{acquire(shared=0)}.
381
382 Notes on data structures: C{__pending} contains a priority queue (heapq) of
383 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
384 ...]}. Each per-priority queue contains a normal in-order list of conditions
385 to be notified when the lock can be acquired. Shared locks are grouped
386 together by priority and the condition for them is stored in
387 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
388 references for the per-priority queues indexed by priority for faster access.
389
390 @type name: string
391 @ivar name: the name of the lock
392
393 """
394 __slots__ = [
395 "__weakref__",
396 "__deleted",
397 "__exc",
398 "__lock",
399 "__pending",
400 "__pending_by_prio",
401 "__pending_shared",
402 "__shr",
403 "name",
404 ]
405
406 __condition_class = _PipeConditionWithMode
407
def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register (optional)

    """
    object.__init__(self)

    self.name = name

    # Current owners: the set of sharing threads, or the exclusive owner
    self.__shr = set()
    self.__exc = None

    # Pending acquires: a heap of (priority, queue) pairs, plus lookup
    # tables by priority for the per-priority queues and for the condition
    # shared acquires at that priority can join
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Becomes True once delete() succeeds; every later acquire then fails
    self.__deleted = False

    # Internal lock protecting all of the state above
    self.__lock = threading.Lock()

    # Registration comes last, once the object is fully initialized
    if monitor:
        monitor.RegisterLock(self)
438
440 """Retrieves information for querying locks.
441
442 @type requested: set
443 @param requested: Requested information, see C{query.LQ_*}
444
445 """
446 self.__lock.acquire()
447 try:
448
449
450
451 mode = None
452 owner_names = None
453
454 if query.LQ_MODE in requested:
455 if self.__deleted:
456 mode = _DELETED_TEXT
457 assert not (self.__exc or self.__shr)
458 elif self.__exc:
459 mode = _EXCLUSIVE_TEXT
460 elif self.__shr:
461 mode = _SHARED_TEXT
462
463
464 if query.LQ_OWNER in requested:
465 if self.__exc:
466 owner = [self.__exc]
467 else:
468 owner = self.__shr
469
470 if owner:
471 assert not self.__deleted
472 owner_names = [i.getName() for i in owner]
473
474
475 if query.LQ_PENDING in requested:
476 pending = []
477
478
479 for (_, prioqueue) in sorted(self.__pending):
480 for cond in prioqueue:
481 if cond.shared:
482 pendmode = _SHARED_TEXT
483 else:
484 pendmode = _EXCLUSIVE_TEXT
485
486
487 pending.append((pendmode, [i.getName()
488 for i in cond.get_waiting()]))
489 else:
490 pending = None
491
492 return (self.name, mode, owner_names, pending)
493 finally:
494 self.__lock.release()
495
497 """Raises an exception if the lock has been deleted.
498
499 """
500 if self.__deleted:
501 raise errors.LockError("Deleted lock %s" % self.name)
502
504 """Is the current thread sharing the lock at this time?
505
506 """
507 return threading.currentThread() in self.__shr
508
510 """Is the current thread holding the lock exclusively at this time?
511
512 """
513 return threading.currentThread() == self.__exc
514
516 """Is the current thread somehow owning the lock at this time?
517
518 This is a private version of the function, which presumes you're holding
519 the internal lock.
520
521 """
522 if shared < 0:
523 return self.__is_sharer() or self.__is_exclusive()
524 elif shared:
525 return self.__is_sharer()
526 else:
527 return self.__is_exclusive()
528
530 """Is the current thread somehow owning the lock at this time?
531
532 @param shared:
533 - < 0: check for any type of ownership (default)
534 - 0: check for exclusive ownership
535 - > 0: check for shared ownership
536
537 """
538 self.__lock.acquire()
539 try:
540 return self.__is_owned(shared=shared)
541 finally:
542 self.__lock.release()
543
545 """Returns the number of pending acquires.
546
547 @rtype: int
548
549 """
550 self.__lock.acquire()
551 try:
552 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
553 finally:
554 self.__lock.release()
555
557 """Checks whether there are any pending acquires.
558
559 @rtype: bool
560
561 """
562 self.__lock.acquire()
563 try:
564
565 return not (self.__find_first_pending_queue() or
566 self.__pending or
567 self.__pending_by_prio or
568 self.__pending_shared)
569 finally:
570 self.__lock.release()
571
573 """Actually acquire the lock.
574
575 """
576 if shared:
577 self.__shr.add(threading.currentThread())
578 else:
579 self.__exc = threading.currentThread()
580
582 """Determine whether lock can be acquired.
583
584 """
585 if shared:
586 return self.__exc is None
587 else:
588 return len(self.__shr) == 0 and self.__exc is None
589
591 """Tries to find the topmost queued entry with pending acquires.
592
593 Removes empty entries while going through the list.
594
595 """
596 while self.__pending:
597 (priority, prioqueue) = self.__pending[0]
598
599 if not prioqueue:
600 heapq.heappop(self.__pending)
601 del self.__pending_by_prio[priority]
602 assert priority not in self.__pending_shared
603 continue
604
605 if prioqueue:
606 return prioqueue
607
608 return None
609
611 """Checks whether the passed condition is on top of the queue.
612
613 The caller must make sure the queue isn't empty.
614
615 """
616 return cond == self.__find_first_pending_queue()[0]
617
619 """Acquire a shared lock.
620
621 @param shared: whether to acquire in shared mode; by default an
622 exclusive lock will be acquired
623 @param timeout: maximum waiting time before giving up
624 @type priority: integer
625 @param priority: Priority for acquiring lock
626
627 """
628 self.__check_deleted()
629
630
631 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
632 " %s" % self.name)
633
634
635 self.__find_first_pending_queue()
636
637
638 if not self.__pending and self.__can_acquire(shared):
639
640 self.__do_acquire(shared)
641 return True
642
643 prioqueue = self.__pending_by_prio.get(priority, None)
644
645 if shared:
646
647 wait_condition = self.__pending_shared.get(priority, None)
648 assert (wait_condition is None or
649 (wait_condition.shared and wait_condition in prioqueue))
650 else:
651 wait_condition = None
652
653 if wait_condition is None:
654 if prioqueue is None:
655 assert priority not in self.__pending_by_prio
656
657 prioqueue = []
658 heapq.heappush(self.__pending, (priority, prioqueue))
659 self.__pending_by_prio[priority] = prioqueue
660
661 wait_condition = self.__condition_class(self.__lock, shared)
662 prioqueue.append(wait_condition)
663
664 if shared:
665
666
667 assert priority not in self.__pending_shared
668 self.__pending_shared[priority] = wait_condition
669
670 try:
671
672
673
674 while not (self.__is_on_top(wait_condition) and
675 self.__can_acquire(shared)):
676
677 wait_condition.wait(timeout)
678 self.__check_deleted()
679
680
681
682 if timeout is not None:
683 break
684
685 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
686 self.__do_acquire(shared)
687 return True
688 finally:
689
690 if not wait_condition.has_waiting():
691 prioqueue.remove(wait_condition)
692 if wait_condition.shared:
693 del self.__pending_shared[priority]
694
695 return False
696
def acquire(self, shared=0, timeout=None, priority=None,
            test_notify=None):
    """Acquire this lock, either shared or exclusively.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock (defaults to
        C{_DEFAULT_PRIORITY} when None)
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False if the timeout expired
    @raise errors.LockError: if the lock has been deleted

    """
    effective_priority = priority
    if effective_priority is None:
        effective_priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
        # Unit-test hook, run (in debug mode only) while the internal lock
        # is already held
        if __debug__ and callable(test_notify):
            test_notify()

        return self.__acquire_unlocked(shared, timeout, effective_priority)
    finally:
        self.__lock.release()
724
726 """Release a Shared Lock.
727
728 You must have acquired the lock, either in shared or in exclusive mode,
729 before calling this function.
730
731 """
732 self.__lock.acquire()
733 try:
734 assert self.__is_exclusive() or self.__is_sharer(), \
735 "Cannot release non-owned lock"
736
737
738 if self.__is_exclusive():
739 self.__exc = None
740 else:
741 self.__shr.remove(threading.currentThread())
742
743
744 prioqueue = self.__find_first_pending_queue()
745 if prioqueue:
746 prioqueue[0].notifyAll()
747
748 finally:
749 self.__lock.release()
750
def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was deleted, False if it could not be acquired
        exclusively before the timeout expired (the lock is then left intact)
    @raise errors.LockError: if the lock has already been deleted

    """
    if priority is None:
        priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
        assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

        self.__check_deleted()

        # The caller is allowed to hold the lock exclusively already
        acquired = self.__is_exclusive()

        if not acquired:
            acquired = self.__acquire_unlocked(0, timeout, priority)

        if acquired:
            # Only valid after a successful acquire: on timeout
            # __acquire_unlocked() returns False and the caller must get
            # False back rather than an AssertionError.
            assert self.__is_exclusive() and not self.__is_sharer(), \
                "Lock wasn't acquired in exclusive mode"

            self.__deleted = True
            self.__exc = None

            assert not (self.__exc or self.__shr), "Found owner during deletion"

            # Wake up every pending acquire; each one re-checks the deleted
            # flag after waking and fails with errors.LockError
            for (_, prioqueue) in self.__pending:
                for cond in prioqueue:
                    cond.notifyAll()

            assert self.__deleted

        return acquired
    finally:
        self.__lock.release()
798
803
806
807
808
809
# Pass this as the C{names} argument of a lock set's acquire() to take every
# lock currently in the set (acquire() treats names=None that way).
ALL_SET = None
814 """Internal exception to abort an acquire on a timeout.
815
816 """
817
820 """Implements a set of locks.
821
822 This abstraction implements a set of shared locks for the same resource type,
823 distinguished by name. The user can lock a subset of the resources and the
824 LockSet will take care of acquiring the locks always in the same order, thus
825 preventing deadlock.
826
827 All the locks needed in the same set must be acquired together, though.
828
829 @type name: string
830 @ivar name: the name of the lockset
831
832 """
def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lock set; member locks are named
        C{"<name>/<member>"}
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Kept so that locks created later by add() register with it as well
    self.__monitor = monitor

    # Set-level lock: taken shared when the member names are read and
    # exclusively by add() while new members are inserted
    self.__lock = SharedLock(name)

    # Member name -> SharedLock protecting that resource
    self.__lockdict = dict(
        (mname, SharedLock(self._GetLockName(mname), monitor=monitor))
        for mname in members)

    # Owning thread -> set of member names it holds. A thread may map to an
    # empty set while it owns only the set-level lock. Each thread only
    # touches its own entry, via the *_owned helper methods.
    self.__owners = {}
867
869 """Returns the name for a member lock.
870
871 """
872 return "%s/%s" % (self.name, mname)
873
875 """Is the current thread a current level owner?"""
876 return threading.currentThread() in self.__owners
877
879 """Note the current thread owns the given lock"""
880 if name is None:
881 if not self._is_owned():
882 self.__owners[threading.currentThread()] = set()
883 else:
884 if self._is_owned():
885 self.__owners[threading.currentThread()].add(name)
886 else:
887 self.__owners[threading.currentThread()] = set([name])
888
890 """Note the current thread owns the given lock"""
891
892 assert not (name is None and self.__lock._is_owned()), \
893 "Cannot hold internal lock when deleting owner status"
894
895 if name is not None:
896 self.__owners[threading.currentThread()].remove(name)
897
898
899 if (not self.__lock._is_owned() and
900 not self.__owners[threading.currentThread()]):
901 del self.__owners[threading.currentThread()]
902
904 """Get the set of resource names owned by the current thread"""
905 if self._is_owned():
906 return self.__owners[threading.currentThread()].copy()
907 else:
908 return set()
909
911 """Release and delete all resources owned by the current thread"""
912 for lname in self._list_owned():
913 lock = self.__lockdict[lname]
914 if lock._is_owned():
915 lock.release()
916 self._del_owned(name=lname)
917
919 """Return the current set of names.
920
921 Only call this function while holding __lock and don't iterate on the
922 result after releasing the lock.
923
924 """
925 return self.__lockdict.keys()
926
928 """Return a copy of the current set of elements.
929
930 Used only for debugging purposes.
931
932 """
933
934
935 release_lock = False
936 if not self.__lock._is_owned():
937 release_lock = True
938 self.__lock.acquire(shared=1)
939 try:
940 result = self.__names()
941 finally:
942 if release_lock:
943 self.__lock.release()
944 return set(result)
945
def acquire(self, names, timeout=None, shared=0, priority=None,
            test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} means every
        lock in the set, together with the set-level lock
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Locks of one set must all be acquired in a single call
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
        priority = _DEFAULT_PRIORITY

    # A single deadline covers every lock acquired below
    deadline = utils.RunningTimeout(timeout, False)

    try:
        if names is None:
            # Whole set: hold the set-level lock so add() cannot insert new
            # members while the current ones are being acquired
            if not self.__lock.acquire(shared=shared, priority=priority,
                                       timeout=deadline.Remaining()):
                raise _AcquireTimeout()
            try:
                self._add_owned()
                return self.__acquire_inner(self.__names(), True, shared,
                                            priority, deadline.Remaining,
                                            test_notify)
            except:
                # Undo the set-level acquisition before propagating anything
                # (including the timeout marker)
                self.__lock.release()
                self._del_owned()
                raise

        # A single resource name may be passed instead of a list
        if isinstance(names, basestring):
            names = [names]

        return self.__acquire_inner(names, False, shared, priority,
                                    deadline.Remaining, test_notify)

    except _AcquireTimeout:
        return None
1021
def __acquire_inner(self, names, want_all, shared, priority,
                    timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired; if
        true, names that are missing or get deleted are skipped instead of
        causing an error
    @param shared: Whether to acquire in shared mode
    @param priority: Priority for acquiring locks
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @rtype: set
    @return: names of the locks that were acquired
    @raise errors.LockError: if a requested lock does not exist (unless
        C{want_all}) or a blocking acquire fails
    @raise _AcquireTimeout: if the timeout expires; every lock taken so far
        is released first

    """
    # Always acquire in sorted order: every caller then uses the same order,
    # which is what keeps overlapping acquires from deadlocking
    pending = []
    for lname in sorted(utils.UniqueSequence(names)):
        try:
            lock = self.__lockdict[lname]
        except KeyError:
            if not want_all:
                raise errors.LockError("Non-existing lock %s in set %s (it may have"
                                       " been removed)" % (lname, self.name))
            # Removed in the meantime; a whole-set acquire simply skips it
            continue
        pending.append((lname, lock))

    acquired = set()

    try:
        for (lname, lock) in pending:
            if __debug__ and callable(test_notify):
                notify_fn = lambda name=lname: test_notify(name)
            else:
                notify_fn = None

            timeout = timeout_fn()

            try:
                got_it = lock.acquire(shared=shared, timeout=timeout,
                                      priority=priority,
                                      test_notify=notify_fn)
            except errors.LockError:
                # The lock was deleted while we were waiting for it
                if want_all:
                    continue
                raise errors.LockError("Non-existing lock %s in set %s (it may"
                                       " have been removed)" %
                                       (lname, self.name))

            if not got_it:
                if timeout is None:
                    # A blocking acquire is not expected to fail
                    raise errors.LockError("Failed to get lock %s (set %s)" %
                                           (lname, self.name))
                raise _AcquireTimeout()

            try:
                self._add_owned(name=lname)
                acquired.add(lname)
            except:
                # Don't leave the lock held if it could not be recorded
                if lock._is_owned():
                    lock.release()
                raise

    except:
        # All-or-nothing: drop every member lock taken so far
        self._release_and_delete_owned()
        raise

    return acquired
1114
1116 """Release a set of resource locks, at the same level.
1117
1118 You must have acquired the locks, either in shared or in exclusive mode,
1119 before releasing them.
1120
1121 @type names: list of strings, or None
1122 @param names: the names of the locks which shall be released
1123 (defaults to all the locks acquired at that level).
1124
1125 """
1126 assert self._is_owned(), ("release() on lock set %s while not owner" %
1127 self.name)
1128
1129
1130 if isinstance(names, basestring):
1131 names = [names]
1132
1133 if names is None:
1134 names = self._list_owned()
1135 else:
1136 names = set(names)
1137 assert self._list_owned().issuperset(names), (
1138 "release() on unheld resources %s (set %s)" %
1139 (names.difference(self._list_owned()), self.name))
1140
1141
1142
1143 if self.__lock._is_owned():
1144 self.__lock.release()
1145 self._del_owned()
1146
1147 for lockname in names:
1148
1149
1150 self.__lockdict[lockname].release()
1151 self._del_owned(name=lockname)
1152
def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add (a single string is
        accepted too); repeated names are only added once
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @rtype: bool
    @return: always True
    @raise errors.LockError: if any of the names already exists in the set

    """
    # Either we don't own the set at all, or we own it exclusively
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
        ("Cannot add locks if the set %s is only partially owned, or shared" %
         self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
        names = [names]

    # Materialize the names once and drop repeats. The names are iterated
    # twice below, so a generator would otherwise be exhausted by the
    # duplicate check and nothing would be added. A repeated name would
    # otherwise create a second SharedLock that replaces the first one in
    # __lockdict, leaving the first orphaned (and still held when acquired=1).
    seen = set()
    unique_names = []
    for lockname in names:
        if lockname not in seen:
            seen.add(lockname)
            unique_names.append(lockname)

    # Take the set-level lock exclusively unless the caller already holds it
    release_lock = False
    if not self.__lock._is_owned():
        release_lock = True
        self.__lock.acquire()

    try:
        invalid_names = set(self.__names()).intersection(unique_names)
        if invalid_names:
            raise errors.LockError("duplicate add(%s) on lockset %s" %
                                   (invalid_names, self.name))

        for lockname in unique_names:
            lock = SharedLock(self._GetLockName(lockname),
                              monitor=self.__monitor)

            if acquired:
                # The lock was just created and nobody else can see it yet,
                # so no timeout or priority is needed
                lock.acquire(shared=shared)
                try:
                    self._add_owned(name=lockname)
                except:
                    # Safety measure: don't leave a held lock behind
                    lock.release()
                    raise

            self.__lockdict[lockname] = lock

    finally:
        if release_lock:
            self.__lock.release()

    return True
1217
1219 """Remove elements from the lock set.
1220
1221 You can either not hold anything in the lockset or already hold a superset
1222 of the elements you want to delete, exclusively.
1223
1224 @type names: list of strings
1225 @param names: names of the resource to remove.
1226
1227 @return: a list of locks which we removed; the list is always
1228 equal to the names list if we were holding all the locks
1229 exclusively
1230
1231 """
1232
1233 if isinstance(names, basestring):
1234 names = [names]
1235
1236
1237
1238
1239 assert not self._is_owned() or self._list_owned().issuperset(names), (
1240 "remove() on acquired lockset %s while not owning all elements" %
1241 self.name)
1242
1243 removed = []
1244
1245 for lname in names:
1246
1247
1248
1249
1250
1251 try:
1252 self.__lockdict[lname].delete()
1253 removed.append(lname)
1254 except (KeyError, errors.LockError):
1255
1256 assert not self._is_owned(), ("remove failed while holding lockset %s"
1257 % self.name)
1258 else:
1259
1260
1261
1262
1263
1264
1265
1266 del self.__lockdict[lname]
1267
1268 if self._is_owned():
1269 self._del_owned(name=lname)
1270
1271 return removed
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
# Locking levels. Locks must be acquired in increasing level order, and each
# value is also that level's index in LEVELS (the lock manager looks for
# locks held "above" a level through LEVELS[level + 1:]).
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3

LEVELS = [
    LEVEL_CLUSTER,
    LEVEL_INSTANCE,
    LEVEL_NODEGROUP,
    LEVEL_NODE,
]

# Levels whose lock sets may be modified at runtime; the cluster level,
# which only contains the BGL, is immutable.
LEVELS_MOD = [
    LEVEL_NODE,
    LEVEL_NODEGROUP,
    LEVEL_INSTANCE,
]

# Human-readable name of each level (used e.g. in error messages)
LEVEL_NAMES = {
    LEVEL_CLUSTER: "cluster",
    LEVEL_INSTANCE: "instance",
    LEVEL_NODEGROUP: "nodegroup",
    LEVEL_NODE: "node",
}

# Name of the Big Ganeti Lock, the sole member of the cluster-level set
BGL = 'BGL'
1308 """The Ganeti Locking Library
1309
1310 The purpose of this small library is to manage locking for ganeti clusters
1311 in a central place, while at the same time doing dynamic checks against
1312 possible deadlocks. It will also make it easier to transition to a different
1313 lock type should we migrate away from python threads.
1314
1315 """
1316 _instance = None
1317
def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    Only one instance may exist at a time; constructing a second one trips
    an assertion.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
        "double GanetiLockManager instance"

    # Remember the singleton on the class
    self.__class__._instance = self

    self._monitor = LockMonitor()
    monitor = self._monitor

    # One LockSet per locking level, all registered with the same monitor;
    # the cluster level contains only the BGL
    self.__keyring = {
        LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=monitor),
        LEVEL_NODE: LockSet(nodes, "nodes", monitor=monitor),
        LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=monitor),
        LEVEL_INSTANCE: LockSet(instances, "instances", monitor=monitor),
    }
1345
1347 """Queries information from all locks.
1348
1349 See L{LockMonitor.QueryLocks}.
1350
1351 """
1352 return self._monitor.QueryLocks(fields)
1353
1355 """Queries information from all locks, returning old-style data.
1356
1357 See L{LockMonitor.OldStyleQueryLocks}.
1358
1359 """
1360 return self._monitor.OldStyleQueryLocks(fields)
1361
1363 """List the lock names at the given level.
1364
1365 This can be used for debugging/testing purposes.
1366
1367 @param level: the level whose list of locks to get
1368
1369 """
1370 assert level in LEVELS, "Invalid locking level %s" % level
1371 return self.__keyring[level]._names()
1372
1374 """Check whether we are owning locks at the given level
1375
1376 """
1377 return self.__keyring[level]._is_owned()
1378
1379 is_owned = _is_owned
1380
1382 """Get the set of owned locks at the given level
1383
1384 """
1385 return self.__keyring[level]._list_owned()
1386
1387 list_owned = _list_owned
1388
1390 """Check that we don't own any lock at a level greater than the given one.
1391
1392 """
1393
1394
1395 return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1396
1398 """Check if the current thread owns the BGL.
1399
1400 Both an exclusive or a shared acquisition work.
1401
1402 """
1403 return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1404
1405 @staticmethod
1407 """Check if the level contains the BGL.
1408
1409 Check if acting on the given level and set of names will change
1410 the status of the Big Ganeti Lock.
1411
1412 """
1413 return level == LEVEL_CLUSTER and (names is None or BGL in names)
1414
def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: whatever the level's lock set returns, i.e. the set of
        acquired names, or None on timeout

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Any acquisition other than the BGL itself requires owning the BGL
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
        "You must own the Big Ganeti Lock before acquiring any other")

    # Levels are taken in increasing order: nothing may already be held at
    # a later level
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)
1450
def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # The BGL may only be given up once nothing is held at later levels
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
        "Cannot release the Big Ganeti Lock while holding something"
        " at upper levels (%r)" %
        (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl], self._list_owned(lvl))
                          for lvl in self.__keyring]), ))

    lockset = self.__keyring[level]
    return lockset.release(names)
1474
def add(self, level, names, acquired=0, shared=0):
    """Create new locks at a modifiable level.

    Assertions require that the level is modifiable, that the Big Ganeti
    Lock is held, and that no lock is owned at any greater level.

    @type level: member of locking.LEVELS_MOD
    @param level: level at which the new locks are created
    @type names: list of strings
    @param names: names of the locks to create
    @type acquired: integer (0/1) used as a boolean
    @param acquired: when true, the newly created locks are also acquired
    @type shared: integer (0/1) used as a boolean
    @param shared: whether that acquisition is done in shared mode
    @return: whatever the lock set registered for this level returns from
        its own add call

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), (
        "You must own the BGL before performing other operations")
    assert not self._upper_owned(level), (
        "Cannot add locks at a level while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1494
def remove(self, level, names):
    """Delete locks from a modifiable level.

    Contract: you must either already own the locks being removed in
    exclusive mode, or own no lock at an upper level. The assertion below
    only checks a weaker condition: self._is_owned(level) holds, or nothing
    is owned at a greater level. Any stricter checking is presumably left
    to the level's lock set; verify there.

    @type level: member of locking.LEVELS_MOD
    @param level: level from which the locks are removed
    @type names: list of strings
    @param names: names of the locks to remove (special lock names, or
        instance/node names)
    @return: whatever the lock set registered for this level returns from
        its own remove call

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), (
        "You must own the BGL before performing other operations")

    # Either this level is already (partly) owned, or nothing above it is.
    assert self._is_owned(level) or not self._upper_owned(level), (
        "Cannot remove locks at a level while not owning it or"
        " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1518
1521 """Sorting key function.
1522
1523 Sort by name, then by incoming order.
1524
1525 """
1526 (name, _, _, _) = item
1527
1528 return (utils.NiceSortKey(name), num)
1529
1532 _LOCK_ATTR = "_lock"
1533
1535 """Initializes this class.
1536
1537 """
1538 self._lock = SharedLock("LockMonitor")
1539
1540
1541 self._counter = itertools.count(0)
1542
1543
1544
1545 self._locks = weakref.WeakKeyDictionary()
1546
1547 @ssynchronized(_LOCK_ATTR)
1549 """Registers a new lock.
1550
1551 """
1552 logging.debug("Registering lock %s", lock.name)
1553 assert lock not in self._locks, "Duplicate lock registration"
1554
1555
1556
1557
1558
1559
1560
1561 self._locks[lock] = self._counter.next()
1562
1563 @ssynchronized(_LOCK_ATTR)
1565 """Get information from all locks while the monitor lock is held.
1566
1567 """
1568 return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
1569
1586
1588 """Queries information from all locks.
1589
1590 @type fields: list of strings
1591 @param fields: List of fields to return
1592
1593 """
1594 (qobj, ctx) = self._Query(fields)
1595
1596
1597 return query.GetQueryResponse(qobj, ctx)
1598
1600 """Queries information from all locks, returning old-style data.
1601
1602 @type fields: list of strings
1603 @param fields: List of fields to return
1604
1605 """
1606 (qobj, ctx) = self._Query(fields)
1607
1608 return qobj.OldStyleQuery(ctx)
1609