# Source code for jupedsim.stages
# Copyright © 2012-2023 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: LGPL-3.0-or-later
from enum import Enum
import jupedsim.native as py_jps
class NotifiableQueueStage:
    """Models a queue where agents can wait until notified.

    The queue's waiting positions are predefined and agents will wait on the
    first empty position. When agents leave the queue the remaining waiting
    agents move up. If there are more agents trying to enqueue than there are
    waiting positions defined the overflow agents will wait at the last waiting
    position in the queue.

    .. note::

        This type is used to interact with an already created stage. To create
        a stage of this type see :class:`~jupedsim.simulation.Simulation`

    """

    def __init__(self, backing) -> None:
        """Wrap an existing stage object.

        Arguments:
            backing: Object every method of this wrapper forwards to. It has
                to provide ``count_targeting``, ``count_enqueued``, ``pop``
                and ``enqueued``.

        """
        self._obj = backing

    def count_targeting(self) -> int:
        """
        Returns:
            Number of agents currently targeting this stage.

        """
        return self._obj.count_targeting()

    def count_enqueued(self) -> int:
        """
        Returns:
            Number of agents currently enqueued at this stage.

        """
        return self._obj.count_enqueued()

    def pop(self, count: int) -> None:
        """Pop `count` number of agents from the front of the queue.

        Arguments:
            count: Number of agents to be popped from the front of the
                queue

        """
        # The signature promises None, so the result of the backing call is
        # intentionally not passed on to the caller.
        self._obj.pop(count)

    def enqueued(self) -> list[int]:
        """Access the ids of all enqueued agents in order they are waiting at
        the queue.

        Returns:
            list of enqueued agents ordered by their position in the queue.

        """
        return self._obj.enqueued()
class WaitingSetState(Enum):
    """States a :class:`WaitingSetStage` can be in.

    Each member wraps the corresponding value of the native
    ``WaitingSetState`` enumeration, so ``member.value`` is the native value.
    """

    # Wraps the native ``WaitingSetState.Active`` value.
    ACTIVE = py_jps.WaitingSetState.Active
    # Wraps the native ``WaitingSetState.Inactive`` value.
    INACTIVE = py_jps.WaitingSetState.Inactive
class WaitingSetStage:
    """Models a set of waiting positions that can be activated or deactivated.

    Similar to a :class:`NotifiableQueueStage` there needs to be a set of
    waiting positions defined which will be filled in order of definition. The
    :class:`WaitingSetStage` now can be active or inactive. If active agents will fill
    waiting positions until all are occupied. Additional agents will all try to
    wait at the last defined waiting position. In inactive state the
    :class:`WaitingSetStage` acts as a simple waypoint at the position of the first
    defined waiting position.

    """

    def __init__(self, backing) -> None:
        """Wrap an existing stage object.

        Arguments:
            backing: Object every member of this wrapper forwards to. It has
                to provide ``count_targeting``, ``count_waiting``, ``waiting``
                and a readable/writable ``state`` attribute.

        """
        self._obj = backing

    def count_targeting(self) -> int:
        """
        Returns:
            Number of agents currently targeting this stage.

        """
        return self._obj.count_targeting()

    def count_waiting(self) -> int:
        """
        Returns:
            Number of agents currently waiting at this stage.

        """
        return self._obj.count_waiting()

    def waiting(self) -> list[int]:
        """Access the ids of all waiting agents in order they are waiting.

        Returns:
            list of waiting agents ordered by their position.

        """
        return self._obj.waiting()

    @property
    def state(self) -> WaitingSetState:
        """State of the set.

        Can be active or inactive, see :class:`WaitingSetState`

        """
        # The backing object stores the raw value; convert it back into the
        # Python-side enum for callers.
        return WaitingSetState(self._obj.state)

    @state.setter
    def state(self, new_state: WaitingSetState) -> None:
        # Unwrap the enum: the backing object is handed the raw value.
        self._obj.state = new_state.value
class WaypointStage:
    """Models a waypoint.

    A waypoint is considered to be reached if an agent is within the specified
    distance to the waypoint.

    """

    def __init__(self, backing) -> None:
        """Wrap an existing stage object.

        Arguments:
            backing: Object this wrapper forwards to. It has to provide
                ``count_targeting``.

        """
        self._obj = backing

    def count_targeting(self) -> int:
        """
        Returns:
            Number of agents currently targeting this stage.

        """
        return self._obj.count_targeting()
class ExitStage:
    """Models an exit.

    Agents entering the polygon defining the exit will be removed at the
    beginning of the next iteration, i.e. agents will be inside the specified
    polygon for one frame.

    """

    def __init__(self, backing) -> None:
        """Wrap an existing stage object.

        Arguments:
            backing: Object this wrapper forwards to. It has to provide
                ``count_targeting``.

        """
        self._obj = backing

    def count_targeting(self) -> int:
        """
        Returns:
            Number of agents currently targeting this stage.

        """
        return self._obj.count_targeting()