Source code for jupedsim.recording
# Copyright © 2012-2023 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: LGPL-3.0-or-later
import sqlite3
from dataclasses import dataclass
import shapely
from jupedsim.internal.aabb import AABB
@dataclass
[docs]class RecordingAgent:
"""Data for a single agent at a single frame."""
[docs] position: tuple[float, float]
[docs] orientation: tuple[float, float]
@dataclass
[docs]class RecordingFrame:
"""A single frame from the simulation."""
[docs] agents: list[RecordingAgent]
[docs]class Recording:
__supported_database_version = 1
"""Provides access to a simulation recording in a sqlite database"""
def __init__(self, db_connection_str: str, uri=False) -> None:
self.db = sqlite3.connect(
db_connection_str, uri=uri, isolation_level=None
)
self._check_version_compatible()
[docs] def frame(self, index: int) -> RecordingFrame:
"""Access a single frame of the recording.
Arguments:
index (int): index of the frame to access.
Returns:
A single frame.
"""
def agent_row(cursor, row):
return RecordingAgent(row[0], (row[1], row[2]), (row[3], row[4]))
cur = self.db.cursor()
cur.row_factory = agent_row
res = cur.execute(
"SELECT id, pos_x, pos_y, ori_x, ori_y FROM trajectory_data WHERE frame == (?) ORDER BY id ASC",
(index,),
)
return RecordingFrame(index, res.fetchall())
[docs] def geometry(self) -> shapely.GeometryCollection:
"""Access this recordings' geometry.
Returns:
walkable area of the simulation that created this recording.
"""
cur = self.db.cursor()
res = cur.execute("SELECT wkt FROM geometry")
wkt_str = res.fetchone()[0]
return shapely.from_wkt(wkt_str)
[docs] def bounds(self) -> AABB:
"""Get bounds of the position data contained in this recording."""
cur = self.db.cursor()
res = cur.execute("SELECT value FROM metadata WHERE key == 'xmin'")
xmin = float(res.fetchone()[0])
res = cur.execute("SELECT value FROM metadata WHERE key == 'xmax'")
xmax = float(res.fetchone()[0])
res = cur.execute("SELECT value FROM metadata WHERE key == 'ymin'")
ymin = float(res.fetchone()[0])
res = cur.execute("SELECT value FROM metadata WHERE key == 'ymax'")
ymax = float(res.fetchone()[0])
return AABB(xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax)
@property
[docs] def num_frames(self) -> int:
"""Access the number of frames stored in this recording.
Returns:
Number of frames in this recording.
"""
cur = self.db.cursor()
res = cur.execute("SELECT MAX(frame) FROM trajectory_data")
return res.fetchone()[0]
@property
[docs] def fps(self) -> float:
"""How many frames are stored per second.
Returns:
Frames per second of this recording.
"""
cur = self.db.cursor()
res = cur.execute("SELECT value from metadata WHERE key == 'fps'")
return float(res.fetchone()[0])
def _check_version_compatible(self) -> None:
cur = self.db.cursor()
res = cur.execute("SELECT value FROM metadata WHERE key == 'version'")
version_string = res.fetchone()[0]
try:
version_in_database = int(version_string)
if version_in_database != self.__supported_database_version:
raise Exception(
f"Incompatible database version. The database supplied is version {version_in_database}. "
f"This Program supports version {self.__supported_database_version}"
)
except ValueError:
raise Exception(
f"Database error, metadata version not an integer. Value found: {version_string}"
)