Sound Localization (Beta)¶
Where is the sound coming from?
Getting Started¶
Using the Angus Python SDK:
# -*- coding: utf-8 -*-
import angus.client
from pprint import pprint
# Connect to Angus and look up version 1 of the sound localization service.
conn = angus.client.connect()
service = conn.services.get_service('sound_localization', version=1)
# The wav file must be PCM 16bit, 48kHz, Stereo (see the Input section).
# The `with` block keeps the file open until the result has been printed,
# then closes it.
with open("./sound.wav", 'rb') as sound:
    job = service.process({'sound': sound,
                           'baseline': 0.7,
                           'sensitivity': 0.5})
    pprint(job.result)
Input¶
{'sound' : file,
'baseline' : 0.7,
'sensitivity' : 0.3}
sound
: a Python File Object, as returned for example by open(), or a StringIO buffer, describing a wav file with the following format: PCM 16bit, 48kHz, Stereo.
baseline
: distance between the 2 microphones of the array, in meters.
sensitivity
: modifies the ability of the algorithms to locate quiet sounds. Range: [0, 1]. The higher the value is, the better the algorithm will locate quiet sounds, but the more sensitive it will be to background noise.
Output¶
Events will be pushed to your client following this format:
{
"input_size" : 8192,
"nb_sources" : 1,
"sources" : [
{
"index" : 345,
"yaw" : 0.156,
"confidence" : 0.53
}
]
}
input_size
: number of frames given as input (in a stereo file, 1 frame = 1 left sample + 1 right sample).
nb_sources
: number of sound sources located.
yaw
: angle of the sound source in radians, as shown below:
confidence
: an estimate of the probability that a real sound source is indeed located at the given yaw.
Code Sample¶
This sample assumes that you have a sound card able to record in stereo.
Requirements: PyAudio, NumPy
This code sample retrieves the audio stream of a recording device and displays the result of the sound_localization service.
# -*- coding: utf-8 -*-
import Queue
import StringIO
import wave
import time
import sys
from pprint import pprint
import pyaudio
import numpy as np
import angus.client
# Number of frames requested from PyAudio per buffer (passed as
# frames_per_buffer when the input stream is opened).
CHUNK = 8192
# Sample format requested from PyAudio (16-bit integers) and the NumPy dtype
# used to decode the captured buffers.
PYAUDIO_FORMAT = pyaudio.paInt16
NUMPY_FORMAT = np.int16
# Sample rate (Hz) and channel count of the wav data sent to the service.
TARGET_RATE = 48000
TARGET_CHANNELS = 2
def list_inputs():
    """Print the index and name of every audio device able to record.

    A device is listed when it reports at least one input channel.
    """
    p = pyaudio.PyAudio()
    try:
        for i in range(p.get_device_count()):
            info = p.get_device_info_by_index(i)
            if info['maxInputChannels'] > 0:
                print("Device index={} name={}".format(info['index'], info['name']))
    finally:
        # Release the audio backend resources held by this PyAudio instance.
        p.terminate()
def prepare(in_data, channels, rate):
    """Convert a captured buffer into interleaved stereo samples at TARGET_RATE.

    Args:
        in_data: raw bytes of interleaved NUMPY_FORMAT samples.
        channels: number of interleaved channels in ``in_data``; must be at
            least 2, only the first two channels are kept.
        rate: sample rate of ``in_data`` in Hz.

    Returns:
        The bytes of the re-sampled, interleaved (left, right) samples.

    Raises:
        ValueError: if ``in_data`` does not hold a whole number of frames.
    """
    # Decode the raw bytes as one row per frame, one column per channel.
    # frombuffer replaces the deprecated binary mode of np.fromstring.
    samples = np.frombuffer(in_data, dtype=NUMPY_FORMAT)
    samples = samples.reshape(-1, channels)
    frames = samples.shape[0]

    # Re-sample if needed: linearly interpolate each kept channel at the
    # positions a TARGET_RATE clock would sample within the source buffer.
    srcx = np.arange(0, frames, 1)
    tgtx = np.arange(0, frames, float(rate) / float(TARGET_RATE))
    # Diagnostic output kept from the original sample.
    print(samples[:, 0].size)
    left = np.interp(tgtx, srcx, samples[:, 0]).astype(NUMPY_FORMAT)
    right = np.interp(tgtx, srcx, samples[:, 1]).astype(NUMPY_FORMAT)
    print(left.size)
    print(CHUNK)

    # Interleave the two channels: even slots = left, odd slots = right.
    c = np.empty((left.size + right.size), dtype=NUMPY_FORMAT)
    c[0::2] = left
    c[1::2] = right
    # tobytes replaces ndarray.tostring, which recent NumPy no longer provides.
    return c.tobytes()
