Source code for ridepy.fleet_state

from __future__ import annotations

from collections import defaultdict

import functools as ft
import itertools as it
import operator as op

import numpy as np


from abc import ABC, abstractmethod
from typing import (
    Dict,
    SupportsFloat,
    Iterator,
    Union,
    Tuple,
    Iterable,
    Type,
)

import logging

from .util import supress_stoplist_extraction_warning

logger = logging.getLogger(__name__)

from ridepy.data_structures import (
    Stop,
    Request,
    TransportationRequest as pyTransportationRequest,
    InternalRequest as pyInternalRequest,
    SingleVehicleSolution,
    Dispatcher,
    StopAction,
    InternalRequest,
    TransportationRequest,
)
from .events import (
    Event,
    RequestEvent,
    StopEvent,
)
from ridepy.data_structures_cython import (
    TransportationRequest as CyTransportationRequest,
    InternalRequest as CyInternalRequest,
    StopAction as CyStopAction,
    Stop as CyStop,
)

from ridepy.vehicle_state import VehicleState
from ridepy.vehicle_state_cython import VehicleState as CyVehicleState

from ridepy.data_structures import TransportSpace
from ridepy.util.spaces_cython import TransportSpace as CyTransportSpace


[docs] class FleetState(ABC): """ Provides the interface for running the whole simulation. The fleet state is stored as a dictionary of (vehicle_id, VehicleState) pairs. Exposes the methods fast_forward, handle_request and simulate. To gain maximum computational efficiency, one can: * Subclass VehicleState and implement the main methods in Cython. * Implement fast_forward and handle_request using distributed computing e.g. MPI """ def __init__( self, *, initial_locations: Union[Dict[int, int], Dict[int, Tuple[float, ...]]], vehicle_state_class: Union[Type[VehicleState], Type[CyVehicleState]], space: Union[TransportSpace, CyTransportSpace], dispatcher: Dispatcher, seat_capacities: Union[int, Dict[int, int]], ): """ Create a `FleetState`, holding a number of `VehicleState` objects. Parameters ---------- initial_locations Dictionary with vehicle ids as keys and initial locations as values. vehicle_state_class The vehicle state class to be used. Can be either `.vehicle_state.VehicleState` (pure pythonic) or `.vehicle_state_cython.VehicleState` (implemented in cython). space The `.data_structures.TransportSpace` (e.g. Euclidean2D, Graph) in which the simulation will be run. dispatcher The dispatching algorithm that maps a (stoplist, TransportationRequest) pair into a cost and new stoplist. See :doc:`introduction` for more details. seat_capacities Integers denoting the maximum number of persons a vehicle can hold. Either a dictionary with vehicle ids as keys or a positive integer (Causes each vehicle to have the same capacity). Note ---- * Explain the graph nodes have in notes and why. * define LocType in a central place and motivate why we have it. """ if issubclass(vehicle_state_class, VehicleState): StopCls = Stop StopActionCls = StopAction InternalRequestCls = InternalRequest TransportationRequestCls = TransportationRequest assert isinstance(space, TransportSpace), "unsuitable transport space" logger.debug( f"Creating FleetState with vehicle state class {vehicle_state_class}" ) elif issubclass(vehicle_state_class, CyVehicleState): StopCls = CyStop StopActionCls = CyStopAction InternalRequestCls = CyInternalRequest TransportationRequestCls = CyTransportationRequest assert isinstance(space, CyTransportSpace), "unsuitable transport space" logger.debug( f"Creating FleetState with vehicle state class {vehicle_state_class}" ) else: raise TypeError(f"Unknown VehicleStateCls {type(vehicle_state_class)}") if isinstance(seat_capacities, Dict): assert set(initial_locations) == set( seat_capacities ), "vehicle_ids in seat_capacities and initial_stoplists must match" elif isinstance(seat_capacities, int): seat_capacities = defaultdict(lambda x=seat_capacities: x) else: raise TypeError(f"seat_capacities must be either dict or int") self.vehicle_state_class = vehicle_state_class self.dispatcher = dispatcher self.space = space assert initial_locations, "No initial locations supplied." for initial_location in initial_locations.values(): # note that NumPy's dimensions start from 0 assert space.n_dim == np.ndim(initial_location) + 1, ( f"Dimension mismatch: Initial location {initial_location} of " f"dimensionality {np.ndim(initial_location) + 1} supplied, " f"but space has {space.n_dim} dimensions." ) self.fleet: Dict[int, VehicleState] = { vehicle_id: vehicle_state_class( vehicle_id=vehicle_id, initial_stoplist=[ StopCls( location=initial_locations[vehicle_id], request=InternalRequestCls( request_id=-1, creation_timestamp=0, location=initial_locations[vehicle_id], ), action=StopActionCls.internal, estimated_arrival_time=0, occupancy_after_servicing=0, time_window_min=0, time_window_max=np.inf, ) ], space=space, dispatcher=self.dispatcher, seat_capacity=seat_capacities[vehicle_id], ) for vehicle_id in initial_locations.keys() }
[docs] @classmethod def from_fleet( cls, *, fleet: Dict[int, VehicleState], space: Union[TransportSpace, CyTransportSpace], dispatcher: Dispatcher, validate: bool = True, ): """ Create a `FleetState` from a dictionary of `VehicleState` objects, keyed by an integer `vehicle_id`. Parameters ---------- fleet Initial stoplists *must* contain current position element (CPE) stops with `StopAction.internal` and `InternalRequest` with `request_id==-1` as their first entries. space TransportSpace to operate on dispatcher dispatcher to use to assign requests to vehicles or reject them. validate If true, try to figure out whether something is off. If not, raise. Returns ------- """ self = super().__new__(cls) self.dispatcher = dispatcher self.fleet = fleet self.space = space if validate: VehicleStateCls = type(next(iter(self.fleet.values()))) assert all( isinstance(vehicle_state, VehicleStateCls) for vehicle_state in self.fleet.values() ), "fleet inhomogeneous" if issubclass(VehicleStateCls, VehicleState): StopCls = Stop StopActionCls = StopAction InternalRequestCls = InternalRequest TransportationRequestCls = TransportationRequest assert isinstance(space, TransportSpace), "unsuitable transport space" elif issubclass(VehicleStateCls, CyVehicleState): StopCls = CyStop StopActionCls = CyStopAction InternalRequestCls = CyInternalRequest TransportationRequestCls = CyTransportationRequest assert isinstance(space, CyTransportSpace), "unsuitable transport space" else: raise TypeError(f"Unknown VehicleStateCls {type(VehicleStateCls)}") for vehicle_state in self.fleet.values(): assert ( vehicle_state.stoplist[0].request.request_id == -1 ), "malformed CPE: request_id must be -1" assert ( vehicle_state.stoplist[0].action == StopActionCls.internal ), "malformed CPE: action must be 'internal'" assert isinstance( vehicle_state.stoplist[0].request, InternalRequestCls ), "malformed CPE: request type must be 'internal'" assert all(isinstance(stop, StopCls) for stop in vehicle_state.stoplist) assert all( isinstance( stop.request, (InternalRequestCls, TransportationRequestCls) ) for stop in vehicle_state.stoplist ) return self
[docs] def simulate( self, requests: Iterator[Request], t_cutoff: float = np.inf ) -> Iterator[Event]: """ Run a simulation. Parameters ---------- requests `Iterator` that supplies incoming requests. t_cutoff optional cutoff time after which the simulation is forcefully ended, disregarding any remaining stops or requests. Returns ------- Iterator of events that have been emitted during the simulation. Note ---- Because of lazy evaluation the returned iterator must be exhausted for the simulation to be actually be performed. """ self.t = 0 with supress_stoplist_extraction_warning(): for vehicle_id, fleet_state in self.fleet.items(): yield { "event_type": "VehicleStateBeginEvent", "vehicle_id": vehicle_id, "timestamp": self.t, "location": fleet_state.stoplist[0].location, "request_id": -100, } for n_req, request in enumerate(requests): # advance clock to req_epoch self.t = request.creation_timestamp if self.t > t_cutoff: break # Visit all the stops upto req_epoch yield from self.fast_forward(self.t) if isinstance(request, (TransportationRequest, CyTransportationRequest)): yield { "event_type": "RequestSubmissionEvent", "request_id": request.request_id, "timestamp": self.t, "origin": request.origin, "destination": request.destination, "pickup_timewindow_min": request.pickup_timewindow_min, "pickup_timewindow_max": request.pickup_timewindow_max, "delivery_timewindow_min": request.delivery_timewindow_min, "delivery_timewindow_max": request.delivery_timewindow_max, } # handle the current request if isinstance(request, (pyTransportationRequest, CyTransportationRequest)): yield self.handle_transportation_request(request) elif isinstance(request, (pyInternalRequest, CyInternalRequest)): yield self.handle_internal_request(request) else: raise NotImplementedError(f"Unknown request type: {type(request)}") logger.info(f"Handled request # {n_req}") with supress_stoplist_extraction_warning(): self.t = min( t_cutoff, max( vehicle.stoplist[-1].estimated_arrival_time for vehicle in self.fleet.values() ), ) # service all remaining stops yield from self.fast_forward(self.t) with supress_stoplist_extraction_warning(): for vehicle_id, fleet_state in self.fleet.items(): yield { "event_type": "VehicleStateEndEvent", "vehicle_id": vehicle_id, "timestamp": self.t, "location": fleet_state.stoplist[0].location, "request_id": -200, }
[docs] @abstractmethod def fast_forward(self, t: SupportsFloat) -> Iterator[StopEvent]: """ Advance the simulator's state in time from the previous time :math:`t'` to the new time :math:`t`, with :math:`t >= t'`. E.g. vehicle locations may change and vehicle stops may be serviced. The latter will emit `.StopEvent` s which are returned. Parameters ---------- t Time to advance to. Returns ------- Iterator of stop events. May be empty if no stop is serviced. """ ...
[docs] def handle_transportation_request( self, req: pyTransportationRequest ) -> RequestEvent: """ Handle a request by mapping the request and the fleet state onto a request response, modifying the fleet state in-place. This method can be implemented in child classes using various parallelization techniques like `multiprocessing`, `OpenMPI` or `dask`. Parameters ---------- req Request to handle. Returns ------- An `.Event`. """ ...
[docs] @abstractmethod def handle_internal_request(self, req: pyInternalRequest) -> RequestEvent: """ Parameters ---------- req request to handle. Returns ------- An `.Event`. """ ...
[docs] def _apply_request_solution( self, req, all_solutions: Iterable[SingleVehicleSolution] ) -> RequestEvent: """ Given a request and a bunch of solutions, pick the one with the minimum cost and apply it, thereby changing the stoplist of the chosen vehicle and emitting an RequestAcceptanceEvent. If the minimum cost is infinite, RequestRejectionEvent is returned. Parameters ---------- req request to handle. all_solutions a dictionary mapping a vehicle_id to a `.SingleVehicleSolution`. Returns ------- Either a `.RequestAcceptanceEvent`, or a `.RequestRejectionEvent` if no suitable vehicle could be found. Note ---- Modifies the `.VehicleState` of the vehicle with the least cost inplace. """ ( best_vehicle, min_cost, ( pickup_timewindow_min, pickup_timewindow_max, delivery_timewindow_min, delivery_timewindow_max, ), ) = min(all_solutions, key=op.itemgetter(1)) logger.debug(f"best vehicle: {best_vehicle}, at min_cost={min_cost}") if min_cost == np.inf: # no solution was found return { "event_type": "RequestRejectionEvent", "timestamp": self.t, "request_id": req.request_id, } else: # modify the best vehicle's stoplist self.fleet[best_vehicle].select_new_stoplist() return { "event_type": "RequestAcceptanceEvent", "timestamp": self.t, "request_id": req.request_id, "origin": req.origin, "destination": req.destination, "pickup_timewindow_min": pickup_timewindow_min, "pickup_timewindow_max": pickup_timewindow_max, "delivery_timewindow_min": delivery_timewindow_min, "delivery_timewindow_max": delivery_timewindow_max, }
[docs] class SlowSimpleFleetState(FleetState): def fast_forward(self, t: float): events = ( vehicle_state.fast_forward_time(t) for vehicle_state in self.fleet.values() ) # no need to swap the old stoplists of each vehicle with the new (fast-forwarded) # stoplists, because vehicle_state_class.fast_forward did that already. return sorted(it.chain.from_iterable(events), key=op.itemgetter("timestamp")) def handle_transportation_request( self, req: pyTransportationRequest ) -> RequestEvent: logger.debug(f"Handling Request: {req}") if req.origin == req.destination: return { "event_type": "RequestRejectionEvent", "timestamp": self.t, "request_id": req.request_id, } return self._apply_request_solution( req, map( ft.partial( self.vehicle_state_class.handle_transportation_request_single_vehicle, request=req, ), self.fleet.values(), ), ) def handle_internal_request(self, req: pyInternalRequest) -> RequestEvent: ...