This file is indexed.

/usr/lib/python3/dist-packages/postgresql/test/test_alock.py is in python3-postgresql 1.1.0-1build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
##
# .test.test_alock - test .alock
##
import unittest
import threading
import time
from ..temporal import pg_tmp
from .. import alock

n_alocks = "select count(*) FROM pg_locks WHERE locktype = 'advisory'"

class test_alock(unittest.TestCase):
	@pg_tmp
	def testALockWait(self):
		# sadly, this is primarily used to exercise the code paths..
		ad = prepare(n_alocks).first
		self.assertEqual(ad(), 0)
		state = [False, False, False]
		alt = new()
		first = alock.ExclusiveLock(db, (0,0))
		second = alock.ExclusiveLock(db, 1)
		def concurrent_lock():
			try:
				with alock.ExclusiveLock(alt, 1):
					with alock.ExclusiveLock(alt, (0,0)):
						# start it
						state[0] = True
						while not state[1]:
							pass
							time.sleep(0.01)
					while not state[2]:
						time.sleep(0.01)
			except Exception:
				# Avoid dead lock in cases where advisory is not available.
				state[0] = state[1] = state[2] = True
		t = threading.Thread(target = concurrent_lock)
		t.start()
		while not state[0]:
			time.sleep(0.01)
		self.assertEqual(ad(), 2)
		state[1] = True
		with first:
			self.assertEqual(ad(), 2)
			state[2] = True
			with second:
				self.assertEqual(ad(), 2)
		t.join(timeout = 1)

	@pg_tmp
	def testALockNoWait(self):
		alt = new()
		ad = prepare(n_alocks).first
		self.assertEqual(ad(), 0)
		with alock.ExclusiveLock(db, (0,0)):
			l=alock.ExclusiveLock(alt, (0,0))
			# should fail to acquire
			self.assertEqual(l.acquire(blocking=False), False)
		# no alocks should exist now
		self.assertEqual(ad(), 0)

	@pg_tmp
	def testALock(self):
		ad = prepare(n_alocks).first
		self.assertEqual(ad(), 0)
		# test a variety..
		lockids = [
			(1,4),
			-32532, 0, 2,
			(7, -1232),
			4, 5, 232142423,
			(18,7),
			2, (1,4)
		]
		alt = new()
		xal1 = alock.ExclusiveLock(db, *lockids)
		xal2 = alock.ExclusiveLock(db, *lockids)
		sal1 = alock.ShareLock(db, *lockids)
		with sal1:
			with xal1, xal2:
				self.assertTrue(ad() > 0)
				for x in lockids:
					xl = alock.ExclusiveLock(alt, x)
					self.assertEqual(xl.acquire(blocking=False), False)
				# main has exclusives on these, so this should fail.
				xl = alock.ShareLock(alt, *lockids)
				self.assertEqual(xl.acquire(blocking=False), False)
			for x in lockids:
				# sal1 still holds
				xl = alock.ExclusiveLock(alt, x)
				self.assertEqual(xl.acquire(blocking=False), False)
				# sal1 still holds, but we want a share lock too.
				xl = alock.ShareLock(alt, x)
				self.assertEqual(xl.acquire(blocking=False), True)
				xl.release()
		# no alocks should exist now
		self.assertEqual(ad(), 0)

	@pg_tmp
	def testPartialALock(self):
		# Validates that release is properly cleaning up
		ad = prepare(n_alocks).first
		self.assertEqual(ad(), 0)
		held = (0,-1234)
		wanted = [0, 324, -1232948, 7, held, 1, (2,4), (834,1)]
		alt = new()
		with alock.ExclusiveLock(db, held):
			l=alock.ExclusiveLock(alt, *wanted)
			# should fail to acquire, db has held
			self.assertEqual(l.acquire(blocking=False), False)
		# No alocks should exist now.
		# This *MUST* occur prior to alt being closed.
		# Otherwise, we won't be testing for the recovery
		# of a failed non-blocking acquire().
		self.assertEqual(ad(), 0)

	@pg_tmp
	def testALockParameterErrors(self):
		self.assertRaises(TypeError, alock.ALock)
		l = alock.ExclusiveLock(db)
		self.assertRaises(RuntimeError, l.release)

	@pg_tmp
	def testALockOnClosed(self):
		ad = prepare(n_alocks).first
		self.assertEqual(ad(), 0)
		held = (0,-1234)
		alt = new()
		# __exit__ should only touch the count.
		with alock.ExclusiveLock(alt, held) as l:
			self.assertEqual(ad(), 1)
			self.assertEqual(l.locked(), True)
			alt.close()
			time.sleep(0.005)
			self.assertEqual(ad(), 0)
			self.assertEqual(l.locked(), False)

if __name__ == '__main__':
	unittest.main()