Source code for ridepy.vehicle_state

from __future__ import annotations

import numpy as np

from typing import Optional, SupportsFloat, List, Tuple

from .data_structures import (
    Request,
    Stoplist,
    SingleVehicleSolution,
    StopAction,
    Stop,
    TransportationRequest,
    TransportSpace,
    Dispatcher,
    LocType,
)
from .events import PickupEvent, DeliveryEvent, InternalEvent, StopEvent

import logging

logger = logging.getLogger(__name__)


[docs] class VehicleState: """ Single vehicle insertion logic is implemented here. Can optionally be implemented in Cython or another compiled language. """ def recompute_arrival_times_drive_first(self): # update CPATs for stop_i, stop_j in zip(self.stoplist, self.stoplist[1:]): stop_j.estimated_arrival_time = max( stop_i.estimated_arrival_time, stop_i.time_window_min ) + self.space.t(stop_i.location, stop_j.location) def __init__( self, *, vehicle_id, initial_stoplist: Stoplist, space: TransportSpace, dispatcher: Dispatcher, seat_capacity: int, ): """ Parameters ---------- vehicle_id id of the vehicle to be created. initial_stoplist stoplist to start out with, MUST contain CPE as first element. space dispatcher see the docstring of `.FleetState`. seat_capacity the maximum number of `.TransportationRequest` s that can be in a vehicle at the same time. """ self.vehicle_id = vehicle_id # TODO check for CPE existence in each supplied stoplist or encapsulate the whole thing self.stoplist: Stoplist = initial_stoplist """The list of `.Stop` objects specifying the planned future actions to be undertaken by this vehicle.""" self.space = space self.dispatcher = dispatcher self.seat_capacity = seat_capacity logger.info(f"Created VehicleState with space of type {type(self.space)}")
[docs] def fast_forward_time(self, t: float) -> Tuple[List[StopEvent], List[Stop]]: """ Update the vehicle_state to the simulator time `t`. Parameters ---------- t time to be updated to. Returns ------- events List of stop events emitted through servicing stops upto time=t new_stoplist Stoplist remaining after servicing the stops upto time=t """ # TODO assert that the CPATs are updated and the stops sorted accordingly # TODO optionally validate the travel time velocity constraints event_cache = [] # Here, last_stop refers to the stop with the largest departure time value smaller or equal than t. # This can either be the last stop in the stoplist that is serviced here, or it can be the # (possibly outdated) CPE stop, of no other stop is serviced. last_stop = None # drop all non-future stops from the stoplist, except for the (outdated) CPE for i in range(len(self.stoplist) - 1, 0, -1): stop = self.stoplist[i] service_time = max(stop.estimated_arrival_time, stop.time_window_min) # service the stop at the minimum time at which it can leave if service_time <= t: # as we are iterating backwards, the first stop iterated over is the last one serviced if last_stop is None: last_stop = stop if stop.action == StopAction.pickup: stop_event = { "event_type": "PickupEvent", "timestamp": service_time, "request_id": stop.request.request_id, "vehicle_id": self.vehicle_id, } elif stop.action == StopAction.dropoff: stop_event = { "event_type": "DeliveryEvent", "timestamp": service_time, "request_id": stop.request.request_id, "vehicle_id": self.vehicle_id, } elif stop.action == StopAction.internal: stop_event = { "event_type": "InternalEvent", "timestamp": service_time, "vehicle_id": self.vehicle_id, } else: raise ValueError(f"Unknown StopAction={stop.action}") event_cache.append(stop_event) del self.stoplist[i] # fix event cache order event_cache = event_cache[::-1] # if no stop was serviced, the last stop is the outdated CPE if last_stop is None: last_stop = self.stoplist[0] # set the occupancy at CPE self.stoplist[0].occupancy_after_servicing = last_stop.occupancy_after_servicing # set CPE location to current location as inferred from the time delta to the # upcoming stop's CPAT still mid-jump from last interpolation, no need to # interpolate again if self.stoplist[0].estimated_arrival_time <= t: if len(self.stoplist) > 1: loc, jump_time = self.space.interp_time( u=last_stop.location, v=self.stoplist[1].location, time_to_dest=self.stoplist[1].estimated_arrival_time - t, ) self.stoplist[0].location = loc # set CPE time self.stoplist[0].estimated_arrival_time = t + jump_time else: # Stoplist is (now) empty, only CPE is there. Set CPE time to # current time and move CPE to last_stop's location (which is # identical to CPE, if we haven't served anything. self.stoplist[0].location = last_stop.location self.stoplist[0].estimated_arrival_time = t return event_cache
[docs] def handle_transportation_request_single_vehicle( self, request: TransportationRequest ) -> SingleVehicleSolution: """ The computational bottleneck. An efficient simulator could do the following: 1. Parallelize this over all vehicles. This function being without any side effects, it should be easy to do. 2. Implement as a c extension. The args and the return value are all basic c data types, so this should also be easy. Parameters ---------- request Request to be handled. Returns ------- The `SingleVehicleSolution` for the respective vehicle. """ cost, self.stoplist_new, (EAST_pu, LAST_pu, EAST_do, LAST_do) = self.dispatcher( request=request, stoplist=self.stoplist, space=self.space, seat_capacity=self.seat_capacity, ) return self.vehicle_id, cost, (EAST_pu, LAST_pu, EAST_do, LAST_do)
def select_new_stoplist(self): self.stoplist = self.stoplist_new