1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 """Module implementing the Ganeti locking code."""
31
32
33
34
35
36
37 import os
38 import select
39 import threading
40 import errno
41 import logging
42 import heapq
43 import time
44
45 from ganeti import errors
46 from ganeti import utils
47 from ganeti import compat
48 from ganeti import query
49
50
51 _EXCLUSIVE_TEXT = "exclusive"
52 _SHARED_TEXT = "shared"
53 _DELETED_TEXT = "deleted"
54
55 _DEFAULT_PRIORITY = 0
56
57
58
59 _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
60
61
63 """Shared Synchronization decorator.
64
65 Calls the function holding the given lock, either in exclusive or shared
66 mode. It requires the passed lock to be a SharedLock (or support its
67 semantics).
68
69 @type mylock: lockable object or string
70 @param mylock: lock to acquire or class member name of the lock to acquire
71
72 """
73 def wrap(fn):
74 def sync_function(*args, **kwargs):
75 if isinstance(mylock, basestring):
76 assert args, "cannot ssynchronize on non-class method: self not found"
77
78 lock = getattr(args[0], mylock)
79 else:
80 lock = mylock
81 lock.acquire(shared=shared)
82 try:
83 return fn(*args, **kwargs)
84 finally:
85 lock.release()
86 return sync_function
87 return wrap
88
89
91 """Helper class for SingleNotifyPipeCondition
92
93 """
94 __slots__ = [
95 "_fd",
96 ]
97
99 """Constructor for _SingleNotifyPipeConditionWaiter
100
101 @type fd: int
102 @param fd: File descriptor to wait for
103
104 """
105 object.__init__(self)
106 self._fd = fd
107
109 """Wait for something to happen on the pipe.
110
111 @type timeout: float or None
112 @param timeout: Timeout for waiting (can be None)
113
114 """
115 running_timeout = utils.RunningTimeout(timeout, True)
116 poller = select.poll()
117 poller.register(self._fd, select.POLLHUP)
118
119 while True:
120 remaining_time = running_timeout.Remaining()
121
122 if remaining_time is not None:
123 if remaining_time < 0.0:
124 break
125
126
127 remaining_time *= 1000
128
129 try:
130 result = poller.poll(remaining_time)
131 except EnvironmentError, err:
132 if err.errno != errno.EINTR:
133 raise
134 result = None
135
136
137 if result and result[0][0] == self._fd:
138 break
139
140
142 """Base class containing common code for conditions.
143
144 Some of this code is taken from python's threading module.
145
146 """
147 __slots__ = [
148 "_lock",
149 "acquire",
150 "release",
151 "_is_owned",
152 "_acquire_restore",
153 "_release_save",
154 ]
155
183
185 """Check whether lock is owned by current thread.
186
187 """
188 if self._lock.acquire(0):
189 self._lock.release()
190 return False
191 return True
192
195
198
200 """Raise an exception if the current thread doesn't own the lock.
201
202 """
203 if not self._is_owned():
204 raise RuntimeError("cannot work with un-aquired lock")
205
206
208 """Condition which can only be notified once.
209
210 This condition class uses pipes and poll, internally, to be able to wait for
211 notification with a timeout, without resorting to polling. It is almost
212 compatible with Python's threading.Condition, with the following differences:
213 - notifyAll can only be called once, and no wait can happen after that
214 - notify is not supported, only notifyAll
215
216 """
217
218 __slots__ = [
219 "_read_fd",
220 "_write_fd",
221 "_nwaiters",
222 "_notified",
223 ]
224
225 _waiter_class = _SingleNotifyPipeConditionWaiter
226
228 """Constructor for SingleNotifyPipeCondition
229
230 """
231 _BaseCondition.__init__(self, lock)
232 self._nwaiters = 0
233 self._notified = False
234 self._read_fd = None
235 self._write_fd = None
236
238 """Throws an exception if already notified.
239
240 """
241 if self._notified:
242 raise RuntimeError("cannot use already notified condition")
243
245 """Cleanup open file descriptors, if any.
246
247 """
248 if self._read_fd is not None:
249 os.close(self._read_fd)
250 self._read_fd = None
251
252 if self._write_fd is not None:
253 os.close(self._write_fd)
254 self._write_fd = None
255
256 - def wait(self, timeout):
257 """Wait for a notification.
258
259 @type timeout: float or None
260 @param timeout: Waiting timeout (can be None)
261
262 """
263 self._check_owned()
264 self._check_unnotified()
265
266 self._nwaiters += 1
267 try:
268 if self._read_fd is None:
269 (self._read_fd, self._write_fd) = os.pipe()
270
271 wait_fn = self._waiter_class(self._read_fd)
272 state = self._release_save()
273 try:
274
275 wait_fn(timeout)
276 finally:
277
278 self._acquire_restore(state)
279 finally:
280 self._nwaiters -= 1
281 if self._nwaiters == 0:
282 self._Cleanup()
283
285 """Close the writing side of the pipe to notify all waiters.
286
287 """
288 self._check_owned()
289 self._check_unnotified()
290 self._notified = True
291 if self._write_fd is not None:
292 os.close(self._write_fd)
293 self._write_fd = None
294
295
297 """Group-only non-polling condition with counters.
298
299 This condition class uses pipes and poll, internally, to be able to wait for
300 notification with a timeout, without resorting to polling. It is almost
301 compatible with Python's threading.Condition, but only supports notifyAll and
302 non-recursive locks. As an additional features it's able to report whether
303 there are any waiting threads.
304
305 """
306 __slots__ = [
307 "_waiters",
308 "_single_condition",
309 ]
310
311 _single_condition_class = SingleNotifyPipeCondition
312
314 """Initializes this class.
315
316 """
317 _BaseCondition.__init__(self, lock)
318 self._waiters = set()
319 self._single_condition = self._single_condition_class(self._lock)
320
321 - def wait(self, timeout):
322 """Wait for a notification.
323
324 @type timeout: float or None
325 @param timeout: Waiting timeout (can be None)
326
327 """
328 self._check_owned()
329
330
331
332 cond = self._single_condition
333
334 self._waiters.add(threading.currentThread())
335 try:
336 cond.wait(timeout)
337 finally:
338 self._check_owned()
339 self._waiters.remove(threading.currentThread())
340
342 """Notify all currently waiting threads.
343
344 """
345 self._check_owned()
346 self._single_condition.notifyAll()
347 self._single_condition = self._single_condition_class(self._lock)
348
350 """Returns a list of all waiting threads.
351
352 """
353 self._check_owned()
354
355 return self._waiters
356
358 """Returns whether there are active waiters.
359
360 """
361 self._check_owned()
362
363 return bool(self._waiters)
364
366 return ("<%s.%s waiters=%s at %#x>" %
367 (self.__class__.__module__, self.__class__.__name__,
368 self._waiters, id(self)))
369
370
372 __slots__ = [
373 "shared",
374 ]
375
377 """Initializes this class.
378
379 """
380 self.shared = shared
381 PipeCondition.__init__(self, lock)
382
383
385 """Implements a shared lock.
386
387 Multiple threads can acquire the lock in a shared way by calling
388 C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
389 threads can call C{acquire(shared=0)}.
390
391 Notes on data structures: C{__pending} contains a priority queue (heapq) of
392 all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
393 ...]}. Each per-priority queue contains a normal in-order list of conditions
394 to be notified when the lock can be acquired. Shared locks are grouped
395 together by priority and the condition for them is stored in
396 C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
397 references for the per-priority queues indexed by priority for faster access.
398
399 @type name: string
400 @ivar name: the name of the lock
401
402 """
403 __slots__ = [
404 "__weakref__",
405 "__deleted",
406 "__exc",
407 "__lock",
408 "__pending",
409 "__pending_by_prio",
410 "__pending_shared",
411 "__shr",
412 "__time_fn",
413 "name",
414 ]
415
416 __condition_class = _PipeConditionWithMode
417
418 - def __init__(self, name, monitor=None, _time_fn=time.time):
419 """Construct a new SharedLock.
420
421 @param name: the name of the lock
422 @param monitor: Lock monitor with which to register
423
424 """
425 object.__init__(self)
426
427 self.name = name
428
429
430 self.__time_fn = _time_fn
431
432
433 self.__lock = threading.Lock()
434
435
436 self.__pending = []
437 self.__pending_by_prio = {}
438 self.__pending_shared = {}
439
440
441 self.__shr = set()
442 self.__exc = None
443
444
445 self.__deleted = False
446
447
448 if monitor:
449 logging.debug("Adding lock %s to monitor", name)
450 monitor.RegisterLock(self)
451
453 return ("<%s.%s name=%s at %#x>" %
454 (self.__class__.__module__, self.__class__.__name__,
455 self.name, id(self)))
456
458 """Retrieves information for querying locks.
459
460 @type requested: set
461 @param requested: Requested information, see C{query.LQ_*}
462
463 """
464 self.__lock.acquire()
465 try:
466
467
468
469 mode = None
470 owner_names = None
471
472 if query.LQ_MODE in requested:
473 if self.__deleted:
474 mode = _DELETED_TEXT
475 assert not (self.__exc or self.__shr)
476 elif self.__exc:
477 mode = _EXCLUSIVE_TEXT
478 elif self.__shr:
479 mode = _SHARED_TEXT
480
481
482 if query.LQ_OWNER in requested:
483 if self.__exc:
484 owner = [self.__exc]
485 else:
486 owner = self.__shr
487
488 if owner:
489 assert not self.__deleted
490 owner_names = [i.getName() for i in owner]
491
492
493 if query.LQ_PENDING in requested:
494 pending = []
495
496
497 for (_, prioqueue) in sorted(self.__pending):
498 for cond in prioqueue:
499 if cond.shared:
500 pendmode = _SHARED_TEXT
501 else:
502 pendmode = _EXCLUSIVE_TEXT
503
504
505 pending.append((pendmode, [i.getName()
506 for i in cond.get_waiting()]))
507 else:
508 pending = None
509
510 return [(self.name, mode, owner_names, pending)]
511 finally:
512 self.__lock.release()
513
515 """Raises an exception if the lock has been deleted.
516
517 """
518 if self.__deleted:
519 raise errors.LockError("Deleted lock %s" % self.name)
520
522 """Is the current thread sharing the lock at this time?
523
524 """
525 return threading.currentThread() in self.__shr
526
528 """Is the current thread holding the lock exclusively at this time?
529
530 """
531 return threading.currentThread() == self.__exc
532
534 """Is the current thread somehow owning the lock at this time?
535
536 This is a private version of the function, which presumes you're holding
537 the internal lock.
538
539 """
540 if shared < 0:
541 return self.__is_sharer() or self.__is_exclusive()
542 elif shared:
543 return self.__is_sharer()
544 else:
545 return self.__is_exclusive()
546
548 """Is the current thread somehow owning the lock at this time?
549
550 @param shared:
551 - < 0: check for any type of ownership (default)
552 - 0: check for exclusive ownership
553 - > 0: check for shared ownership
554
555 """
556 self.__lock.acquire()
557 try:
558 return self.__is_owned(shared=shared)
559 finally:
560 self.__lock.release()
561
562
563
564 _is_owned = is_owned
565
567 """Returns the number of pending acquires.
568
569 @rtype: int
570
571 """
572 self.__lock.acquire()
573 try:
574 return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
575 finally:
576 self.__lock.release()
577
579 """Checks whether there are any pending acquires.
580
581 @rtype: bool
582
583 """
584 self.__lock.acquire()
585 try:
586
587 (_, prioqueue) = self.__find_first_pending_queue()
588
589 return not (prioqueue or
590 self.__pending or
591 self.__pending_by_prio or
592 self.__pending_shared)
593 finally:
594 self.__lock.release()
595
597 """Actually acquire the lock.
598
599 """
600 if shared:
601 self.__shr.add(threading.currentThread())
602 else:
603 self.__exc = threading.currentThread()
604
606 """Determine whether lock can be acquired.
607
608 """
609 if shared:
610 return self.__exc is None
611 else:
612 return len(self.__shr) == 0 and self.__exc is None
613
615 """Tries to find the topmost queued entry with pending acquires.
616
617 Removes empty entries while going through the list.
618
619 """
620 while self.__pending:
621 (priority, prioqueue) = self.__pending[0]
622
623 if prioqueue:
624 return (priority, prioqueue)
625
626
627 heapq.heappop(self.__pending)
628 del self.__pending_by_prio[priority]
629 assert priority not in self.__pending_shared
630
631 return (None, None)
632
634 """Checks whether the passed condition is on top of the queue.
635
636 The caller must make sure the queue isn't empty.
637
638 """
639 (_, prioqueue) = self.__find_first_pending_queue()
640
641 return cond == prioqueue[0]
642
644 """Acquire a shared lock.
645
646 @param shared: whether to acquire in shared mode; by default an
647 exclusive lock will be acquired
648 @param timeout: maximum waiting time before giving up
649 @type priority: integer
650 @param priority: Priority for acquiring lock
651
652 """
653 self.__check_deleted()
654
655
656 assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
657 " %s" % self.name)
658
659
660 self.__find_first_pending_queue()
661
662
663 if not self.__pending and self.__can_acquire(shared):
664
665 self.__do_acquire(shared)
666 return True
667
668
669
670
671 if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
672 return False
673
674 prioqueue = self.__pending_by_prio.get(priority, None)
675
676 if shared:
677
678 wait_condition = self.__pending_shared.get(priority, None)
679 assert (wait_condition is None or
680 (wait_condition.shared and wait_condition in prioqueue))
681 else:
682 wait_condition = None
683
684 if wait_condition is None:
685 if prioqueue is None:
686 assert priority not in self.__pending_by_prio
687
688 prioqueue = []
689 heapq.heappush(self.__pending, (priority, prioqueue))
690 self.__pending_by_prio[priority] = prioqueue
691
692 wait_condition = self.__condition_class(self.__lock, shared)
693 prioqueue.append(wait_condition)
694
695 if shared:
696
697
698 assert priority not in self.__pending_shared
699 self.__pending_shared[priority] = wait_condition
700
701 wait_start = self.__time_fn()
702 acquired = False
703
704 try:
705
706
707 while True:
708 if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
709 self.__do_acquire(shared)
710 acquired = True
711 break
712
713
714
715 if (timeout is not None and
716 utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
717 break
718
719
720 wait_condition.wait(timeout)
721 self.__check_deleted()
722 finally:
723
724 if not wait_condition.has_waiting():
725 prioqueue.remove(wait_condition)
726 if wait_condition.shared:
727
728
729 self.__pending_shared.pop(priority, None)
730
731 return acquired
732
733 - def acquire(self, shared=0, timeout=None, priority=None,
734 test_notify=None):
735 """Acquire a shared lock.
736
737 @type shared: integer (0/1) used as a boolean
738 @param shared: whether to acquire in shared mode; by default an
739 exclusive lock will be acquired
740 @type timeout: float
741 @param timeout: maximum waiting time before giving up
742 @type priority: integer
743 @param priority: Priority for acquiring lock
744 @type test_notify: callable or None
745 @param test_notify: Special callback function for unittesting
746
747 """
748 if priority is None:
749 priority = _DEFAULT_PRIORITY
750
751 self.__lock.acquire()
752 try:
753
754 if __debug__ and callable(test_notify):
755 test_notify()
756
757 return self.__acquire_unlocked(shared, timeout, priority)
758 finally:
759 self.__lock.release()
760
762 """Changes the lock mode from exclusive to shared.
763
764 Pending acquires in shared mode on the same priority will go ahead.
765
766 """
767 self.__lock.acquire()
768 try:
769 assert self.__is_owned(), "Lock must be owned"
770
771 if self.__is_exclusive():
772
773 self.__exc = None
774 self.__do_acquire(1)
775
776
777
778
779
780 (priority, prioqueue) = self.__find_first_pending_queue()
781 if prioqueue:
782
783 cond = self.__pending_shared.pop(priority, None)
784 if cond:
785 assert cond.shared
786 assert cond in prioqueue
787
788
789 if len(prioqueue) > 1:
790 prioqueue.remove(cond)
791 prioqueue.insert(0, cond)
792
793
794 cond.notifyAll()
795
796 assert not self.__is_exclusive()
797 assert self.__is_sharer()
798
799 return True
800 finally:
801 self.__lock.release()
802
804 """Release a Shared Lock.
805
806 You must have acquired the lock, either in shared or in exclusive mode,
807 before calling this function.
808
809 """
810 self.__lock.acquire()
811 try:
812 assert self.__is_exclusive() or self.__is_sharer(), \
813 "Cannot release non-owned lock"
814
815
816 if self.__is_exclusive():
817 self.__exc = None
818 notify = True
819 else:
820 self.__shr.remove(threading.currentThread())
821 notify = not self.__shr
822
823
824
825 if notify:
826 self.__notify_topmost()
827 finally:
828 self.__lock.release()
829
831 """Notifies topmost condition in queue of pending acquires.
832
833 """
834 (priority, prioqueue) = self.__find_first_pending_queue()
835 if prioqueue:
836 cond = prioqueue[0]
837 cond.notifyAll()
838 if cond.shared:
839
840
841 self.__pending_shared.pop(priority, None)
842
844 """Exported version of L{__notify_topmost}.
845
846 """
847 self.__lock.acquire()
848 try:
849 return self.__notify_topmost()
850 finally:
851 self.__lock.release()
852
853 - def delete(self, timeout=None, priority=None):
854 """Delete a Shared Lock.
855
856 This operation will declare the lock for removal. First the lock will be
857 acquired in exclusive mode if you don't already own it, then the lock
858 will be put in a state where any future and pending acquire() fail.
859
860 @type timeout: float
861 @param timeout: maximum waiting time before giving up
862 @type priority: integer
863 @param priority: Priority for acquiring lock
864
865 """
866 if priority is None:
867 priority = _DEFAULT_PRIORITY
868
869 self.__lock.acquire()
870 try:
871 assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
872
873 self.__check_deleted()
874
875
876 acquired = self.__is_exclusive()
877
878 if not acquired:
879 acquired = self.__acquire_unlocked(0, timeout, priority)
880
881 if acquired:
882 assert self.__is_exclusive() and not self.__is_sharer(), \
883 "Lock wasn't acquired in exclusive mode"
884
885 self.__deleted = True
886 self.__exc = None
887
888 assert not (self.__exc or self.__shr), "Found owner during deletion"
889
890
891 for (_, prioqueue) in self.__pending:
892 for cond in prioqueue:
893 cond.notifyAll()
894
895 assert self.__deleted
896
897 return acquired
898 finally:
899 self.__lock.release()
900
905
908
909
910
911
912 ALL_SET = None
913
914 LOCKSET_NAME = "[lockset]"
915
916
918 """Returns the number zero.
919
920 """
921 return 0
922
923
925 """Internal exception to abort an acquire on a timeout.
926
927 """
928
929
930
931
932
933
934
935
936
937
938
939
940 (LEVEL_CLUSTER,
941 LEVEL_INSTANCE,
942 LEVEL_NODEGROUP,
943 LEVEL_NODE,
944 LEVEL_NODE_RES,
945 LEVEL_NETWORK) = range(0, 6)
946
947 LEVELS = [
948 LEVEL_CLUSTER,
949 LEVEL_INSTANCE,
950 LEVEL_NODEGROUP,
951 LEVEL_NODE,
952 LEVEL_NODE_RES,
953 LEVEL_NETWORK,
954 ]
955
956
957 LEVELS_MOD = compat.UniqueFrozenset([
958 LEVEL_NODE_RES,
959 LEVEL_NODE,
960 LEVEL_NODEGROUP,
961 LEVEL_INSTANCE,
962 LEVEL_NETWORK,
963 ])
964
965
966 LEVEL_NAMES = {
967 LEVEL_CLUSTER: "cluster",
968 LEVEL_INSTANCE: "instance",
969 LEVEL_NODEGROUP: "nodegroup",
970 LEVEL_NODE: "node",
971 LEVEL_NODE_RES: "node-res",
972 LEVEL_NETWORK: "network",
973 }
974
975
976 BGL = "BGL"
977