Skip to content

Recording

RecordingSink

RecordingSink replays a recorded json dataset

__init__(self, target, topics=None, override=False) special

initializes RecordingSink

Parameters:

Name Type Description Default
target

filepath

required
topics List

Filter which topics should be recorded

None
override

Flag to set overwrite rules

False
Source code in multisensor_pipeline/modules/persistence/recording.py
def __init__(self, target, topics: List = None, override=False):
    """
    initializes RecordingSink
    Args:
        target: filepath
        topics: Filter which topics should be recorded
        override: Flag to set overwrite rules
    """
    super(RecordingSink, self).__init__()

    # set target path or file
    self._target = Path(target)
    if self._target.is_dir() and not self._target.exists():
        self._target.mkdir(parents=True, exist_ok=True)
    # set topic filter
    self._topics = topics
    # set override flag
    self._override = override

check_topic(self, topic)

Check whether the given topic shall be captured.

Source code in multisensor_pipeline/modules/persistence/recording.py
def check_topic(self, topic):
    """Check whether the given topic shall be captured."""
    if self._topics is None:
        return True
    return any([t == topic for t in self._topics])

on_update(self, frame)

Custom update routine.

Source code in multisensor_pipeline/modules/persistence/recording.py
def on_update(self, frame: MSPDataFrame):
    if self.check_topic(frame.topic):
        self.write(frame)

write(self, frame)

Custom write routine.

Source code in multisensor_pipeline/modules/persistence/recording.py
def write(self, frame):
    """ Custom write routine. """
    raise NotImplementedError()

JsonRecordingSink

JsonReplaySource replays a recorded json dataset

on_start(self)

Checks if file and file path is correct and override if exists

Source code in multisensor_pipeline/modules/persistence/recording.py
def on_start(self):
    """ Checks if file and file path is correct and override if exists"""
    assert self.target.suffix == ".json", f"The file extension must be json, but was {self.target.suffix}"
    if not self.override:
        assert not self.target.exists(), f"The file existis, but override is disabled ({self.target})"
    self._json_file = self.target.open(mode="w")

on_stop(self)

Stops tne Sink and closes the json file

Source code in multisensor_pipeline/modules/persistence/recording.py
def on_stop(self):
    """ Stops tne Sink and closes the json file """
    self._json_file.close()

write(self, frame)

Writes the json file

Source code in multisensor_pipeline/modules/persistence/recording.py
def write(self, frame):
    """ Writes the json file """
    self._json_file.write(json.dumps(obj=frame, cls=MSPDataFrame.JsonEncoder) + '\n')