Source code for ridepy.util.spaces

import random
import copy
import warnings

from typing import List, Tuple, Union, Any, Iterator, Sequence

import numpy as np
import operator as op
import math as m
import networkx as nx
import itertools as it
from scipy.spatial import distance as spd

from ridepy.data_structures import TransportSpace, ID, LocType
from ridepy.util import smartVectorize, make_repr


[docs] class Euclidean(TransportSpace): """ n-dimensional Euclidean space with constant velocity. """ def __init__( self, n_dim: int = 1, coord_range: List[Tuple[Union[int, float], Union[int, float]]] = None, velocity: float = 1, ): """ Initialize n-dimensional Euclidean space with constant velocity. Parameters ---------- n_dim number of dimensions coord_range coordinate range of the space as a list of 2-tuples (x_i,min, x_i,max) where x_i represents the ith dimension. velocity constant scaling factor as discriminator between distance and travel time """ self.n_dim = n_dim self.velocity = velocity if coord_range is not None: assert len(coord_range) == n_dim, ( "Number of desired dimensions must " "match the number of coord range pairs given" ) self.coord_range = coord_range else: self.coord_range = [(0, 1)] * n_dim @smartVectorize def d(self, u, v): if self.n_dim == 1: return abs(v - u) else: return spd.euclidean(u, v) def t(self, u, v): return self.d(u, v) / self.velocity def _coord_sub(self, u, v): if self.n_dim == 1: return u - v else: return map(op.sub, u, v) def _coord_mul(self, u, k): if self.n_dim == 1: return u * k else: return map(op.mul, u, it.repeat(k)) def interp_dist(self, u, v, dist_to_dest): return ( self._coord_sub( v, self._coord_mul(self._coord_sub(v, u), dist_to_dest / self.d(u, v)) ), 0, ) def interp_time(self, u, v, time_to_dest): return ( self._coord_sub( v, self._coord_mul(self._coord_sub(v, u), time_to_dest / self.t(u, v)) ), 0, ) def random_point(self): return tuple(random.uniform(a, b) for a, b in self.coord_range) def asdict(self): return dict( n_dim=self.n_dim, coord_range=self.coord_range, velocity=self.velocity ) def __repr__(self): return make_repr("Euclidean", self.asdict())
class Euclidean1D(Euclidean): def __init__( self, coord_range: List[Tuple[Union[int, float], Union[int, float]]] = None, velocity: float = 1, ): super().__init__(n_dim=1, coord_range=coord_range, velocity=velocity) def interp_dist(self, u, v, dist_to_dest): return v - (v - u) * dist_to_dest / self.d(u, v), 0 def interp_time(self, u, v, time_to_dest): return v - (v - u) * time_to_dest / self.t(u, v), 0 @smartVectorize def d(self, u, v): return abs(v - u) def random_point(self): return random.uniform(self.coord_range[0][0], self.coord_range[0][1]) def asdict(self): return dict(coord_range=self.coord_range, velocity=self.velocity) def __repr__(self): return make_repr("Euclidean1D", self.asdict()) class Euclidean2D(Euclidean): def __init__( self, coord_range: List[Tuple[Union[int, float], Union[int, float]]] = None, velocity: float = 1, ): super().__init__(n_dim=2, coord_range=coord_range, velocity=velocity) self.loc_type = LocType.R2LOC def _coord_sub(self, u, v): return u[0] - v[0], u[1] - v[1] def _coord_mul(self, u, k): return u[0] * k, u[1] * k @smartVectorize def d(self, u, v): return m.sqrt(m.pow(v[0] - u[0], 2) + m.pow(v[1] - u[1], 2)) def asdict(self): return dict(coord_range=self.coord_range, velocity=self.velocity) def __repr__(self): return make_repr("Euclidean2D", self.asdict())
[docs] class Manhattan2D(TransportSpace): """ :math:`\mathbb{R}^2` with :math:`L_1`-induced metric (Manhattan). """ def __init__( self, coord_range: List[Tuple[Union[int, float], Union[int, float]]] = None, velocity: float = 1, ): """ Parameters ---------- coord_range coordinate range of the space as a list of 2-tuples (x_i,min, x_i,max) where x_i represents the ith dimension. velocity constant scaling factor as discriminator between distance and travel time """ self.n_dim = 2 self.velocity = velocity self.loc_type = LocType.R2LOC if coord_range is not None: assert len(coord_range) == 2, ( "Number of desired dimensions must " "match the number of coord range pairs given" ) self.coord_range = coord_range else: self.coord_range = [(0, 1)] * 2 @smartVectorize def d(self, u, v): return abs(u[0] - v[0]) + abs(u[1] - v[1]) def t(self, u, v): return self.d(u, v) / self.velocity def _coord_sub(self, u, v): return map(op.sub, u, v) def _coord_mul(self, u, k): return map(op.mul, u, it.repeat(k)) def interp_dist(self, u, v, dist_to_dest): return ( self._coord_sub( v, self._coord_mul(self._coord_sub(v, u), dist_to_dest / self.d(u, v)) ), 0, ) def interp_time(self, u, v, time_to_dest): return ( self._coord_sub( v, self._coord_mul(self._coord_sub(v, u), time_to_dest / self.t(u, v)) ), 0, ) def random_point(self): return tuple(random.uniform(a, b) for a, b in self.coord_range) def asdict(self): return dict(coord_range=self.coord_range, velocity=self.velocity) def __repr__(self): return make_repr("Manhattan", self.asdict())
[docs] class Graph(TransportSpace): """ A location is identified with a one-dimensional coordinate, namely the node index. """ n_dim = 1 loc_type = LocType.INT def _update_distance_cache(self): ( self._predecessors, self._distances, ) = nx.floyd_warshall_predecessor_and_distance(self.G, "distance") def __init__( self, vertices: Sequence[int], edges: Sequence[Tuple[int, int]], weights: Union[None, float, Sequence[float]] = None, velocity: float = 1, ): """ Weighted undirected graph with integer vertex labels Parameters ---------- vertices sequence of vertices edges sequence of edge tuples weights Edge weights. - if None is supplied, the resulting graph is unweighted (unit edge length) - if a single float is supplied, every edge length will be equal to this number - if a sequence is supplied, this will be mapped onto the edge sequence velocity constant velocity to compute travel time, optional. """ self.G = nx.Graph() self.G.add_nodes_from(vertices) if weights is None: weights = 1 if isinstance(weights, (int, float)): weights = it.repeat(float(weights)) self.G.add_edges_from( (u, v, {"distance": w}) for (u, v), w in zip(edges, weights) ) self.loc_type = LocType.INT self.velocity = velocity self._update_distance_cache()
[docs] @staticmethod def _make_attribute_distance(G, attribute_name: Union[str, None]): """ Set the distance attribute on the edges of the graph inplace. Parameters ---------- G The graph to modify attribute_name The name of the attribute to use as distance. If None, the distance will be set to 1 for each edge. """ if attribute_name is None: nx.set_edge_attributes(G, 1, name="distance") elif attribute_name != "distance": # if 'distance' already exists, we raise if nx.get_edge_attributes(G, "distance"): raise ValueError( "'distance' already exists as edge attribute, won't overwrite" ) # otherwise rename else: nx.set_edge_attributes( G, nx.get_edge_attributes(G, attribute_name), name="distance", ) else: if len(nx.get_edge_attributes(G, "distance")) != G.number_of_edges(): raise ValueError( "Was told to use 'distance' as distance edge attribute. " "All edges must have the distance weight specified." )
@staticmethod def _prepare_copy_of_nx_graph(*, G, make_attribute_distance, directed=False): if not directed: graph_class = nx.Graph else: graph_class = nx.DiGraph if not all( isinstance(u, int) for u in random.sample(list(G.nodes()), k=min(5, len(G))) ): warnings.warn( "Heuristic determined non-int node labels. Converting to int, " "keeping original labels as attribute.", UserWarning, ) G = nx.relabel.convert_node_labels_to_integers( G, label_attribute="original_label" ) else: # create a copy as we don't want to modify the original graph G = copy.deepcopy(G) # as we are keeping the graph instance, make sure it's right if not isinstance(G, graph_class): raise TypeError(f"Must supply {graph_class.__name__}, not {type(G)}") elif (directed and not G.is_directed()) or (not directed and G.is_directed()): raise TypeError(f"Must supply {'' if directed else 'un'}directed graph") Graph._make_attribute_distance(G, make_attribute_distance) return G
[docs] @classmethod def from_nx( cls, G: nx.Graph, velocity: float = 1, make_attribute_distance: str = "distance", ): """ Create a graph space from NetworkX graph. Parameters ---------- G the networkx.Graph instance, will be deepcopied velocity velocity to use for travel time computation make_attribute_distance attribute to rename to "distance" and use as such If None is supplied, the resulting graph is unweighted (unit edge length). Returns ------- graph space instance """ self = cls.__new__(cls) self.G = cls._prepare_copy_of_nx_graph( G=G, make_attribute_distance=make_attribute_distance, directed=False ) self.velocity = velocity self._update_distance_cache() return self
@smartVectorize def d(self, u, v): return self._distances[u][v] def t(self, u, v) -> Union[int, float]: return self.d(u, v) / self.velocity
[docs] def interp_dist(self, u, v, dist_to_dest): """ Parameters ---------- u v dist_to_dest Returns ------- next_node jump_distance """ if u == v: return v, 0 next_node = v while next_node is not u: predecessor = self._predecessors[u][next_node] predecessor_dist = self.d(predecessor, v) if predecessor_dist >= dist_to_dest: break next_node = predecessor if predecessor_dist > dist_to_dest: return next_node, dist_to_dest - self.d(next_node, v) else: return predecessor, 0
[docs] def interp_time(self, u, v, time_to_dest): """ Parameters ---------- u v time_to_dest Returns ------- next_node jump_time """ next_node, jump_dist = self.interp_dist( u, v, dist_to_dest=time_to_dest * self.velocity ) return next_node, jump_dist / self.velocity
def shortest_path_vertex_sequence(self, u, v) -> List[ID]: seq = [v] if u != v: next_node = v while next_node is not u: next_node = self._predecessors[u][next_node] seq.append(next_node) seq.append(u) return seq[::-1] @property def vertices(self): return list(self.G.nodes) @property def edges(self): return list(self.G.edges) @property def weights(self): return list(nx.get_edge_attributes(self.G, "distance").values()) def random_point(self): return random.choice(self.vertices) def __repr__(self): return f"Graph(..., velocity={self.velocity})" def asdict(self): return dict( vertices=self.vertices, edges=self.edges, weights=self.weights, velocity=self.velocity, ) def __reduce__(self): return self.__class__, ( self.vertices, self.edges, self.weights, self.velocity, )
class DiGraph(Graph): def __init__( self, vertices: Sequence[int], edges: Sequence[Tuple[int, int]], weights: Union[None, float, Sequence[float]], velocity: float = 1, ): """ Weighted directed graph with integer vertex labels Parameters ---------- vertices sequence of vertices edges sequence of edge tuples weights Edge weights. - if None is supplied, the resulting graph is unweighted (unit edge length) - if a single float is supplied, every edge length will be equal to this number - if a sequence is supplied, this will be mapped onto the edge sequence velocity constant velocity to compute travel time, optional. """ self.G = nx.DiGraph() self.G.add_nodes_from(vertices) self.G.add_edges_from( (u, v, {"distance": w}) for (u, v), w in zip(edges, weights) ) self.velocity = velocity self._update_distance_cache() @classmethod def from_nx( cls, G: nx.Graph, velocity: float = 1, make_attribute_distance: str = "distance", ): """ Create a graph space from networkx directed graph Parameters ---------- G the networkx.DiGraph instance, will be deepcopied velocity velocity to use for travel time computation make_attribute_distance attribute to rename to "distance" and use as such If None is supplied, the resulting graph is unweighted (unit edge length). Returns ------- graph space instance """ self = cls.__new__(cls) self.G = cls._prepare_copy_of_nx_graph( G=G, make_attribute_distance=make_attribute_distance, directed=True ) self.velocity = velocity self._update_distance_cache() return self def __repr__(self): return f"DiGraph(..., velocity={self.velocity})" class ContinuousGraph(Graph): def __init__(self): raise NotImplementedError @smartVectorize def d(self, u, v): """coordinates shall consist of triples (u, v, dist_to_dest)""" ... def t(self, u, v): ... def interp_dist(self, u, w, time_to_dest): ... def interp_time(self, u, v, time_to_dest): ... def random_point(self): ...