/usr/share/pyshared/numm/io.py is in python-numm 0.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | import os
import gobject
import gst
# Turn on GObject threading support once, at import time, before any of the
# classes below touch a pipeline (presumably needed because the bus handler
# and appsink/appsrc callbacks are invoked from GStreamer threads - confirm).
gobject.threads_init()
class WakePipe(object):
    """A self-pipe used to block one thread until another wakes it.

    ``sleep()`` blocks reading a single byte from the pipe and ``wake()``
    writes a single byte, so every ``wake()`` releases exactly one
    ``sleep()``; wake-ups issued while nobody is sleeping are queued in the
    pipe rather than lost.
    """

    def __init__(self):
        (self._read_fd, self._write_fd) = os.pipe()

    def sleep(self):
        """Block until one wake byte is available, then return it."""
        return os.read(self._read_fd, 1)

    def wake(self):
        """Queue one wake-up by writing a single NUL byte to the pipe."""
        # A bytes literal is byte-for-byte the same as '\0' on Python 2 and
        # is the only kind of object os.write() accepts on Python 3.
        os.write(self._write_fd, b'\0')

    def close(self):
        """Close both ends of the pipe."""
        try:
            os.close(self._read_fd)
        finally:
            # Release the write end even if closing the read end failed, so
            # a single error cannot leak the other descriptor.
            os.close(self._write_fd)
