This file is indexed.

/usr/lib/python3/dist-packages/postgresql/protocol/pbuffer.py is in python3-postgresql 1.1.0-1build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
##
# .protocol.pbuffer
##
"""
Pure Python message buffer implementation.

Given data read from the wire, buffer the data until a complete message has been
received.
"""
__all__ = ['pq_message_stream']

from io import BytesIO
import struct
from .message_types import message_types

xl_unpack = struct.Struct('!xL').unpack_from

class pq_message_stream(object):
	'provide a message stream from a data stream'
	_block = 512
	_limit = _block * 4
	def __init__(self):
		self._strio = BytesIO()
		self._start = 0

	def truncate(self):
		"remove all data in the buffer"
		self._strio.truncate(0)
		self._start = 0

	def _rtruncate(self, amt = None):
		"[internal] remove the given amount of data"
		strio = self._strio
		if amt is None:
			amt = self._strio.tell()
		strio.seek(0, 2)
		size = strio.tell()
		# if the total size is equal to the amt,
		# then the whole thing is going to be truncated.
		if size == amt:
			strio.truncate(0)
			return

		copyto_pos = 0
		copyfrom_pos = amt
		while True:
			strio.seek(copyfrom_pos)
			data = strio.read(self._block)
			# Next copyfrom
			copyfrom_pos = strio.tell()
			strio.seek(copyto_pos)
			strio.write(data)
			if len(data) != self._block:
				break
			# Next copyto
			copyto_pos = strio.tell()

		strio.truncate(size - amt)

	def has_message(self, xl_unpack = xl_unpack, len = len):
		"if the buffer has a message available"
		strio = self._strio
		strio.seek(self._start)
		header = strio.read(5)
		if len(header) < 5:
			return False
		length, = xl_unpack(header)
		if length < 4:
			raise ValueError("invalid message size '%d'" %(length,))
		strio.seek(0, 2)
		return (strio.tell() - self._start) >= length + 1

	def __len__(self, xl_unpack = xl_unpack, len = len):
		"number of messages in buffer"
		count = 0
		rpos = self._start
		strio = self._strio
		strio.seek(self._start)
		while True:
			# get the message metadata
			header = strio.read(5)
			rpos += 5
			if len(header) < 5:
				# not enough data for another message
				break
			# unpack the length from the header
			length, = xl_unpack(header)
			rpos += length - 4

			if length < 4:
				raise ValueError("invalid message size '%d'" %(length,))
			strio.seek(length - 4 - 1, 1)

			if len(strio.read(1)) != 1:
				break
			count += 1
		return count

	def _get_message(self,
		mtypes = message_types,
		len = len,
		xl_unpack = xl_unpack,
	):
		strio = self._strio
		header = strio.read(5)
		if len(header) < 5:
			return
		length, = xl_unpack(header)
		typ = mtypes[header[0]]

		if length < 4:
			raise ValueError("invalid message size '%d'" %(length,))
		length -= 4
		body = strio.read(length)
		if len(body) < length:
			# Not enough data for message.
			return
		return (typ, body)

	def next_message(self):
		if self._start > self._limit:
			self._rtruncate(self._start)
			self._start = 0

		self._strio.seek(self._start)
		msg = self._get_message()
		if msg is not None:
			self._start = self._strio.tell()
		return msg

	def __next__(self):
		if self._start > self._limit:
			self._rtruncate(self._start)
			self._start = 0

		self._strio.seek(self._start)
		msg = self._get_message()
		if msg is None:
			raise StopIteration
		self._start = self._strio.tell()
		return msg

	def read(self, num = 0xFFFFFFFF, len = len):
		if self._start > self._limit:
			self._rtruncate(self._start)
			self._start = 0

		new_start = self._start
		self._strio.seek(new_start)
		l = []
		while len(l) < num:
			msg = self._get_message()
			if msg is None:
				break
			l.append(msg)
			new_start += (5 + len(msg[1]))
		self._start = new_start
		return l

	def write(self, data):
		# Always append data; it's a stream, damnit..
		self._strio.seek(0, 2)
		self._strio.write(data)

	def getvalue(self):
		self._strio.seek(self._start)
		return self._strio.read()