Skip to content

Base Sink

BaseSink

Base class for data sinks.

__init__(self, dropout=False) special

Initializes the worker thread and a queue that will receive new samples from sources.

Parameters:

Name Type Description Default
dropout Union[bool, float]

Set the max age before elements of the queue are dropped

False
Source code in multisensor_pipeline/modules/base/base.py
def __init__(self, dropout: Union[bool, float] = False):
    """
    Initializes the worker thread and a queue that will receive new samples from sources.

    Args:
       dropout: Set the max age before elements of the queue are dropped
    """
    super().__init__()
    self._queue = Queue()
    self._active_sources = {}
    self._dropout = dropout  # in seconds
    if dropout and isinstance(dropout, bool):
        self._dropout = 5

_handle_control_message(self, frame) private

Handles incoming control messages from the observed sources (e.g. MSPControlMessage.END_OF_STREAM )

Parameters:

Name Type Description Default
frame MSPDataFrame

frame containing MSPControlMessage

required
Source code in multisensor_pipeline/modules/base/base.py
def _handle_control_message(self, frame: MSPDataFrame):
    """
    Handles incoming control messages from the observed sources (e.g. MSPControlMessage.END_OF_STREAM )

    Args:
       frame: frame containing MSPControlMessage
    """
    if isinstance(frame, MSPControlMessage):
        logger.debug(f"[CONTROL] {frame.topic.source_uuid} -> {frame.message} -> {self.uuid}")
        if frame.message == MSPControlMessage.END_OF_STREAM:
            if frame.topic.source_uuid in self._active_sources:
                # set source to inactive
                self._active_sources[frame.topic.source_uuid] = False
                # if no active source is left
            if not any(self._active_sources.values()):
                self.stop(blocking=False)
        else:
            logger.warning(f"unhandled control message: {frame.message}")
        return True
    return False

_worker(self) private

Sink worker function: handles the incoming Dataframes

Source code in multisensor_pipeline/modules/base/base.py
def _worker(self):
    """
    Sink worker function: handles the incoming Dataframes
    """
    while self._active:
        frame = self._queue.get()

        if self._handle_control_message(frame):
            continue

        if self._profiling:
            self._stats.add_frame(frame, MSPModuleStats.Direction.IN)

        self.on_update(frame)

add_source(self, source)

Add a source module to be observed

Parameters:

Name Type Description Default
source BaseModule

Set the max age before elements of the queue are dropped

required
Source code in multisensor_pipeline/modules/base/base.py
def add_source(self, source: BaseModule):
    """
    Add a source module to be observed

    Args:
       source: Set the max age before elements of the queue are dropped
    """
    self._active_sources[source.uuid] = True

on_update(self, frame)

Custom update routine.

Source code in multisensor_pipeline/modules/base/base.py
def on_update(self, frame: MSPDataFrame):
    """ Custom update routine. """
    raise NotImplementedError()