/usr/lib/python3/dist-packages/construct/lib/bitstream.py is in python3-construct 2.8.16-0.2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | from io import BlockingIOError
from time import sleep
from sys import maxsize
class RestreamedBytesIO(object):
__slots__ = ["substream", "encoder", "encoderunit", "decoder", "decoderunit", "rbuffer", "wbuffer","sincereadwritten"]
def __init__(self, substream, encoder, encoderunit, decoder, decoderunit):
self.substream = substream
self.encoder = encoder
self.encoderunit = encoderunit
self.decoder = decoder
self.decoderunit = decoderunit
self.rbuffer = b""
self.wbuffer = b""
self.sincereadwritten = 0
def read(self, count):
if count < 0:
raise ValueError("count cannot be negative")
while len(self.rbuffer) < count:
data = self.substream.read(self.decoderunit)
if data is None or len(data) == 0:
raise IOError("Restreamed cannot satisfy read request of %d bytes" % (count,))
self.rbuffer += self.decoder(data)
data, self.rbuffer = self.rbuffer[:count], self.rbuffer[count:]
self.sincereadwritten += count
return data
def write(self, data):
self.wbuffer += data
datalen = len(data)
while len(self.wbuffer) >= self.encoderunit:
data, self.wbuffer = self.wbuffer[:self.encoderunit], self.wbuffer[self.encoderunit:]
self.substream.write(self.encoder(data))
self.sincereadwritten += datalen
return datalen
def close(self):
if len(self.rbuffer):
raise ValueError("closing stream but %d unread bytes remain, %d is decoded unit" % (len(self.rbuffer), self.decoderunit))
if len(self.wbuffer):
raise ValueError("closing stream but %d unwritten bytes remain, %d is encoded unit" % (len(self.wbuffer), self.encoderunit))
def seekable(self):
return False
def tell(self):
"""WARNING: tell is correct only on read-only and write-only instances."""
return self.sincereadwritten
def tellable(self):
return True
class RebufferedBytesIO(object):
__slots__ = ["substream","offset","rwbuffer","moved","tailcutoff"]
def __init__(self, substream, tailcutoff=None):
self.substream = substream
self.offset = 0
self.rwbuffer = b""
self.moved = 0
self.tailcutoff = tailcutoff
def read(self, count=None):
if count is None:
raise ValueError("count must be an int, reading until EOF not supported")
startsat = self.offset
endsat = startsat + count
if startsat < self.moved:
raise IOError("could not read because tail was cut off")
while self.moved + len(self.rwbuffer) < endsat:
try:
newdata = self.substream.read(128*1024)
except BlockingIOError:
newdata = None
if not newdata:
sleep(0)
continue
self.rwbuffer += newdata
data = self.rwbuffer[startsat-self.moved:endsat-self.moved]
self.offset += count
if self.tailcutoff is not None and self.moved < self.offset - self.tailcutoff:
removed = self.offset - self.tailcutoff - self.moved
self.moved += removed
self.rwbuffer = self.rwbuffer[removed:]
if len(data) < count:
raise IOError("could not read enough bytes, something went wrong")
return data
def write(self, data):
startsat = self.offset
endsat = startsat + len(data)
while self.moved + len(self.rwbuffer) < startsat:
newdata = self.substream.read(128*1024)
self.rwbuffer += newdata
if not newdata:
sleep(0)
self.rwbuffer = self.rwbuffer[:startsat-self.moved] + data + self.rwbuffer[endsat-self.moved:]
self.offset = endsat
if self.tailcutoff is not None and self.moved < self.offset - self.tailcutoff:
removed = self.offset - self.tailcutoff - self.moved
self.moved += removed
self.rwbuffer = self.rwbuffer[removed:]
return len(data)
def seek(self, at, whence=0):
if whence == 0:
self.offset = at
return self.offset
elif whence == 1:
self.offset += at
return self.offset
else:
raise ValueError("seeks only with whence 0 and 1")
def seekable(self):
return True
def tell(self):
return self.offset
def tellable(self):
return True
def cachedfrom(self):
return self.moved
def cachedto(self):
return self.moved + len(self.rwbuffer)
|