class RunPipeline(object):
    """Run a GStreamer pipeline until it reports end-of-stream or an error.

    A synchronous bus handler records error and EOS messages and pokes a
    ``WakePipe`` so that whoever is blocked in ``wait()`` or iterating this
    object stops sleeping and re-checks the exit condition.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.errors = []    # error messages seen on the bus, in arrival order
        self.eos = False    # set to True once an EOS message is seen
        self.wake = WakePipe()
        pipeline.get_bus().set_sync_handler(self._message)

    def _message(self, bus, msg):
        """Bus sync handler: record errors / EOS and wake the waiter.

        Always returns ``gst.BUS_PASS``.
        """
        kind = msg.type
        if kind == gst.MESSAGE_ERROR:
            self.errors.append(msg)
            self.wake.wake()
        elif kind == gst.MESSAGE_EOS:
            self.eos = True
            self.wake.wake()
        return gst.BUS_PASS

    def start(self):
        """Ask the pipeline to switch to STATE_PLAYING."""
        self.pipeline.set_state(gst.STATE_PLAYING)

    def finish(self):
        """Bring the pipeline to STATE_NULL and close the wake pipe.

        Raises RuntimeError if the state change does not report success;
        note that the wake pipe is *not* closed on that path.
        """
        result = self.pipeline.set_state(gst.STATE_NULL)
        # Clearing the sync handler is necessary for the pipeline to be
        # gc'd.  It must come after STATE_NULL or gst frequently segfaults.
        bus = self.pipeline.get_bus()
        bus.set_sync_handler(None)
        if result != gst.STATE_CHANGE_SUCCESS:
            raise RuntimeError()
        state = self.pipeline.get_state()
        assert state[1] == gst.STATE_NULL, state
        self.wake.close()

    def wait(self):
        """Block until the next wake-up arrives on the wake pipe."""
        self.wake.sleep()

    def __iter__(self):
        """Start the pipeline and yield once per wake-up until it stops.

        ``finish()`` runs in a ``finally`` clause, so it is called however
        the generator exits.  Afterwards, if any error message was
        recorded, RuntimeError is raised carrying the first one.
        """
        try:
            self.start()
            while not self.eos and not self.errors:
                self.wake.sleep()
                yield
        finally:
            self.finish()
        if self.errors:
            raise RuntimeError(self.errors[0])

    def run(self):
        """Drive the pipeline to completion, ignoring intermediate steps."""
        for _step in self:
            pass
class Reader(RunPipeline):
    """Pull buffers from an appsink and hand processed results to a callback.

    Each pulled buffer goes through ``self._process_buffer`` (not defined in
    this class - presumably supplied by subclasses); any non-None result is
    passed to ``cb``.
    """

    def __init__(self, pipeline, appsink, cb, start_time=0):
        RunPipeline.__init__(self, pipeline)
        self.appsink = appsink
        self.cb = cb
        self.start_time = start_time    # seek target used by run(); 0 = no seek
        sink_props = appsink.props
        sink_props.emit_signals = True
        sink_props.max_buffers = 10
        sink_props.sync = False
        self.handler_id = appsink.connect('new-buffer', self._new_buffer)

    def _new_buffer(self, _appsink):
        """'new-buffer' handler: pull one buffer, wake the waiter, dispatch."""
        pulled = self.appsink.emit('pull-buffer')
        # The wake-up is sent whether or not a buffer was actually returned.
        self.wake.wake()
        if pulled is not None:
            result = self._process_buffer(pulled)
            if result is not None:
                self.cb(result)

    def finish(self):
        """Disconnect the 'new-buffer' handler, then do the base shutdown."""
        self.appsink.disconnect(self.handler_id)
        RunPipeline.finish(self)

    def run(self):
        """Run the pipeline, first seeking to ``start_time`` when non-zero.

        Raises RuntimeError("seek failed") if the seek is rejected; on that
        path ``finish()`` has not been called.
        """
        if self.start_time != 0:
            # Pause first and check the reported state before seeking.
            self.pipeline.set_state(gst.STATE_PAUSED)
            state = self.pipeline.get_state()
            assert state[1] == gst.STATE_PAUSED, state
            seek_flags = gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE
            seek_success = self.pipeline.seek_simple(
                gst.FORMAT_TIME, seek_flags, self.start_time)
            if not seek_success:
                raise RuntimeError("seek failed")
        RunPipeline.run(self)
class Writer(RunPipeline):
    """Feed queued frames into an appsrc, one per 'need-data' request.

    The pipeline is started as soon as the writer is constructed.
    """

    def __init__(self, pipeline, appsrc):
        RunPipeline.__init__(self, pipeline)
        self.appsrc = appsrc
        appsrc.props.emit_signals = True
        appsrc.connect('need-data', self._need_data)
        self.queue = []     # newest frame at index 0, oldest at the end
        self.start()

    def _need_data(self, src, length):
        """'need-data' handler: wait for a wake-up, then push or end.

        Blocks on the wake pipe, then pushes the oldest queued frame, or
        emits 'end-of-stream' if the queue is empty at that moment.
        """
        self.wake.sleep()
        if not self.queue:
            self.appsrc.emit('end-of-stream')
            return
        frame = self.queue.pop()
        self.appsrc.emit('push-buffer', gst.Buffer(frame.data))

    def write(self, np):
        """Queue one frame (an object exposing ``.data``) and wake the feeder."""
        self.queue.insert(0, np)
        self.wake.wake()

    def close(self):
        """Wake the feeder without queueing a frame.

        With an empty queue this makes ``_need_data`` emit 'end-of-stream'.
        It does not wait for the pipeline or call ``finish()``.
        """
        self.wake.wake()
def _run_appsrc_pipeline(pipeline, appsrc, get_chunk):
    """Run *pipeline* to completion, feeding *appsrc* from *get_chunk*.

    ``get_chunk(position, length)`` is called for every 'need-data' request
    and must return ``(delta, data)``: ``data.data`` is pushed as a buffer
    and ``position`` advances by ``delta``.  The stream is ended when
    ``get_chunk`` raises IndexError or returns data of length zero.
    """
    # XXX: deprecated (need to refactor audio for streaming write)
    cursor = [0]    # one-element list so the callback can update it in place

    def need_data(src, length):
        try:
            (advance, chunk) = get_chunk(cursor[0], length)
        except IndexError:
            src.emit('end-of-stream')
        else:
            if len(chunk) == 0:
                src.emit('end-of-stream')
            else:
                src.emit('push-buffer', gst.Buffer(chunk.data))
                cursor[0] += advance

    appsrc.props.emit_signals = True
    appsrc.connect('need-data', need_data)
    RunPipeline(pipeline).run()
|