Source code for quetzal.engine.connectivity

# -*- coding: utf-8 -*-

from syspy.spatial import spatial
from syspy.skims import skims
import pandas as pd
import geopandas as gpd
import shapely

[docs]def node_clustering(links, nodes, n_clusters, prefixe='', group_id=None,**kwargs): disaggregated_nodes = nodes.copy() if group_id is None: clusters, cluster_series = spatial.zone_clusters( nodes, n_clusters=n_clusters, **kwargs ) else: clusters = nodes.groupby(group_id).first() cluster_series = nodes[group_id] cluster_dict = cluster_series.to_dict() centroids = clusters.copy() centroids['geometry'] = centroids['geometry'].apply(lambda g: g.centroid) try: links = links.copy() links['disaggregated_a'] = links['a'] links['disaggregated_b'] = links['b'] links['a'] = links['a'].apply(lambda x: prefixe + str(cluster_dict[x])) links['b'] = links['b'].apply(lambda x: prefixe + str(cluster_dict[x])) except AttributeError: links = None clusters['count'] = cluster_series.value_counts() disaggregated_nodes['cluster'] = cluster_series parenthood = pd.merge( disaggregated_nodes, centroids, left_on='cluster', right_index=True, suffixes=['_node', '_centroid'] ) parenthood['geometry'] = parenthood.apply(parenthood_geometry, axis=1) centroids.index = prefixe + pd.Series(centroids.index).astype(str) return links, centroids, clusters, parenthood
[docs]def parenthood_geometry(row): g = shapely.geometry.LineString( [row['geometry_node'], row['geometry_centroid']] ) return g
[docs]def geo_join_method(geo): return geo.convex_hull.buffer(1e-4)
[docs]def voronoi_graph_and_tesselation(nodes, max_length=None, coordinates_unit='degree'): v_tesselation, v_graph = spatial.voronoi_diagram_dataframes(nodes['geometry']) # Compute length if coordinates_unit=='degree': # Default behaviour, assuming lat-lon coordinates v_graph['length'] = skims.distance_from_geometry(v_graph['geometry']) elif coordinates_unit=='meter': # metric v_graph['length'] = v_graph['geometry'].apply(lambda x: x.length) else: raise('Invalid coordinates_unit.') if max_length: v_graph = v_graph.loc[v_graph['length'] <= max_length] return v_graph, v_tesselation
[docs]def build_footpaths(nodes, speed=3, max_length=None, n_clusters=None, coordinates_unit='degree'): if n_clusters and n_clusters < len(nodes): centroids, links = centroid_and_links(nodes, n_clusters, coordinates_unit=coordinates_unit) nodes=nodes.loc[centroids] # not a bool for the geodataframe to be serializabe links['voronoi'] = 0 graph, tesselation = voronoi_graph_and_tesselation( nodes, max_length, coordinates_unit=coordinates_unit ) footpaths = pd.concat( [ graph, graph.rename(columns={'a': 'b', 'b': 'a'}) ] ) footpaths['voronoi'] = 1 try: footpaths = footpaths.append(links) if max_length: footpaths = footpaths.loc[footpaths['length'] <= max_length] except NameError: pass footpaths.reset_index(drop=True, inplace=True) footpaths.index = 'footpath_' + pd.Series(footpaths.index).astype(str) footpaths['time'] = footpaths['length'] / speed / 1000 * 3600 return footpaths