/usr/share/pyshared/numm/io.py is in python-numm 0.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
| import os
import gobject
import gst
gobject.threads_init()
class WakePipe():
def __init__(self):
(self._read_fd, self._write_fd) = os.pipe()
def sleep(self):
return os.read(self._read_fd, 1)
def wake(self):
os.write(self._write_fd, '\0')
def close(self):
os.close(self._read_fd)
os.close(self._write_fd)
class RunPipeline(object):
def __init__(self, pipeline):
self.pipeline = pipeline
self.errors = []
self.eos = False
self.wake = WakePipe()
bus = pipeline.get_bus()
bus.set_sync_handler(self._message)
def _message(self, bus, msg):
if msg.type == gst.MESSAGE_ERROR:
self.errors.append(msg)
self.wake.wake()
elif msg.type == gst.MESSAGE_EOS:
self.eos = True
self.wake.wake()
return gst.BUS_PASS
def start(self):
self.pipeline.set_state(gst.STATE_PLAYING)
def finish(self):
ok = self.pipeline.set_state(gst.STATE_NULL)
# set_sync_handler call is necessary for the pipeline to be gc'd.
# it must come after STATE_NULL or gst frequently segfaults.
self.pipeline.get_bus().set_sync_handler(None)
if ok != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError()
state = self.pipeline.get_state()
assert state[1] == gst.STATE_NULL, state
self.wake.close()
def wait(self):
self.wake.sleep()
def __iter__(self):
try:
self.start()
while not (self.eos or self.errors):
self.wake.sleep()
yield
finally:
self.finish()
if self.errors:
raise RuntimeError(self.errors[0])
def run(self):
for _ in self:
pass
class Reader(RunPipeline):
def __init__(self, pipeline, appsink, cb, start_time=0):
RunPipeline.__init__(self, pipeline)
self.appsink = appsink
self.cb = cb
self.start_time = start_time
appsink.props.emit_signals = True
appsink.props.max_buffers = 10
appsink.props.sync = False
self.handler_id = appsink.connect('new-buffer', self._new_buffer)
def _new_buffer(self, _appsink):
buf = self.appsink.emit('pull-buffer')
self.wake.wake()
if buf is None:
return
v = self._process_buffer(buf)
if v is not None:
self.cb(v)
def finish(self):
self.appsink.disconnect(self.handler_id)
RunPipeline.finish(self)
def run(self):
if self.start_time != 0:
ok = self.pipeline.set_state(gst.STATE_PAUSED)
state = self.pipeline.get_state()
assert state[1] == gst.STATE_PAUSED, state
seek_success = self.pipeline.seek_simple(
gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
self.start_time)
if not seek_success:
raise RuntimeError("seek failed")
RunPipeline.run(self)
class Writer(RunPipeline):
def __init__(self, pipeline, appsrc):
RunPipeline.__init__(self, pipeline)
self.appsrc = appsrc
self.appsrc.props.emit_signals = True
self.appsrc.connect('need-data', self._need_data)
self.queue = []
self.start()
def _need_data(self, src, length):
self.wake.sleep()
if len(self.queue) > 0:
fr = self.queue.pop()
self.appsrc.emit('push-buffer', gst.Buffer(fr.data))
else:
self.appsrc.emit('end-of-stream')
def write(self, np):
self.queue.insert(0, np)
self.wake.wake()
def close(self):
self.wake.wake()
def _run_appsrc_pipeline(pipeline, appsrc, get_chunk):
# XXX: deprecated (need to refactor audio for streaming write)
position = [0]
def need_data(src, length):
try:
(delta_p, a) = get_chunk(position[0], length)
except IndexError:
src.emit('end-of-stream')
return
if len(a) == 0:
src.emit('end-of-stream')
return
src.emit('push-buffer', gst.Buffer(a.data))
position[0] += delta_p
appsrc.props.emit_signals = True
appsrc.connect('need-data', need_data)
run = RunPipeline(pipeline)
run.run()
|