/usr/share/pyshared/numm/sound.py is in python-numm 0.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Conversion between audio files and numpy arrays.
Audio data is represented by arrays of shape (n_frames, n_channels),
with a sample rate of 44.1kHz by default.
"""
import gst
import numpy
import numm.io
from numm.async import NummBuffer
def _make_sound_pipeline(path, sample_rate):
    """
    Build the GStreamer pipeline used to decode an audio file.

    The pipeline reads `path`, decodes it, and converts/resamples the
    audio to signed 16-bit stereo at `sample_rate` before handing it
    to an appsink element.

    Returns a (pipeline, appsink) tuple.
    """
    description = '''
filesrc name=filesrc !
decodebin !
audioconvert !
audioresample !
audio/x-raw-int,rate=%s,width=16,depth=16,signed=true,channels=2 !
appsink name=appsink
''' % (sample_rate)
    pipeline = gst.parse_launch(description)

    # Point the source element at the file to decode.
    source = pipeline.get_by_name('filesrc')
    source.props.location = path

    sink = pipeline.get_by_name('appsink')
    return (pipeline, sink)
class SoundReader(numm.io.Reader):
    """
    Reader that turns the buffers of a decoded audio file into
    int16 arrays of shape (n_frames, n_channels).

    `start` and `n_frames` are counted in frames at `sample_rate`;
    a non-positive `n_frames` means "no limit".
    """

    def __init__(self, path, cb, sample_rate=44100, start=0, n_frames=-1):
        # XXX: These caps ought to be kept in sync with the ones in numm.run.
        (pipeline, appsink) = _make_sound_pipeline(path, sample_rate)

        # Convert the starting frame index into gst.SECOND-based time units.
        start_time = int(start * gst.SECOND / float(sample_rate))
        numm.io.Reader.__init__(self, pipeline, appsink, cb, start_time)

        self.sample_rate = sample_rate
        self.n_frames = n_frames
        # Number of frames handed out so far.
        self.frame_idx = 0
        # NOTE(review): not read in this class; presumably consumed by
        # numm.io.Reader -- confirm.
        self.seek_done = False
        # Filled in from the caps of the first buffer seen.
        self.n_channels = None

    def _process_buffer(self, buffer):
        """
        Convert one raw buffer into a NummBuffer of int16 samples.

        Returns the array (truncated so that no more than `n_frames`
        frames are produced in total), or None, after setting
        `self.eos`, once the frame limit has already been reached.
        """
        if self.n_channels is None:
            self.n_channels = buffer.caps[0]['channels']

        # Two bytes per int16 sample.  Floor division keeps the frame
        # count an integer even when true division is in effect, which
        # reshape() requires.
        frame_count = len(buffer) // (2 * self.n_channels)
        samples = numpy.fromstring(buffer, numpy.int16)
        samples = samples.reshape(frame_count, self.n_channels).view(NummBuffer)

        if self.n_frames > 0 and self.frame_idx + samples.shape[0] >= self.n_frames:
            # Keep only the frames still owed to the caller.
            samples = samples[:self.n_frames - self.frame_idx]
            if samples.shape[0] == 0:
                self.eos = True
                return None

        # Attach the timestamp after any truncation so the returned
        # array carries it whether or not NummBuffer copies attributes
        # onto slices.
        samples.timestamp = buffer.timestamp

        self.frame_idx += samples.shape[0]
        return samples
def _raw_sound_chunks(path, **kw):
    """
    Generate the chunks decoded from `path`, in order, as the reader
    delivers them.  Keyword arguments are passed on to SoundReader.
    """
    pending = []
    reader = SoundReader(path, pending.append, **kw)

    def drain():
        # Hand out everything the reader callback has queued so far.
        while pending:
            yield pending.pop(0)

    for _ in reader:
        for queued in drain():
            yield queued

    # Flush whatever is still queued once the reader is exhausted.
    for queued in drain():
        yield queued
def sound_chunks(path, chunk_size=None, **kw):
    """
    Generate the audio of `path` as a sequence of int16 arrays.

    With `chunk_size=None` the chunks are yielded exactly as decoded.
    Otherwise every yielded chunk has exactly `chunk_size` frames; the
    final chunk is zero-padded up to that length if necessary.
    Remaining keyword arguments are passed on to SoundReader.

    Raises ValueError (on first iteration, since this is a generator)
    if `chunk_size` is given but not positive.
    """
    # A non-positive chunk size would make the re-chunking loop below
    # spin forever without consuming any data.
    if chunk_size is not None and chunk_size <= 0:
        raise ValueError(
            "chunk_size must be a positive number of frames, got %r"
            % (chunk_size,))

    # Frames decoded but not yet emitted (stereo, matching the caps
    # requested by the decoding pipeline).
    leftover = numpy.zeros((0, 2), dtype=numpy.int16)

    for chunk in _raw_sound_chunks(path, **kw):
        if chunk_size is None:
            yield chunk
            continue

        leftover = numpy.concatenate([leftover, chunk])
        while len(leftover) >= chunk_size:
            b = leftover[:chunk_size].view(NummBuffer)
            # XXX: Should adjust timestamp.
            b.timestamp = chunk.timestamp
            yield b
            leftover = leftover[chunk_size:]

    # Leftover frames can only exist if at least one chunk was read,
    # so `chunk` is bound here.
    if len(leftover) > 0:
        pad_shape = list(leftover.shape)
        pad_shape[0] = chunk_size - pad_shape[0]
        padding = numpy.zeros(pad_shape, dtype=numpy.int16)
        b = numpy.concatenate([leftover, padding]).view(NummBuffer)
        b.timestamp = chunk.timestamp
        yield b
# Encoder/muxer pipeline fragment for each supported output format.
# Every fragment ends with "!" so it can be spliced in directly ahead
# of the filesink element.
_extension_map = dict(
    wav="wavenc !",
    mp3="lamemp3enc ! id3v2mux !",
    ogg="audioconvert ! vorbisenc ! oggmux !",
    flac="flacenc !",
    aac="faac !",
)
def _write_sound(np, filepath, opts=None):
    """
    Encode the sample array `np` and write it to `filepath`.

    `np` is either 1-D (mono) or of shape (n_frames, n_channels).
    `opts` may override any of:

      sample_rate -- defaults to 44100
      channels    -- defaults to the channel count implied by np.shape
      format      -- a key of _extension_map; defaults to the
                     (lower-cased) extension of `filepath`

    Raises KeyError if the format is not in _extension_map.
    """
    # A 1-D array is mono; otherwise the second axis holds the channels.
    # (Counting dimensions instead would declare every 2-D array stereo,
    # including single-channel (n, 1) data.)
    shape = np.shape
    if len(shape) > 1:
        channels = shape[1]
    else:
        channels = 1

    options = {
        'sample_rate': 44100,
        'channels': channels,
        # Lower-cased so that e.g. "X.WAV" selects the "wav" encoder.
        'format': filepath.split('.')[-1].lower(),
    }
    if opts:
        options.update(opts)
    options["apipe"] = _extension_map[options["format"]]

    pipeline = gst.parse_launch(
        '''
appsrc name=appsrc !
audio/x-raw-int,rate=%(sample_rate)s,width=16,depth=16,channels=%(channels)d,signed=true,endianness=1234 !
%(apipe)s
filesink name=filesink
''' % (options))

    def get_chunk(position, length):
        # Callback for numm.io._run_appsrc_pipeline: returns the
        # requested length together with that slice of the array.
        return (length, np[position:position+length])

    appsrc = pipeline.get_by_name('appsrc')
    filesink = pipeline.get_by_name('filesink')
    filesink.props.location = filepath
    numm.io._run_appsrc_pipeline(pipeline, appsrc, get_chunk)
def sound2np(filepath, start=0, n_frames=-1, sample_rate=44100):
    """
    Load audio data from a file.

    Decoding begins `start` frames into the file and stops after
    `n_frames` frames (a non-positive value means no limit), with the
    audio resampled to `sample_rate`.

    Returns an int16 array of shape (n_frames, n_channels); if nothing
    was decoded, an empty int16 array of shape (0, 2).
    """
    chunks = []
    reader = SoundReader(filepath, chunks.append, start=start,
                         n_frames=n_frames, sample_rate=sample_rate)
    reader.run()

    if not chunks:
        # Same dtype as decoded data, so callers see a consistent type
        # (a bare numpy.ndarray((0, 2)) would be float64).
        return numpy.zeros((0, 2), dtype=numpy.int16)
    return numpy.concatenate(chunks)
def np2sound(np, filepath, opts=None):
    """
    Save audio data to a file.

    The output format is taken from opts['format'] if given, and
    otherwise from the extension of `filepath`.  `opts` may also
    override 'sample_rate' and 'channels'.
    """
    # Normalise a missing options dict here so no mutable default is
    # shared between calls.
    if opts is None:
        opts = {}
    _write_sound(np, filepath, opts)
|