Source code for ridepy.data_structures

from collections.abc import Iterator

from numpy import inf
from abc import ABC, abstractmethod
from enum import Enum
from dataclasses import dataclass
from typing import Any, Optional, Union, Tuple, List, Callable


ID = Union[str, int]
"""Generic ID, could be vehicle ID, request ID, ..."""


[docs] @dataclass class Request: """ A request for the system to perform a task """ request_id: ID creation_timestamp: float
[docs] @dataclass class TransportationRequest(Request): """ A request for the system to perform a transportation task, through creating a route through the system given spatio-temporal constraints. """ origin: Any destination: Any # pickup_offset: float = 0 pickup_timewindow_min: float = 0 pickup_timewindow_max: float = inf delivery_timewindow_min: float = 0 delivery_timewindow_max: float = inf
[docs] @dataclass class InternalRequest(Request): """ A request for the system to perform some action at a specific location that is not directly requested by a customer """ location: Any
[docs] class StopAction(Enum): """ Representing actions that the system may perform at a specific location """ pickup = 1 dropoff = 2 internal = 3
[docs] class LocType(Enum): """ Represents the kind of location objects the simulator supports. Either of: 1. `R2LOC` (for points in :math:`\mathbb{R}^2`, holds a `Tuple[float, float]`). 2. `INT` (for e.g. graphs). Note ---- Use this for simulations using the pure python components. For simulations using cythonic components, the cython version of this enum i.e. :class:`.data_structures_cython.LocType` has to be used. """ R2LOC = 1 # points in R^2 INT = 2
[docs] @dataclass class Stop: """ The notion of an action to be performed in fulfilling a request. Attached are spatio-temporal constraints. Parameters ---------- location: location at which the stop is supposed to be serviced """ location: Any request: Request action: StopAction estimated_arrival_time: float occupancy_after_servicing: int = 0 time_window_min: float = 0 time_window_max: float = inf @property def estimated_departure_time(self): return max( self.estimated_arrival_time, self.time_window_min, )
[docs] class TransportSpace(ABC):
[docs] @abstractmethod def d(self, u, v) -> Union[int, float]: """ Return distance between points `u` and `v`. Parameters ---------- u origin coordinate v destination coordinate Returns ------- d distance """ ...
[docs] @abstractmethod def t(self, u, v) -> Union[int, float]: """ Return travel time between points `u` and `v`. Parameters ---------- u origin coordinate v destination coordinate Returns ------- d travel time """ ...
[docs] @abstractmethod def random_point(self): """ Return a random point on the space. Returns ------- random point """ ...
[docs] @abstractmethod def interp_time(self, u, v, time_to_dest) -> Tuple[Any, Union[int, float]]: """ Interpolate a location `x` between the origin `u` and the destination `v` as a function of the travel time between the unknown location and the destination `t(x, v) == time_to_dest`. Parameters ---------- u origin coordinate v destination coordinate time_to_dest travel time from the unknown location `x` to the destination `v` Returns ------- x interpolated coordinate of the unknown location `x` jump_dist remaining distance until the returned interpolated coordinate will be reached Note ---- The notion of `jump_dist` is necessary in transport spaces whose locations are *discrete* (e.g. graphs). There if someone is travelling along a trajectory, at a certain time `t` one may be "in between" two locations `w` \ and `x`. Then the "position" at time `t` is ill defined, and we must settle for the fact that its location *will be* `x` at `t+jump_time`. """ ...
[docs] @abstractmethod def interp_dist( self, origin, destination, dist_to_dest ) -> Tuple[Any, Union[int, float]]: """ Interpolate a location `x` between the origin `u` and the destination `v` as a function of the distance between the unknown location and the destination `d(x, v) == dist_to_dest`. Parameters ---------- u origin coordinate v destination coordinate dist_to_dest distance from the unknown location `x` to the destination `v` Returns ------- x interpolated coordinate of the unknown location `x` jump_dist remaining distance until the returned interpolated coordinate will be reached """ ...
@abstractmethod def asdict(self) -> dict: ... def __eq__(self, other: "TransportSpace"): return type(self) == type(other) and self.asdict() == other.asdict()
class RequestGenerator(Iterator): ... Stoplist = List[Stop] """A list of `.Stop` objects. Specifies completely the current position and future actions a vehicle will make.""" DispatcherSolution = tuple[float, Stoplist, tuple[float, float, float, float]] """cost, updated_stoplist, ( pickup_timewindow_min, pickup_timewindow_max, delivery_timewindow_min, delivery_timewindow_max, ) This is what a dispatcher returns. In case no solution is found, the cost is :math:`\infty` and the timewindow variables are ``None``. """ SingleVehicleSolution = tuple[ID, float, tuple[float, float, float, float]] """vehicle_id, cost, ( pickup_timewindow_min, pickup_timewindow_max, delivery_timewindow_min, delivery_timewindow_max, ) This is what `VehicleState.handle_transportation_request_single_vehicle` returns. In case no solution is found, the cost is :math:`\infty` and the timewindow variables are `None`. """ Dispatcher = Callable[ [ TransportationRequest, Stoplist, TransportSpace, int, ], DispatcherSolution, ] """Defines the `Dispatcher` interface. Actual dispatchers are implemented in `.util.dispatchers`.""" Location = Union[int, float, tuple[float]] class DistanceDistribution(ABC): """ Abstract base class for specifying distance distributions to use, e.g., during request generation. """ def __init__(self, *args, **kwargs): ... @abstractmethod def sample(self) -> float: """ Sample a distance from the distribution. """ ... @property @abstractmethod def mean(self) -> float: """ Return the mean of the distribution. """ ... @property @abstractmethod def std(self) -> float: """ Return the standard deviation of the distribution. """ ... @abstractmethod def __repr__(self): ... @abstractmethod def __str__(self): ... @abstractmethod def asdict(self) -> dict[str, Any]: """ Return a dictionary representation of the distribution. """ ...