sampling
DownsamplingProcessor
DataFrameHistory
current_delay
property
readonly
returns the deviation from the targeted period time
__init__(self, topic_names=None, sampling_rate=5)
special
Downsamples a signal to a given sampling_rate [Hz], if the original rate is higher. Otherwise, the sampling rate stays the same (no upsampling). @param topic_names: the dtype to be resampled; if None, all incoming dtypes are resampled @param sampling_rate: the desired sampling rate [Hz]
Source code in multisensor_pipeline/modules/signal/sampling.py
def __init__(self, topic_names=None, sampling_rate=5):
"""
Downsamples a signal to a given sampling_rate [Hz], if the original rate is higher.
Otherwise, the sampling rate stays the same (no upsampling).
@param topic_names: the dtype to be resampled; if None, all incoming dtypes are resampled
@param sampling_rate: the desired sampling rate [Hz]
"""
super(DownsamplingProcessor, self).__init__()
self._topic_names = topic_names
self._sampling_rate = sampling_rate
self._period_time = 1. / sampling_rate
self._sample_hist = dict()
self._last_sent = dict()
self._last_received = dict()
on_update(self, frame)
Custom update routine.
Source code in multisensor_pipeline/modules/signal/sampling.py
def on_update(self, frame: MSPDataFrame) -> Optional[MSPDataFrame]:
if self._topic_names is None or frame.topic.name in self._topic_names:
hist = self._get_history(frame.topic.uuid)
hist.add(frame)
_frame = hist.get_dataframe()
if _frame is not None:
_topic = self._generate_topic(name=f"{frame.topic.name}.{self._sampling_rate}Hz",
dtype=frame.topic.dtype)
_frame.topic = _topic
return _frame