Skip to content

sampling

DownsamplingProcessor

DataFrameHistory

current_delay property readonly

returns the deviation from the targeted period time

__init__(self, topic_names=None, sampling_rate=5) special

Downsamples a signal to a given sampling_rate [Hz], if the original rate is higher. Otherwise, the sampling rate stays the same (no upsampling). @param topic_names: the dtype to be resampled; if None, all incoming dtypes are resampled @param sampling_rate: the desired sampling rate [Hz]

Source code in multisensor_pipeline/modules/signal/sampling.py
def __init__(self, topic_names=None, sampling_rate=5):
    """
    Downsamples a signal to a given sampling_rate [Hz], if the original rate is higher.
    Otherwise, the sampling rate stays the same (no upsampling).
    @param topic_names: the dtype to be resampled; if None, all incoming dtypes are resampled
    @param sampling_rate: the desired sampling rate [Hz]
    """
    super(DownsamplingProcessor, self).__init__()
    self._topic_names = topic_names
    self._sampling_rate = sampling_rate
    self._period_time = 1. / sampling_rate

    self._sample_hist = dict()
    self._last_sent = dict()
    self._last_received = dict()

on_update(self, frame)

Custom update routine.

Source code in multisensor_pipeline/modules/signal/sampling.py
def on_update(self, frame: MSPDataFrame) -> Optional[MSPDataFrame]:
    if self._topic_names is None or frame.topic.name in self._topic_names:
        hist = self._get_history(frame.topic.uuid)
        hist.add(frame)
        _frame = hist.get_dataframe()
        if _frame is not None:
            _topic = self._generate_topic(name=f"{frame.topic.name}.{self._sampling_rate}Hz",
                                          dtype=frame.topic.dtype)
            _frame.topic = _topic
            return _frame