Recording
RecordingSink
RecordingSink replays a recorded json dataset
__init__(self, target, topics=None, override=False)
special
initializes RecordingSink
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target |
|
filepath |
required |
topics |
List |
Filter which topics should be recorded |
None |
override |
|
Flag to set overwrite rules |
False |
Source code in multisensor_pipeline/modules/persistence/recording.py
def __init__(self, target, topics: List = None, override=False):
"""
initializes RecordingSink
Args:
target: filepath
topics: Filter which topics should be recorded
override: Flag to set overwrite rules
"""
super(RecordingSink, self).__init__()
# set target path or file
self._target = Path(target)
if self._target.is_dir() and not self._target.exists():
self._target.mkdir(parents=True, exist_ok=True)
# set topic filter
self._topics = topics
# set override flag
self._override = override
check_topic(self, topic)
Check whether the given topic shall be captured.
Source code in multisensor_pipeline/modules/persistence/recording.py
def check_topic(self, topic):
"""Check whether the given topic shall be captured."""
if self._topics is None:
return True
return any([t == topic for t in self._topics])
on_update(self, frame)
Custom update routine.
Source code in multisensor_pipeline/modules/persistence/recording.py
def on_update(self, frame: MSPDataFrame):
if self.check_topic(frame.topic):
self.write(frame)
write(self, frame)
Custom write routine.
Source code in multisensor_pipeline/modules/persistence/recording.py
def write(self, frame):
""" Custom write routine. """
raise NotImplementedError()
JsonRecordingSink
JsonReplaySource replays a recorded json dataset
on_start(self)
Checks if file and file path is correct and override if exists
Source code in multisensor_pipeline/modules/persistence/recording.py
def on_start(self):
""" Checks if file and file path is correct and override if exists"""
assert self.target.suffix == ".json", f"The file extension must be json, but was {self.target.suffix}"
if not self.override:
assert not self.target.exists(), f"The file existis, but override is disabled ({self.target})"
self._json_file = self.target.open(mode="w")
on_stop(self)
Stops tne Sink and closes the json file
Source code in multisensor_pipeline/modules/persistence/recording.py
def on_stop(self):
""" Stops tne Sink and closes the json file """
self._json_file.close()
write(self, frame)
Writes the json file
Source code in multisensor_pipeline/modules/persistence/recording.py
def write(self, frame):
""" Writes the json file """
self._json_file.write(json.dumps(obj=frame, cls=MSPDataFrame.JsonEncoder) + '\n')