Skip to content

Base Source

BaseSource

Base class for data sources.

__init__(self) special

Initializes the worker thread and a queue list for communication with observers that listen to that source.

Source code in multisensor_pipeline/modules/base/base.py
def __init__(self):
    """
    Set up the source with an empty list of observers.

    The parent initializer runs first (presumably it prepares the worker
    thread -- not visible in this snippet). Afterwards ``_sinks`` is created;
    ``add_observer`` appends to it and ``_notify`` iterates over it.
    """
    super().__init__()
    # Every queue/sink/processor that should receive the frames of this source.
    self._sinks: list = []

_notify(self, frame) private

Notifies all observers that there's a new dataframe

Parameters:

Name Type Description Default
frame Optional[multisensor_pipeline.dataframe.dataframe.MSPDataFrame]

the payload as an instance of MSPDataFrame

required
Source code in multisensor_pipeline/modules/base/base.py
def _notify(self, frame: Optional[MSPDataFrame]):
    """
    Hand a new dataframe to every registered observer.

    Args:
        frame: the payload wrapped in an MSPDataFrame. ``None`` means
            "nothing to send" and is silently ignored.
    """
    # Nothing was produced: skip delivery and profiling entirely.
    if frame is None:
        return

    assert isinstance(frame, MSPDataFrame), "You must use a MSPDataFrame instance to wrap your data."

    # Each observer only needs to offer put(); the same frame object goes to all of them.
    for observer in self._sinks:
        observer.put(frame)

    # Record the frame as outgoing traffic when profiling is switched on.
    if self._profiling:
        self._stats.add_frame(frame, MSPModuleStats.Direction.OUT)

_worker(self) private

Source worker function: notify observer when source update function returns a DataFrame

Source code in multisensor_pipeline/modules/base/base.py
def _worker(self):
    """
    Source worker loop.

    As long as ``_active`` is truthy, ``on_update()`` is polled and whatever
    it returns is passed on to ``_notify()``.
    """
    while self._active:
        frame = self.on_update()
        self._notify(frame)

add_observer(self, sink)

Register a Sink or Queue as an observer.

Parameters:

Name Type Description Default
sink

A thread-safe Queue (or MPQueue) object, or a BaseSink / BaseProcessor instance. Any other object fails the type assertion, even if it implements put().

required
Source code in multisensor_pipeline/modules/base/base.py
def add_observer(self, sink):
    """
    Register an observer that will receive every frame of this source.

    Plain queues are stored as they are. Any other observer has to be a
    BaseSink or BaseProcessor; it is additionally told about this source via
    ``sink.add_source(self)`` before it is stored.

    Args:
        sink: a Queue / MPQueue instance, or a BaseSink / BaseProcessor.
    """
    if not isinstance(sink, (Queue, MPQueue)):
        # Modules keep track of their sources, so the link is made in both directions.
        assert isinstance(sink, (BaseSink, BaseProcessor))
        sink.add_source(self)

    self._sinks.append(sink)

on_update(self)

Custom update routine.

Source code in multisensor_pipeline/modules/base/base.py
def on_update(self) -> Optional[MSPDataFrame]:
    """
    Custom update routine; this base implementation is only a placeholder.

    ``_worker`` calls this method repeatedly and forwards the result to
    ``_notify``, which ignores ``None``.

    Returns:
        The next dataframe to publish, or None if there is nothing to send.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

stop(self, blocking=True)

Stops the source and sends an MSPControlMessage.END_OF_STREAM to all observers to signal that it has stopped.

Parameters:

Name Type Description Default
blocking bool True
Source code in multisensor_pipeline/modules/base/base.py
def stop(self, blocking: bool = True):
    """
    Stop the source and tell all observers that the stream has ended.

    An MSPControlMessage carrying END_OF_STREAM is pushed to the observers
    through ``_notify`` first; only then is the parent's ``stop`` invoked.

    Args:
        blocking: passed on unchanged to the parent's ``stop`` (presumably
            decides whether the call waits for the worker to finish --
            confirm against the base class).
    """
    end_of_stream = MSPControlMessage(message=MSPControlMessage.END_OF_STREAM, source=self)
    # NOTE(review): the end-of-stream message goes out before the parent stop runs;
    # whether the worker can still emit a frame afterwards depends on the base class.
    self._notify(end_of_stream)
    super(BaseSource, self).stop(blocking=blocking)