Base Sink
BaseSink
Base class for data sinks.
__init__(self, dropout=False)
special
Initializes the worker thread and a queue that will receive new samples from sources.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dropout |
Union[bool, float] |
Maximum age (in seconds) before elements of the queue are dropped; passing `True` selects a default of 5 seconds |
False |
Source code in multisensor_pipeline/modules/base/base.py
def __init__(self, dropout: Union[bool, float] = False):
    """
    Set up the sink state: the queue that receives new samples from sources, the
    registry of observed sources, and the dropout setting.

    The parent initializer is called first (presumably this is where the worker
    thread is prepared -- confirm against the base class).

    Args:
        dropout: Max age (in seconds) before elements of the queue are dropped.
            Passing `True` selects a default of 5 seconds.
    """
    super().__init__()
    # source uuid -> bool flag, written by add_source / _handle_control_message
    self._active_sources = {}
    self._queue = Queue()
    # A bare `True` carries no duration, so it is mapped to 5 seconds.
    wants_default_age = dropout and isinstance(dropout, bool)
    self._dropout = 5 if wants_default_age else dropout  # in seconds
_handle_control_message(self, frame)
private
Handles incoming control messages from the observed sources (e.g. MSPControlMessage.END_OF_STREAM)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame |
MSPDataFrame |
frame containing MSPControlMessage |
required |
Source code in multisensor_pipeline/modules/base/base.py
def _handle_control_message(self, frame: MSPDataFrame):
    """
    Process a control message coming from an observed source (e.g. MSPControlMessage.END_OF_STREAM).

    On END_OF_STREAM the sending source, if registered, is flagged inactive; once no
    registered source is active any more, the sink is stopped without blocking.
    Any other control message is only logged as unhandled.

    Args:
        frame: the incoming frame; only instances of MSPControlMessage are acted upon

    Returns:
        True if the frame was a control message (and has been consumed), False otherwise.
    """
    # Guard: regular data frames are left to the caller.
    if not isinstance(frame, MSPControlMessage):
        return False

    logger.debug(f"[CONTROL] {frame.topic.source_uuid} -> {frame.message} -> {self.uuid}")

    is_end_of_stream = frame.message == MSPControlMessage.END_OF_STREAM
    if not is_end_of_stream:
        logger.warning(f"unhandled control message: {frame.message}")
        return True

    sender = frame.topic.source_uuid
    if sender in self._active_sources:
        # mark the sender as finished
        self._active_sources[sender] = False
        # shut down once every registered source has finished
        if not any(self._active_sources.values()):
            self.stop(blocking=False)
    return True
_worker(self)
private
Sink worker function: handles the incoming Dataframes
Source code in multisensor_pipeline/modules/base/base.py
def _worker(self):
    """
    Sink worker loop: takes frames from the queue and dispatches them for as long as
    the sink is active.

    Control messages are consumed by `_handle_control_message`. Every other frame is
    recorded as incoming in the module statistics when profiling is enabled, and is
    then passed to `on_update`.
    """
    while self._active:
        incoming = self._queue.get()
        consumed = self._handle_control_message(incoming)
        if not consumed:
            if self._profiling:
                self._stats.add_frame(incoming, MSPModuleStats.Direction.IN)
            self.on_update(incoming)
add_source(self, source)
Add a source module to be observed
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
BaseModule |
The source module to observe |
required |
Source code in multisensor_pipeline/modules/base/base.py
def add_source(self, source: BaseModule):
    """
    Register a source module to be observed by this sink.

    The source is stored under its uuid and flagged as active.

    Args:
        source: the source module to observe; its `uuid` is used as the registry key
    """
    source_id = source.uuid
    self._active_sources[source_id] = True
on_update(self, frame)
Custom update routine.
Source code in multisensor_pipeline/modules/base/base.py
def on_update(self, frame: MSPDataFrame):
    """
    Custom update routine, invoked by the worker for every non-control frame.

    This base implementation always raises; concrete sinks are expected to override it.

    Args:
        frame: the incoming data frame to process

    Raises:
        NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError