Skip to content

Base Module

BaseModule

Base Class for all Modules

active property readonly

Returns whether the module is active

name property readonly

Returns the name of the actual subclass

stats: MSPModuleStats property readonly

Returns real-time profiling information.

uuid property readonly

Returns the UUID of the module

__init__(self, profiling=False) special

Initialize the BaseModule

Parameters:

Name Type Description Default
profiling

Option to enable profiling

False
Source code in multisensor_pipeline/modules/base/base.py
def __init__(self, profiling=False):
    """
    Set up the module's identity, bookkeeping state and worker thread.

    The module starts out inactive; the worker thread is created here but
    not started.

    Args:
       profiling: Option to enable profiling
    """
    # Plain state flags and statistics container.
    self._active = False
    self._profiling = profiling
    self._stats = MSPModuleStats()
    # Unique identifier of this module instance.
    self._uuid = uuid.uuid1()
    # The thread runs the module's _worker method once it is started.
    self._thread = Thread(target=self._worker)

_worker(self) private

Main worker function (async)

Source code in multisensor_pipeline/modules/base/base.py
def _worker(self):
    """
    Main worker function (async), used as the target of the module's thread.

    The base implementation always raises NotImplementedError.
    """
    raise NotImplementedError

on_start(self)

Custom initialization

Source code in multisensor_pipeline/modules/base/base.py
def on_start(self):
    """
    Hook for custom initialization; does nothing by default.

    start() calls this before the worker thread is launched.
    """
    return None

on_stop(self)

Custom clean-up

Source code in multisensor_pipeline/modules/base/base.py
def on_stop(self):
    """
    Hook for custom clean-up; does nothing by default.

    stop() calls this as its last step.
    """
    return None

on_update(self)

Custom update routine.

Source code in multisensor_pipeline/modules/base/base.py
def on_update(self):
    """
    Custom update routine.

    The base implementation always raises NotImplementedError.
    """
    raise NotImplementedError

start(self)

Starts the module.

Source code in multisensor_pipeline/modules/base/base.py
def start(self):
    """
    Activates the module and launches its worker thread.

    The active flag is raised first, then the custom initialization hook
    runs, and finally the worker thread is started.
    """
    message = "starting: {}".format(self.uuid)
    logger.debug(message)
    # Flag must be set before the hook and the thread run.
    self._active = True
    self.on_start()
    self._thread.start()

stop(self, blocking=True)

Stops the module.

Source code in multisensor_pipeline/modules/base/base.py
def stop(self, blocking=True):
    """
    Stops the module.

    Clears the active flag, optionally waits for the worker thread to
    finish, and then runs the custom clean-up hook.

    Args:
        blocking: If True, wait for the worker thread to terminate before
            calling on_stop(). If False, on_stop() is called right away.
    """
    logger.debug("stopping: {}".format(self.uuid))
    self._active = False
    # Thread.join() raises RuntimeError for a thread that was never started.
    # is_alive() is False in that case (and once the thread has finished), so
    # stopping a module that is not running skips the wait instead of crashing
    # before the clean-up hook is reached.
    # NOTE: joining the current thread still raises RuntimeError, so a call
    # made from the worker thread itself should pass blocking=False.
    if blocking and self._thread.is_alive():
        self._thread.join()
    self.on_stop()