Source code for jupedsim.recording
# Copyright © 2012-2024 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: LGPL-3.0-or-later
import sqlite3
from dataclasses import dataclass
import shapely
from jupedsim.internal.aabb import AABB
@dataclass
class RecordingAgent:
    """Data for a single agent at a single frame.

    Instances are created by ``Recording.frame`` from one row of the
    ``trajectory_data`` table, passing the columns positionally in the
    order ``id``, ``(pos_x, pos_y)``, ``(ori_x, ori_y)``.
    """

    # Value of the ``id`` column of the trajectory row.
    # NOTE(review): annotated as int on the assumption that agent ids are
    # stored as integers -- confirm against the database writer.
    id: int
    # (pos_x, pos_y) columns of the trajectory row.
    position: tuple[float, float]
    # (ori_x, ori_y) columns of the trajectory row.
    orientation: tuple[float, float]
@dataclass
class RecordingFrame:
    """A single frame from the simulation.

    Instances are created by ``Recording.frame``, which passes the requested
    frame index followed by the list of agents read for that frame.
    """

    # Index of the frame, as passed to ``Recording.frame``.
    index: int
    # Agents recorded for this frame.
    agents: list[RecordingAgent]
class Recording:
    """Provides access to a simulation recording in a sqlite database.

    The database connection is opened on construction and the ``version``
    entry of the ``metadata`` table is checked against the version supported
    by this class.
    """

    __supported_database_version = 1

    def __init__(self, db_connection_str: str, uri=False) -> None:
        """Open the recording database and verify its version.

        Arguments:
            db_connection_str (str): database path or URI, passed to
                ``sqlite3.connect``.
            uri (bool): forwarded to ``sqlite3.connect``.

        Raises:
            Exception: if the version stored in the database is missing,
                is not an integer, or is not the supported version.
        """
        self.db = sqlite3.connect(
            db_connection_str, uri=uri, isolation_level=None
        )
        try:
            self._check_version_compatible()
        except BaseException:
            # Do not leak the open connection when the database is rejected.
            self.db.close()
            raise

    def frame(self, index: int) -> RecordingFrame:
        """Access a single frame of the recording.

        Arguments:
            index (int): index of the frame to access.

        Returns:
            A single frame. Its agents are ordered by ascending id; the list
            is empty when no trajectory rows are stored for ``index``.
        """

        def agent_row(cursor, row):
            # Columns arrive in SELECT order: id, pos_x, pos_y, ori_x, ori_y.
            return RecordingAgent(row[0], (row[1], row[2]), (row[3], row[4]))

        cur = self.db.cursor()
        cur.row_factory = agent_row
        res = cur.execute(
            "SELECT id, pos_x, pos_y, ori_x, ori_y FROM trajectory_data WHERE frame == (?) ORDER BY id ASC",
            (index,),
        )
        return RecordingFrame(index, res.fetchall())

    def geometry(self) -> shapely.GeometryCollection:
        """Access this recording's geometry.

        Returns:
            walkable area of the simulation that created this recording.
        """
        cur = self.db.cursor()
        res = cur.execute("SELECT wkt FROM geometry")
        # Only the first row of the geometry table is read.
        wkt_str = res.fetchone()[0]
        return shapely.from_wkt(wkt_str)

    def bounds(self) -> AABB:
        """Get bounds of the position data contained in this recording.

        Returns:
            Bounding box built from the ``xmin``, ``xmax``, ``ymin`` and
            ``ymax`` entries of the metadata table.
        """
        xmin, xmax, ymin, ymax = (
            float(self._metadata_value(key))
            for key in ("xmin", "xmax", "ymin", "ymax")
        )
        return AABB(xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax)

    @property
    def num_frames(self) -> int:
        """Access the number of frames stored in this recording.

        Returns:
            Number of frames in this recording.
        """
        # NOTE(review): this returns MAX(frame), i.e. the highest stored frame
        # index, and None when trajectory_data is empty. If frame indices
        # start at 0 this is one less than the frame count -- confirm against
        # the writer and the callers before changing it.
        cur = self.db.cursor()
        res = cur.execute("SELECT MAX(frame) FROM trajectory_data")
        return res.fetchone()[0]

    @property
    def fps(self) -> float:
        """How many frames are stored per second.

        Returns:
            Frames per second of this recording.
        """
        return float(self._metadata_value("fps"))

    def _metadata_value(self, key: str):
        """Return the value stored under ``key`` in the metadata table.

        Returns ``None`` when the table has no row for ``key``.
        """
        row = self.db.execute(
            "SELECT value FROM metadata WHERE key == ?", (key,)
        ).fetchone()
        return None if row is None else row[0]

    def _check_version_compatible(self) -> None:
        """Verify that the database version is the one this class supports.

        Raises:
            Exception: if the ``version`` metadata entry is missing, is not
                an integer, or differs from the supported version.
        """
        version_string = self._metadata_value("version")
        if version_string is None:
            raise Exception(
                "Database error, metadata version entry is missing."
            )
        # Keep the try body to the one conversion that can raise ValueError.
        try:
            version_in_database = int(version_string)
        except ValueError as err:
            raise Exception(
                f"Database error, metadata version not an integer. Value found: {version_string}"
            ) from err
        if version_in_database != self.__supported_database_version:
            raise Exception(
                f"Incompatible database version. The database supplied is version {version_in_database}. "
                f"This Program supports version {self.__supported_database_version}"
            )