Microphone
Microphone
Microphone source for live audio recording from a connected microphone.
__init__(self, device, format=8, channels=2, sampling_rate=44100, chunk_size=1024)
special
Initialize the Source
Parameters:
Name | Type | Description | Default |
---|---|---|---|
device |
str |
Device id of the microphone |
required |
format |
int |
PyAudio format specification |
8 |
channels |
int |
Number of channels of the device |
2 |
sampling_rate |
int |
The audio sampling rate |
44100 |
chunk_size |
int |
Size of the chunks of the recordings |
1024 |
Source code in multisensor_pipeline/modules/audio/microphone.py
def __init__(self, device: str, format=pyaudio.paInt16, channels: int = 2, sampling_rate: int = 44100,
             chunk_size: int = 1024):
    """
    Initialize the Source and open the PyAudio input stream

    Any exception raised while opening the stream is re-raised after the
    PyAudio instance created here has been terminated.

    Args:
        device: Device id of the microphone
        format: PyAudio format specification
        channels: Number of channels of the device
        sampling_rate: The audio sampling rate
        chunk_size: Size of the chunks of the recordings
    """
    super(Microphone, self).__init__()
    # NOTE(review): `device` is stored but not passed to `open()` below, so
    # the stream is opened without an explicit device selection -- confirm
    # whether it should be forwarded (and in which form).
    self.device = device
    self.format = format
    self.channels = channels
    self.sampling_rate = sampling_rate
    self.chunk_size = chunk_size
    self._mic = pyaudio.PyAudio()
    try:
        self._stream = self._mic.open(format=self.format,
                                      channels=self.channels,
                                      rate=self.sampling_rate,
                                      input=True,
                                      frames_per_buffer=self.chunk_size)
    except Exception:
        # Opening failed, so on_stop() will never get the chance to clean
        # up: terminate the PyAudio instance here before propagating.
        self._mic.terminate()
        raise
on_stop(self)
Stops the Microphone source and closes the stream
Source code in multisensor_pipeline/modules/audio/microphone.py
def on_stop(self):
    """
    Stops the Microphone source and closes the stream

    Every cleanup step is attempted even if an earlier one raises: the
    stream is closed after the stop attempt, and the PyAudio instance is
    terminated last. An exception raised along the way still propagates
    to the caller once the remaining steps have run.
    """
    try:
        try:
            self._stream.stop_stream()
        finally:
            # Close the stream even when stopping it failed.
            self._stream.close()
    finally:
        # Terminate PyAudio regardless of how the stream teardown went.
        self._mic.terminate()
on_update(self)
Sends chunks of the audio recording
Source code in multisensor_pipeline/modules/audio/microphone.py
def on_update(self) -> Optional[MSPDataFrame]:
    """
    Read one chunk from the input stream and emit it as a dataframe

    Returns:
        An MSPDataFrame on the "audio" topic whose ``chunk`` field holds
        the data read from the stream (``chunk_size`` is passed to the
        stream's ``read`` call).
    """
    # Read first, then build the topic, so nothing else happens if the
    # read itself fails.
    raw_chunk = self._stream.read(self.chunk_size)
    audio_topic = self._generate_topic(name="audio")
    return MSPDataFrame(topic=audio_topic, chunk=raw_chunk)