Source code for quetzal.model.transportmodel

# -*- coding: utf-8 -*-

import pandas as pd
import numpy as np
from quetzal.analysis import analysis
from quetzal.engine import engine
from quetzal.engine.pathfinder import PublicPathFinder
from quetzal.engine.park_and_ride_pathfinder import ParkRidePathFinder 
from quetzal.engine.road_pathfinder import RoadPathFinder
from quetzal.engine import nested_logit
from quetzal.model import model, optimalmodel, parkridemodel

from syspy.assignment import raw as raw_assignment
from syspy.assignment.raw import fast_assign as assign
from syspy.skims import skims
from tqdm import tqdm
import networkx as nx


[docs]def read_hdf(filepath): m = TransportModel() m.read_hdf(filepath) return m
[docs]def read_json(folder): m = TransportModel() m.read_json(folder) return m
track_args = model.track_args log = model.log
[docs]class TransportModel(optimalmodel.OptimalModel, parkridemodel.ParkRideModel):
[docs] @track_args def step_distribution( self, deterrence_matrix=None, **od_volume_from_zones_kwargs ): """ * requires: zones * builds: volumes :param deterrence_matrix: an OD unstaked dataframe representing the disincentive to travel as distance/time/cost increases. :param od_volume_from_zones_kwargs: if the friction matrix is not provided, it will be automatically computed using a gravity distribution which uses the following parameters: * param power: (int) the gravity exponent * param intrazonal: (bool) set the intrazonal distance to 0 if False, compute a characteristic distance otherwise. """ self.volumes = engine.od_volume_from_zones( self.zones, deterrence_matrix, coordinates_unit=self.coordinates_unit, **od_volume_from_zones_kwargs )
[docs] @track_args def step_pathfinder( self, walk_on_road=False, complete=True, **kwargs ): """ * requires: links, footpaths, zone_to_transit, zone_to_road * builds: pt_los """ assert self.links['time'].isnull().sum() == 0 self.links = engine.graph_links(self.links) self.walk_on_road = walk_on_road if walk_on_road: footpaths = self.road_links.copy() footpaths['time'] = footpaths['walk_time'] ntlegs = self.zone_to_road nodes = self.road_nodes else: footpaths = self.footpaths ntlegs = self.zone_to_transit nodes = self.nodes #TODO even with walk on road, transit nodes may not belong to road_nodes self.pt_los, self.graph = engine.path_and_duration_from_links_and_ntlegs( self.links, ntlegs=ntlegs, pole_set=set(self.zones.index), footpaths=footpaths, **kwargs ) if complete: self.pt_los = analysis.path_analysis_od_matrix( od_matrix=self.pt_los, links=self.links, nodes=nodes, centroids=self.centroids, )
[docs] @track_args def step_road_pathfinder(self, maxiters=1, *args, **kwargs): """ * requires: zones, road_links, zone_to_road * builds: car_los, road_links """ roadpathfinder = RoadPathFinder(self) roadpathfinder.frank_wolfe(maxiters=maxiters, *args, **kwargs) self.car_los = roadpathfinder.car_los self.road_links = roadpathfinder.road_links
[docs] @track_args def step_pr_pathfinder( self, force=False, path_analysis=True, **kwargs ): if not force: sets = ['nodes', 'links', 'zones', 'road_nodes', 'road_links'] self.integrity_test_collision(sets) self.links = engine.graph_links(self.links) parkridepathfinder = ParkRidePathFinder(self) parkridepathfinder.find_best_path(**kwargs) self.pr_los = parkridepathfinder.paths if path_analysis: analysis_nodes = pd.concat([self.nodes, self.road_nodes]) analysis_links = pd.concat([self.links, self.road_links]) self.pr_los = analysis.path_analysis_od_matrix( od_matrix=self.pr_los, links=self.links, nodes=analysis_nodes, centroids=self.centroids, ) # analyse non vérifiée, prise directement depuis pt_los
[docs] @track_args def step_pt_pathfinder( self, broken_routes=True, broken_modes=True, route_column='route_id', mode_column='route_type', boarding_time=None, speedup=False, walk_on_road=False, # keep_graph=False, keep_pathfinder=False, force=False, path_analysis=True, **kwargs): """ * requires: zones, links, footpaths, zone_to_road, zone_to_transit * builds: pt_los """ sets = ['nodes', 'links', 'zones'] if walk_on_road: sets += ['road_nodes', 'road_links'] if not force: self.integrity_test_collision(sets) self.links = engine.graph_links(self.links) publicpathfinder = PublicPathFinder(self, walk_on_road=walk_on_road) publicpathfinder.find_best_paths( broken_routes=broken_routes, broken_modes=broken_modes, route_column=route_column, mode_column=mode_column, speedup=speedup, boarding_time=boarding_time, **kwargs ) # if keep_graph: # self.nx_graph=publicpathfinder.nx_graph if keep_pathfinder: self.publicpathfinder = publicpathfinder self.pt_los = publicpathfinder.paths analysis_nodes = pd.concat([self.nodes, self.road_nodes]) if walk_on_road else self.nodes if path_analysis: self.pt_los = analysis.path_analysis_od_matrix( od_matrix=self.pt_los, links=self.links, nodes=analysis_nodes, centroids=self.centroids, )
[docs] @track_args def step_concatenate_los(self): """ * requires: pt_los, car_los * builds: los """
[docs] @track_args def step_modal_split(self, build_od_stack=True, **modal_split_kwargs): """ * requires: volumes, los * builds: od_stack, shared :param modal_split_kwargs: kwargs of engine.modal_split example: :: sm.step_modal_split( time_scale=1/1800, alpha_car=2, beta_car=600 ) """ shared = engine.modal_split_from_volumes_and_los( self.volumes, self.los, **modal_split_kwargs ) # shared['distance_car'] = shared['distance'] if build_od_stack: self.od_stack = analysis.volume_analysis_od_matrix(shared) self.shared = shared
[docs] def compute_los_volume(self, time_expanded=False, keep_segments=True): los = self.los if not time_expanded else self.te_los segments = self.segments probabilities = [(segment, 'probability') for segment in segments] shared_cols = list(set(self.volumes.columns).intersection(set(los.columns))) on = [col for col in shared_cols if col in ['origin', 'destination', 'wished_departure_time']] left = los[on + probabilities] left['index'] = left.index df = pd.merge(left, self.volumes, on=on).set_index('index') df = df.reindex(los.index) values = df[probabilities].values * df[segments].values i = 0 for segment in segments: los[segment] = values.T[i] i += 1 los['volume'] = np.nansum(values, axis=1) if time_expanded: los_volumes = self.te_los.groupby('path_id')[['volume'] + segments].sum() path_id_list = list(self.los['path_id']) volume_values = los_volumes.reindex(path_id_list).fillna(0).values for c in los_volumes.columns: self.los[c] = np.nan # create_columns self.los.loc[:, los_volumes.columns] = volume_values
[docs] def step_assignment( self, road=False, boardings=False, boarding_links=False, alightings=False, alighting_links=False, transfers=False, segmented=False, time_expanded=False, compute_los_volume=True ): if compute_los_volume: self.compute_los_volume(time_expanded=time_expanded) los = self.los.copy() column = 'link_path' l = los.dropna(subset=[column]) l = l.loc[l['volume'] > 0] self.links['volume'] = assign(l['volume'], l[column]) if road: self.road_links[('volume', 'car')] = assign(l['volume'], l[column]) if 'road_link_list' in self.links.columns: to_assign = self.links.dropna(subset=['volume', 'road_link_list']) self.road_links[('volume', 'pt')] = assign( to_assign['volume'], to_assign['road_link_list'] ) if boardings and not boarding_links: print('to assign boardings on links pass boarding_links=True') if boarding_links: column = 'boarding_links' l = los.dropna(subset=[column]) self.links['boardings'] = assign(l['volume'], l[column]) if boardings: column = 'boardings' l = los.dropna(subset=[column]) self.nodes['boardings'] = assign(l['volume'], l[column]) if alighting_links: column = 'alighting_links' l = los.dropna(subset=[column]) self.links['alightings'] = assign(l['volume'], l[column]) if alightings: column = 'alightings' l = los.dropna(subset=[column]) self.nodes['alightings'] = assign(l['volume'], l[column]) if transfers: column = 'transfers' l = los.dropna(subset=[column]) self.nodes[ 'transfers'] = assign(l['volume'], l[column]) if segmented: self.segmented_assigment( road=road, boardings=boardings, alightings=alightings, transfers=transfers, aggregated_los=los )
[docs] def segmented_assigment( self, road=False, boardings=False, alightings=False, transfers=False, aggregated_los=None ): los = aggregated_los if aggregated_los is not None else self.los for segment in self.segments: column = 'link_path' l = los.dropna(subset=[column]) self.links[segment] = assign(l[segment], l[column]) if road: self.road_links[(segment, 'car')] = assign(l[segment], l[column]) self.road_links[(segment, 'pt')] = assign( self.links[segment], self.links['road_link_list'] ) if boardings: column = 'boarding_links' l = los.dropna(subset=[column]) self.links[(segment, 'boardings')] = assign(l[segment], l[column]) column = 'boardings' l = los.dropna(subset=[column]) self.nodes[(segment, 'boardings')] = assign(l[segment], l[column]) if alightings: column = 'alighting_links' l = los.dropna(subset=[column]) self.links[(segment, 'alightings')] = assign(l[segment], l[column]) column = 'alightings' l = los.dropna(subset=[column]) self.nodes[(segment, 'alightings')] = assign(l[segment], l[column]) if transfers: column = 'transfers' l = los.dropna(subset=[column]) self.nodes[(segment, 'transfers')] = assign(l[segment], l[column])
[docs] @track_args def step_pt_assignment( self, volume_column=None, on_road_links=False, split_by=None, **kwargs ): """ Assignment step * requires: links, nodes, pt_los, road_links, volumes, path_probabilities * builds: loaded_links, loaded_nodes, add load to road_links :param volume_column: volume column of self.volumes to assign. If none, all columns will be assigned :param on_road_links: if True, performs pt assignment on road_links as well :param split_by: path categories to be tracked in the assignment. Must be a column of self.pt_los example: :: sm.step_assignment( volume_column=None, on_road_links=False, split_by='route_type', boardings=True, alightings=True, transfers=True } ) """ if volume_column is None: self.segmented_pt_assignment( on_road_links=on_road_links, split_by=split_by, **kwargs ) return # When split_by is not None, this call could be replaced by a sum, provided # prior dumb definition of loaded_links and loaded_nodes self.loaded_links, self.loaded_nodes = engine.loaded_links_and_nodes( self.links, self.nodes, volumes=self.volumes, path_finder_stack=self.pt_los, volume_column=volume_column, **kwargs ) # Rename columns self.loaded_links.rename(columns={volume_column: ('load', volume_column)}, inplace=True) self.loaded_nodes.rename(columns={volume_column: ('load', volume_column)}, inplace=True) for col in list(set(['boardings', 'alightings', 'transfers']).intersection(kwargs.keys())): self.loaded_links.rename(columns={col: (col, volume_column)}, inplace=True) self.loaded_nodes.rename(columns={col: (col, volume_column)}, inplace=True) # Group assignment if split_by is not None: groups = self.pt_los[split_by].unique() for group in groups: # TODO remove rows with empty link_path group_pt_los = self.pt_los.loc[self.pt_los[split_by]==group] group_loaded_links, group_loaded_nodes = engine.loaded_links_and_nodes( self.links, self.nodes, volumes=self.volumes, path_finder_stack=group_pt_los, volume_column=volume_column, **kwargs ) # Append results columns self.loaded_links[('load', volume_column, group)] = group_loaded_links[volume_column] self.loaded_nodes[('load', volume_column, group)] = group_loaded_nodes[volume_column] for col in list(set(['boardings', 'alightings', 'transfers']).intersection(kwargs.keys())): self.loaded_links[(col, volume_column, group)] = group_loaded_links[col] self.loaded_nodes[(col, volume_column, group)] = group_loaded_nodes[col] # Assignment on road_links if on_road_links: if not 'road_link_path' in self.pt_los.columns: # create road_link_path column from networkcasted linkss if not already defined self._analysis_road_link_path() merged = pd.merge(self.pt_los, self.volumes, on=['origin', 'destination']) merged['to_assign'] = merged[(volume_column ,'probability')] * merged[volume_column].fillna(0) if split_by is not None: def assign_group(g): x = g.reset_index() result = raw_assignment.assign(x['to_assign'], x['road_link_path']) return result group_assigned = merged.groupby(split_by).apply(assign_group) assigned = group_assigned.unstack().T.loc['volume'].fillna(0) # Add empty groups for empty in list(set(groups).difference(set(assigned.columns))): assigned[empty] = 0 self.road_links[[(volume_column, col) for col in groups]] = assigned[[col for col in groups]] self.road_links[volume_column] = assigned.T.sum() else: # no groups assigned = raw_assignment.assign(merged['to_assign'], merged['road_link_path']) self.road_links[volume_column] = assigned['volume'] # todo remove 'load' from analysis module: self.road_links['load'] = self.road_links[volume_column]
[docs] def segmented_pt_assignment(self, split_by=None, on_road_links=False, *args, **kwargs): """ Performs pt assignment for all demand segments. Requires computed path probabilities in pt_los for each segment. """ segments = self.segments iterator = tqdm(segments) for segment in iterator: iterator.desc = str(segment) # Assign demand segment self.step_pt_assignment( volume_column=segment, path_pivot_column=(segment, 'probability'), split_by=split_by, on_road_links=on_road_links, **kwargs ) # Update links and nodes to keep results as loaded links and nodes # are erased at each call of step_pt_assignment self.links = self.loaded_links self.nodes = self.loaded_nodes # Group assignment results: sum over demand segments try: groups = self.pt_los[split_by].unique() except KeyError as e: groups = [] cols = ['load'] # Add boardings, alightings and transfers if processed cols += list(set(['boardings', 'alightings', 'transfers']).intersection(kwargs.keys())) for col in cols: for g in groups: columns = [tuple([col, s, g]) for s in segments] name = tuple([col, g]) self.loaded_links[name] = self.loaded_links[columns].T.sum() self.loaded_links.drop(columns, 1, inplace=True) self.loaded_nodes[name] = self.loaded_nodes[columns].T.sum() self.loaded_nodes.drop(columns, 1, inplace=True) columns = [tuple([col, s]) for s in segments] self.loaded_links[col] = self.loaded_links[columns].T.sum() self.loaded_links.drop(columns, 1, inplace=True) self.loaded_nodes[col] = self.loaded_nodes[columns].T.sum() self.loaded_nodes.drop(columns, 1, inplace=True) if on_road_links: for group in groups: self.road_links[('all', group)] = self.road_links[[(s, group) for s in segments]].T.sum() self.road_links.drop([(s, group) for s in segments], 1, inplace=True) self.road_links['load'] = self.road_links[[s for s in segments]].T.sum() self.road_links.drop([s for s in segments], 1, inplace=True)
[docs] def step_car_assignment(self, volume_column=None): """ Assignment step * requires: road_links, car_los, road_links, volumes, path_probabilities * builds: loaded_road_links """ if volume_column is None: self.segmented_car_assignment()
[docs] def segmented_car_assignment(self): segments = self.segments iterator = tqdm(segments) for segment in iterator: iterator.desc = str(segment) merged = pd.merge(self.car_los, self.volumes, on=['origin', 'destination']) merged['to_assign'] = merged[(segment ,'probability')] * merged[segment].fillna(0) assigned = raw_assignment.assign(merged['to_assign'], merged['link_path']).fillna(0) self.road_links[(segment, 'car')] = assigned columns = [(segment, 'car') for segment in self.segments] self.road_links[('all', 'car')] = self.road_links[columns].T.sum()
#TODO Merge conflict: TO CHECK WITH ACCRA # self.road_links.drop(columns, 1, inplace=True) # if not 'load' in self.road_links.columns: # self.road_links['load'] = 0 # self.road_links['load'] += self.road_links[('all','car')] #TODO move all utility features to another object / file
[docs] def analysis_mode_utility(self, how='min', segment=None, segments=None, time_expanded=False): """ * requires: mode_utility, los, utility_values * builds: los """ if segment is None: for segment in tqdm(self.segments): self.analysis_mode_utility(how=how, segment=segment, time_expanded=time_expanded) return if time_expanded: logit_los = self.te_los else: logit_los = self.los mode_utility = self.mode_utility[segment].to_dict() # route type utilities rtu = { rt: get_combined_mode_utility( rt, how=how, mode_utility=mode_utility ) for rt in logit_los['route_types'].unique() } logit_los['mode_utility'] = logit_los['route_types'].map(rtu.get) utility_values = self.utility_values[segment].to_dict() u = 0 for key, value in utility_values.items(): u += value * logit_los[key] logit_los[(segment, 'utility')] = u logit_los[(segment, 'utility')] = logit_los[(segment, 'utility')]
[docs] def analysis_utility(self, segment='root', time_expanded=False): """ * requires: mode_utility, los, utility_values * builds: los """ if segment is None: for segment in self.segments: print(segment) self.analysis_mode_utility(how=how, segment=segment, time_expanded=time_expanded) return if time_expanded: los = self.te_los else: los = self.los utility_values = self.utility_values[segment].to_dict() u = 0 for key, value in utility_values.items(): u += value * los[key] los[(segment, 'utility')] = u los[(segment, 'utility')] = los[(segment, 'utility')].astype(float)
[docs] def initialize_logit(self): zones = list(self.zones.index) od = pd.DataFrame(index=pd.MultiIndex.from_product([zones, zones])) self.od_probabilities = od.copy() self.od_utilities = od.copy()
[docs] def step_logit( self, time_expanded=False, decimals=None, n_paths_max=None, nchunks=10, workers=1, keep_od_tables=True ): """ * requires: mode_nests, logit_scales, los * builds: los, od_utilities, od_probabilities, path_utilities, path_probabilities """ # concatenate paths od_cols = ['origin', 'destination'] if time_expanded: od_cols.append('wished_departure_time') to_concat = [] for segment in self.segments: keep_columns = od_cols + ['route_type', (segment, 'utility')] if time_expanded: paths = self.te_los[keep_columns] else: paths = self.los[keep_columns] paths.rename(columns={(segment, 'utility'): 'utility'}, inplace=True) paths = paths.dropna(subset=['utility']) paths['segment'] = segment to_concat.append(paths) segmented_paths = pd.concat(to_concat) try: # all the segments can be proccessed together # assert all logit scales are the same and pick one logit_scales = self.logit_scales.T.drop_duplicates().T assert len(logit_scales.columns) == 1 logit_scales.columns = ['root'] nls = logit_scales['root'].to_dict() # assert all mode_nests are the same and pick one mode_nests = self.mode_nests.T.drop_duplicates().T assert len(mode_nests.columns) == 1 mode_nests.columns = ['root'] nests = mode_nests.reset_index().groupby('root')['route_type'].agg( lambda s: list(s)).to_dict() p, mu, mp = nested_logit.nested_logit_from_paths( segmented_paths, od_cols, mode_nests=nests, phi=nls, verbose=False, decimals=decimals, n_paths_max=n_paths_max, nchunks=nchunks, workers=workers, return_od_tables=keep_od_tables ) except AssertionError: p_list = [] mu_list = [] mp_list = [] for segment in self.segments: mode_nests = self.mode_nests.reset_index().groupby(segment)['route_type'].agg( lambda s: list(s)).to_dict() nls = self.logit_scales[segment].to_dict() paths = segmented_paths.loc[segmented_paths['segment'] == segment] p, mu, mp = nested_logit.nested_logit_from_paths( paths, mode_nests=mode_nests, phi=nls, od_cols=od_cols, decimals=decimals, n_paths_max=n_paths_max, ) p_list.append(p) mu_list.append(mu) mp_list.append(mp) p = pd.concat(p_list) mu = pd.concat(mu_list, ignore_index=True) mp = pd.concat(mp_list, ignore_index=True) p.reset_index(inplace=True) p.set_index(['segment', 'index'], inplace=True) for segment in self.segments: if time_expanded: self.te_los[(segment, 'probability')] = p.loc[segment]['probability'] else: self.los[(segment, 'probability')] = p.loc[segment]['probability'] self.probabilities = mp self.utilities = mu
[docs]def get_combined_mode_utility(route_types, mode_utility, how='min',): utilities = [mode_utility[mode] for mode in route_types] if not len(utilities): return 0 if how=='min': # worse mode return min(utilities) elif how=='max': # best mode return max(utilities) elif how=='sum': return sum(utilities) elif how=='mean': return sum(utilities) / len(utilities)