[docs]classSqliteTrajectoryWriter(TrajectoryWriter):"""Write trajectory data into a sqlite db"""def__init__(self,*,output_file:Path,every_nth_frame:int=4)->None:"""SqliteTrajectoryWriter constructor Args: output_file : pathlib.Path name of the output file. Note: the file will not be written until the first call to 'begin_writing' every_nth_frame: int indicates interval between writes, 1 means every frame, 5 every 5th Returns: SqliteTrajectoryWriter """self._output_file=output_fileifevery_nth_frame<1:raiseTrajectoryWriter.Exception("'every_nth_frame' has to be > 0")self._every_nth_frame=every_nth_frameself._con=sqlite3.connect(self._output_file,isolation_level=None)
[docs]defbegin_writing(self,simulation:Simulation)->None:"""Begin writing trajectory data. This method is intended to handle all data writing that has to be done once before the trajectory data can be written. E.g. Meta information such as framerate etc... """fps=1/simulation.delta_time()/self._every_nth_framegeometry=simulation.get_geometry()geo=shapely.to_wkt(shapely.Polygon(geometry.boundary(),holes=geometry.holes()),rounding_precision=-1,)cur=self._con.cursor()try:cur.execute("BEGIN")cur.execute("DROP TABLE IF EXISTS trajectory_data")cur.execute("CREATE TABLE trajectory_data ("" frame INTEGER NOT NULL,"" id INTEGER NOT NULL,"" pos_x REAL NOT NULL,"" pos_y REAL NOT NULL,"" ori_x REAL NOT NULL,"" ori_y REAL NOT NULL)")cur.execute("DROP TABLE IF EXISTS metadata")cur.execute("CREATE TABLE metadata(key TEXT NOT NULL UNIQUE PRIMARY KEY, value TEXT NOT NULL)")cur.executemany("INSERT INTO metadata VALUES(?, ?)",(("version","1"),("fps",fps)),)cur.execute("DROP TABLE IF EXISTS geometry")cur.execute("CREATE TABLE geometry(wkt TEXT NOT NULL)")cur.execute("INSERT INTO geometry VALUES(?)",(geo,))cur.execute("CREATE INDEX frame_id_idx ON trajectory_data(frame, id)")cur.execute("COMMIT")exceptsqlite3.Errorase:cur.execute("ROLLBACK")raiseTrajectoryWriter.Exception(f"Error creating database: {e}")
[docs]defwrite_iteration_state(self,simulation:Simulation)->None:"""Write trajectory data of one simulation iteration. This method is intended to handle serialization of the trajectory data of a single iteration. """ifnotself._con:raiseTrajectoryWriter.Exception("Database not opened.")iteration=simulation.iteration_count()ifiteration%self.every_nth_frame()!=0:returnframe=iteration/self.every_nth_frame()cur=self._con.cursor()try:cur.execute("BEGIN")frame_data=[(frame,agent.id,agent.position[0],agent.position[1],agent.orientation[0],agent.orientation[1],)foragentinsimulation.agents()]cur.executemany("INSERT INTO trajectory_data VALUES(?, ?, ?, ?, ?, ?)",frame_data,)res=cur.execute("SELECT MIN(pos_x), MAX(pos_x), MIN(pos_y), MAX(pos_y) FROM trajectory_data")xmin,xmax,ymin,ymax=res.fetchone()old_xmin=self._x_min(cur)old_xmax=self._x_max(cur)old_ymin=self._y_min(cur)old_ymax=self._y_max(cur)cur.executemany("INSERT OR REPLACE INTO metadata(key, value) VALUES(?,?)",[("xmin",str(min(xmin,float(old_xmin)))),("xmax",str(max(xmax,float(old_xmax)))),("ymin",str(min(ymin,float(old_ymin)))),("ymax",str(max(ymax,float(old_ymax)))),],)cur.execute("COMMIT")exceptsqlite3.Errorase:cur.execute("ROLLBACK")raiseTrajectoryWriter.Exception(f"Error writing to database: {e}")
def_value_or_default(self,cur,key,default:float|int|str):res=cur.execute("SELECT value FROM metadata WHERE key = ?",(key,)).fetchone()ifresisNone:returndefaultelse:text=res[0]returntype(default)(text)def_x_min(self,cur):returnself._value_or_default(cur,"xmin",float("inf"))def_x_max(self,cur):returnself._value_or_default(cur,"xmax",float("-inf"))def_y_min(self,cur):returnself._value_or_default(cur,"ymin",float("inf"))def_y_max(self,cur):returnself._value_or_default(cur,"ymax",float("-inf"))