Base Source
BaseSource
Base class for data sources.
__init__(self)
special
Initializes the worker thread and a queue list for communication with observers that listen to that source.
Source code in multisensor_pipeline/modules/base/base.py
def __init__(self):
    """
    Run the parent initializer and start with an empty list of observers.
    """
    super().__init__()
    # Observers registered through add_observer() are appended to this list.
    self._sinks = list()
_notify(self, frame)
private
Notifies all observers that there's a new dataframe
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame |
Optional[multisensor_pipeline.dataframe.dataframe.MSPDataFrame] |
the payload as an instance of MSPDataFrame |
required |
Source code in multisensor_pipeline/modules/base/base.py
def _notify(self, frame: Optional[MSPDataFrame]):
    """
    Hand a new dataframe to every registered observer.

    Args:
        frame: the payload as an instance of MSPDataFrame; None is silently ignored.
    """
    # Nothing to forward when no frame was produced.
    if frame is None:
        return

    assert isinstance(frame, MSPDataFrame), "You must use a MSPDataFrame instance to wrap your data."

    for observer in self._sinks:
        observer.put(frame)

    # NOTE(review): indentation was lost in this listing; the profiling hook is
    # assumed to run once per frame, after all observers were served -- confirm.
    if self._profiling:
        self._stats.add_frame(frame, MSPModuleStats.Direction.OUT)
_worker(self)
private
Source worker function: notify observer when source update function returns a DataFrame
Source code in multisensor_pipeline/modules/base/base.py
def _worker(self):
    """ Source worker loop: while the source is active, pass each on_update() result to _notify(). """
    while self._active:
        # _notify() drops None results, so on_update() may return nothing.
        update = self.on_update()
        self._notify(update)
add_observer(self, sink)
Register a Sink or Queue as an observer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sink |
|
A thread-safe Queue object or Sink [or any class that implements put(tuple)] |
required |
Source code in multisensor_pipeline/modules/base/base.py
def add_observer(self, sink):
    """
    Attach an observer that will receive the frames emitted by this source.

    Args:
        sink: A thread-safe Queue object or Sink [or any class that implements put(tuple)]
    """
    # Plain queues are stored as they are; they are not told about this source.
    if isinstance(sink, (Queue, MPQueue)):
        self._sinks.append(sink)
        return

    # Anything else must be a sink or processor, which is informed about
    # this source before it is stored as an observer.
    assert isinstance(sink, (BaseSink, BaseProcessor))
    sink.add_source(self)
    self._sinks.append(sink)
on_update(self)
Custom update routine.
Source code in multisensor_pipeline/modules/base/base.py
def on_update(self) -> Optional[MSPDataFrame]:
    """ Custom update routine; this base implementation always raises NotImplementedError. """
    raise NotImplementedError()
stop(self, blocking=True)
Stops the source and sends an MSPControlMessage.END_OF_STREAM to all observers to signal that it has stopped
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blocking |
bool |
True |
Source code in multisensor_pipeline/modules/base/base.py
def stop(self, blocking: bool = True):
    """
    Stop the source and send MSPControlMessage.END_OF_STREAM to all observers.

    Args:
        blocking: passed on unchanged to the parent class's stop().
    """
    # The end-of-stream message goes out before the parent stop logic runs.
    end_of_stream = MSPControlMessage(message=MSPControlMessage.END_OF_STREAM, source=self)
    self._notify(end_of_stream)
    super(BaseSource, self).stop(blocking=blocking)