def main(stream_index):
    """Record from an input device and print the located sound sources.

    Captured buffers are converted by ``prepare`` in the PyAudio callback,
    queued, wrapped one by one in an in-memory wav file and sent to the
    'sound_localization' service. The loop runs until an exception (for
    example KeyboardInterrupt) interrupts it; the stream and the PyAudio
    instance are released on the way out.

    Args:
        stream_index: PyAudio index of the input device to record from.

    Raises:
        RuntimeError: if the device has fewer than TARGET_CHANNELS input
            channels.
    """
    p = pyaudio.PyAudio()
    try:
        # Device configuration
        conf = p.get_device_info_by_index(stream_index)
        channels = int(conf['maxInputChannels'])
        if channels < TARGET_CHANNELS:
            raise RuntimeError("Bad device, no input channel")
        rate = int(conf['defaultSampleRate'])

        # Angus
        conn = angus.client.connect()
        service = conn.services.get_service('sound_localization', version=1)
        service.enable_session()

        # Record Process: the callback hands converted buffers to the main
        # loop through this queue.
        stream_queue = Queue.Queue()

        def chunk_callback(in_data, frame_count, time_info, status):
            stream_queue.put(prepare(in_data, channels, rate))
            # Input-only stream: there is no output data to hand back.
            return (None, pyaudio.paContinue)

        stream = p.open(format=PYAUDIO_FORMAT,
                        channels=channels,
                        rate=rate,
                        input=True,
                        frames_per_buffer=CHUNK,
                        input_device_index=stream_index,
                        stream_callback=chunk_callback)
        try:
            stream.start_stream()
            while True:
                nb_buffer_available = stream_queue.qsize()
                if nb_buffer_available == 0:
                    # Nothing captured yet: wait briefly and poll again.
                    time.sleep(0.01)
                    continue
                print("nb buffer available = {}".format(nb_buffer_available))

                data = stream_queue.get()

                # Wrap the raw samples in a wav container held in memory.
                buff = StringIO.StringIO()
                wf = wave.open(buff, 'wb')
                wf.setnchannels(TARGET_CHANNELS)
                wf.setsampwidth(p.get_sample_size(PYAUDIO_FORMAT))
                wf.setframerate(TARGET_RATE)
                wf.writeframes(data)
                wf.close()

                # A fresh buffer is used so the service reads from offset 0.
                job = service.process(
                    {'sound': StringIO.StringIO(buff.getvalue()),
                     'baseline': 0.14,
                     'sensitivity': 0.7})
                pprint(job.result['sources'])
        finally:
            # Runs when the loop is interrupted; the original cleanup after
            # `while True` was unreachable.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
# Script entry point: take the device index from the first command-line
# argument, or list the input devices and prompt for one.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        list_inputs()
        INDEX = raw_input("Please select a device number:")
    else:
        INDEX = sys.argv[1]
    # Only the conversion is guarded, so that a ValueError raised while
    # recording is not misreported as a bad device index.
    try:
        DEVICE_INDEX = int(INDEX)
    except ValueError:
        print("Not a valid index")
        # sys.exit is always available; the bare `exit` helper is only
        # installed by the `site` module.
        sys.exit(1)
    main(DEVICE_INDEX)