Sound Localization (Beta)

Where is the sound coming from?

Getting Started

Using the Angus Python SDK:

# -*- coding: utf-8 -*-
import angus.client
from pprint import pprint

# Connect to Angus and select version 1 of the sound localization service.
conn = angus.client.connect()
service = conn.services.get_service('sound_localization', version=1)

# The context manager closes the WAV file once the request is finished.
with open("./sound.wav", 'rb') as sound:
    job = service.process({'sound': sound, 'baseline': 0.7, 'sensitivity': 0.5})
    pprint(job.result)

Input

{'sound' : file,
 'baseline' : 0.7,
 'sensitivity' : 0.3}
  • sound : a Python file object (as returned, for example, by open()) or a StringIO buffer containing a WAV file in the following format: PCM 16-bit, 48 kHz, stereo.
  • baseline : distance between the 2 microphones of the array in meters.
  • sensitivity : modifies the ability of the algorithm to locate quiet sounds; value in the range [0, 1]. The higher the value, the better the algorithm locates quiet sounds, but the more sensitive it is to background noise.

Output

Events will be pushed to your client in the following format:

{
  "input_size" : 8192,
  "nb_sources" : 1,
  "sources" : [
              {
                "index" : 345,
                "yaw" : 0.156,
                "confidence" : 0.53
              }
            ]
}
  • input_size : number of frames given as input (in a stereo file, 1 frame = 1 left sample + 1 right sample).
  • nb_sources : number of sound sources located.
  • yaw : angle of the sound source in radians, as shown below:
  • confidence : an estimate of the probability that a real sound source is indeed located at the given yaw.

Code Sample

This sample assumes that you have a sound card able to record in stereo.

Requirements: PyAudio, NumPy

This code sample retrieves the audio stream of a recording device and displays the result of the sound_localization service.

# -*- coding: utf-8 -*-
import Queue
import StringIO
import wave
import time
import sys
from pprint import pprint
import pyaudio
import numpy as np
import angus.client

# Number of frames requested from PyAudio per capture buffer.
CHUNK = 8192
# Sample format of the captured audio (16-bit signed integers), expressed
# once for PyAudio and once for NumPy; the two must describe the same type.
PYAUDIO_FORMAT = pyaudio.paInt16
NUMPY_FORMAT = np.int16
# Sample rate (Hz) and channel count of the WAV data sent to the service.
TARGET_RATE = 48000
TARGET_CHANNELS = 2

def list_inputs():
    """Print the index and name of every audio device that can record.

    A device is listed when it reports at least one input channel.
    """
    p = pyaudio.PyAudio()
    try:
        for i in range(p.get_device_count()):
            info = p.get_device_info_by_index(i)
            if info['maxInputChannels'] > 0:
                print("Device index={} name={}".format(info['index'], info['name']))
    finally:
        # Release the PortAudio resources held by this PyAudio instance.
        p.terminate()

def prepare(in_data, channels, rate):
    """Convert a raw capture buffer to interleaved stereo at TARGET_RATE.

    Only the first two channels of the input are kept; they are resampled
    by linear interpolation and re-interleaved as left/right samples.

    Args:
        in_data: raw interleaved samples of type NUMPY_FORMAT, as delivered
            to a PyAudio stream callback.
        channels: number of interleaved channels in in_data (at least 2).
        rate: sample rate of in_data, in Hz.

    Returns:
        The resampled stereo samples (NUMPY_FORMAT, interleaved) as a byte
        string; an empty byte string when in_data holds no frame.
    """
    # De-interleave: one row per frame, one column per channel. The frame
    # count is derived from the buffer itself rather than assumed.
    samples = np.frombuffer(in_data, dtype=NUMPY_FORMAT).reshape(-1, channels)
    frames = samples.shape[0]
    if frames == 0:
        # np.interp rejects an empty set of sample points.
        return b""

    # Re-sample if needed: evaluate the signal at positions spaced by the
    # ratio of the source rate to the target rate.
    srcx = np.arange(frames)
    tgtx = np.arange(0, frames, float(rate) / float(TARGET_RATE))

    left = np.interp(tgtx, srcx, samples[:, 0]).astype(NUMPY_FORMAT)
    right = np.interp(tgtx, srcx, samples[:, 1]).astype(NUMPY_FORMAT)

    # Re-interleave as L, R, L, R, ...
    stereo = np.empty(left.size + right.size, dtype=NUMPY_FORMAT)
    stereo[0::2] = left
    stereo[1::2] = right
    return stereo.tobytes()

def main(stream_index):
    """Stream audio from a recording device to the sound_localization service.

    Each captured buffer is converted by prepare(), wrapped in a WAV
    container, and sent to the service; the located sources are printed.
    The loop runs until it is interrupted (for example with Ctrl+C).

    Args:
        stream_index: PyAudio index of the input device to record from.

    Raises:
        RuntimeError: if the device offers fewer than TARGET_CHANNELS
            input channels.
    """
    p = pyaudio.PyAudio()
    stream = None
    try:
        # Device configuration
        conf = p.get_device_info_by_index(stream_index)
        channels = int(conf['maxInputChannels'])
        if channels < TARGET_CHANNELS:
            raise RuntimeError("Bad device, no input channel")

        rate = int(conf['defaultSampleRate'])

        # Angus
        conn = angus.client.connect()
        service = conn.services.get_service('sound_localization', version=1)
        service.enable_session()

        # Record process: the callback is invoked by PyAudio for each
        # captured buffer and hands the converted audio to the main loop.
        stream_queue = Queue.Queue()

        def chunk_callback(in_data, frame_count, time_info, status):
            stream_queue.put(prepare(in_data, channels, rate))
            # Input-only stream: there is no output data to return.
            return (None, pyaudio.paContinue)

        stream = p.open(format=PYAUDIO_FORMAT,
                        channels=channels,
                        rate=rate,
                        input=True,
                        frames_per_buffer=CHUNK,
                        input_device_index=stream_index,
                        stream_callback=chunk_callback)
        stream.start_stream()

        while True:
            nb_buffer_available = stream_queue.qsize()
            if nb_buffer_available == 0:
                # Polling with a short sleep keeps the loop interruptible.
                time.sleep(0.01)
                continue
            print("nb buffer available = {}".format(nb_buffer_available))

            data = stream_queue.get()

            # Wrap the raw samples in an in-memory WAV file.
            buff = StringIO.StringIO()
            wf = wave.open(buff, 'wb')
            wf.setnchannels(TARGET_CHANNELS)
            wf.setsampwidth(p.get_sample_size(PYAUDIO_FORMAT))
            wf.setframerate(TARGET_RATE)
            wf.writeframes(data)
            wf.close()

            job = service.process(
                {'sound': StringIO.StringIO(buff.getvalue()), 'baseline': 0.14, 'sensitivity': 0.7})
            pprint(job.result['sources'])
    finally:
        # Always release the audio device, including when the endless loop
        # above is left through an exception such as KeyboardInterrupt.
        if stream is not None:
            stream.stop_stream()
            stream.close()
        p.terminate()

if __name__ == "__main__":
    # The device index comes from the command line, or is asked for
    # interactively after listing the available input devices.
    if len(sys.argv) < 2:
        list_inputs()
        INDEX = raw_input("Please select a device number:")
    else:
        INDEX = sys.argv[1]

    # Parse the index on its own so that only a malformed index is reported
    # as such; a ValueError raised while streaming is not masked.
    try:
        DEVICE_INDEX = int(INDEX)
    except ValueError:
        print("Not a valid index")
        sys.exit(1)

    main(DEVICE_INDEX)