/usr/lib/python3/dist-packages/postgresql/protocol/pbuffer.py is in python3-postgresql 1.1.0-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | ##
# .protocol.pbuffer
##
"""
Pure Python message buffer implementation.
Given data read from the wire, buffer the data until a complete message has been
received.
"""
__all__ = ['pq_message_stream']
from io import BytesIO
import struct
from .message_types import message_types
xl_unpack = struct.Struct('!xL').unpack_from
class pq_message_stream(object):
'provide a message stream from a data stream'
_block = 512
_limit = _block * 4
def __init__(self):
self._strio = BytesIO()
self._start = 0
def truncate(self):
"remove all data in the buffer"
self._strio.truncate(0)
self._start = 0
def _rtruncate(self, amt = None):
"[internal] remove the given amount of data"
strio = self._strio
if amt is None:
amt = self._strio.tell()
strio.seek(0, 2)
size = strio.tell()
# if the total size is equal to the amt,
# then the whole thing is going to be truncated.
if size == amt:
strio.truncate(0)
return
copyto_pos = 0
copyfrom_pos = amt
while True:
strio.seek(copyfrom_pos)
data = strio.read(self._block)
# Next copyfrom
copyfrom_pos = strio.tell()
strio.seek(copyto_pos)
strio.write(data)
if len(data) != self._block:
break
# Next copyto
copyto_pos = strio.tell()
strio.truncate(size - amt)
def has_message(self, xl_unpack = xl_unpack, len = len):
"if the buffer has a message available"
strio = self._strio
strio.seek(self._start)
header = strio.read(5)
if len(header) < 5:
return False
length, = xl_unpack(header)
if length < 4:
raise ValueError("invalid message size '%d'" %(length,))
strio.seek(0, 2)
return (strio.tell() - self._start) >= length + 1
def __len__(self, xl_unpack = xl_unpack, len = len):
"number of messages in buffer"
count = 0
rpos = self._start
strio = self._strio
strio.seek(self._start)
while True:
# get the message metadata
header = strio.read(5)
rpos += 5
if len(header) < 5:
# not enough data for another message
break
# unpack the length from the header
length, = xl_unpack(header)
rpos += length - 4
if length < 4:
raise ValueError("invalid message size '%d'" %(length,))
strio.seek(length - 4 - 1, 1)
if len(strio.read(1)) != 1:
break
count += 1
return count
def _get_message(self,
mtypes = message_types,
len = len,
xl_unpack = xl_unpack,
):
strio = self._strio
header = strio.read(5)
if len(header) < 5:
return
length, = xl_unpack(header)
typ = mtypes[header[0]]
if length < 4:
raise ValueError("invalid message size '%d'" %(length,))
length -= 4
body = strio.read(length)
if len(body) < length:
# Not enough data for message.
return
return (typ, body)
def next_message(self):
if self._start > self._limit:
self._rtruncate(self._start)
self._start = 0
self._strio.seek(self._start)
msg = self._get_message()
if msg is not None:
self._start = self._strio.tell()
return msg
def __next__(self):
if self._start > self._limit:
self._rtruncate(self._start)
self._start = 0
self._strio.seek(self._start)
msg = self._get_message()
if msg is None:
raise StopIteration
self._start = self._strio.tell()
return msg
def read(self, num = 0xFFFFFFFF, len = len):
if self._start > self._limit:
self._rtruncate(self._start)
self._start = 0
new_start = self._start
self._strio.seek(new_start)
l = []
while len(l) < num:
msg = self._get_message()
if msg is None:
break
l.append(msg)
new_start += (5 + len(msg[1]))
self._start = new_start
return l
def write(self, data):
# Always append data; it's a stream, damnit..
self._strio.seek(0, 2)
self._strio.write(data)
def getvalue(self):
self._strio.seek(self._start)
return self._strio.read()
|