# Source code for quetzal.model.model

# -*- coding: utf-8 -*-

import json
import zlib
import os
import sys
import pickle
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

from copy import deepcopy
from functools import wraps
from tqdm import tqdm
import shutil
import uuid
import ntpath
from quetzal.io import hdf_io

from syspy.syspy_utils.data_visualization import add_basemap
from concurrent.futures import ProcessPoolExecutor
from quetzal.model.integritymodel import IntegrityModel


def read_hdf(filepath):
    """Create a Model and load its attributes from the hdf file ``filepath``.

    This is a shortcut for building an empty ``Model()`` and calling its
    ``read_hdf`` method on the given path.

    Args:
        filepath (str): path of an hdf file written by ``Model.to_hdf``.

    Returns:
        Model: the loaded model.
    """
    loaded = Model()
    loaded.read_hdf(filepath)
    return loaded
def log(text, debug=False):
    """Print ``text`` only when ``debug`` is truthy; otherwise do nothing.

    Args:
        text: object to print.
        debug (bool): verbosity switch.
    """
    if not debug:
        return
    print(text)
def authorized_column(df, column, authorized_types=(str, int, float)):
    """Tell whether every non-null value of ``df[column]`` has an allowed type.

    Types are compared exactly (``type(value)``), not with isinstance, so a
    subclass such as bool is not accepted as int.

    Args:
        df (pd.DataFrame): table holding the column.
        column: label of the column to inspect.
        authorized_types (tuple of type): the allowed value types.

    Returns:
        bool: True if the set of value types is a subset of
        ``authorized_types`` (also True for an all-null column).
    """
    value_types = set(df[column].dropna().apply(type))
    return value_types.issubset(authorized_types)
def track_args(method):
    """Decorator recording the arguments of every call in ``self.parameters``.

    Each call stores its positional and keyword arguments in
    ``self.parameters[<method name>]`` as ``{'args': ..., 'kwargs': ...}``
    (``self.parameters`` must already exist).

    When the decorated method is called with ``use_tracked_args=True``, the
    arguments given are ignored. The ones stored by the previous call are
    replayed instead, which raises KeyError if there is none.
    ``use_tracked_args`` itself is never forwarded to the wrapped method.
    """
    # functools.wraps keeps __name__, __doc__, ... of the wrapped method
    @wraps(method)
    def decorated(self, *args, **kwargs):
        replay = kwargs.pop('use_tracked_args', False)
        method_name = method.__name__
        # instances without an own 'debug' attribute are treated as verbose
        verbose = self.debug if 'debug' in self.__dict__ else True
        if not replay:
            self.parameters[method_name] = {'args': args, 'kwargs': kwargs}
        else:
            stored = self.parameters[method_name]
            args, kwargs = stored['args'], stored['kwargs']
            log('using parameters from self.parameters:', verbose)
            log('args:' + str(args), verbose)
            log('kwargs:' + str(kwargs), verbose)
        return method(self, *args, **kwargs)
    return decorated
def merge(
    left,
    right,
    suffixes=['_left', '_right'],
    how='inner',
    reindex=True,
    clear=True
):
    """Merge the links and nodes of two models into a model of left's class.

    Args:
        left: first model; the result has the same class as ``left``.
        right: second model; must have the same epsg as ``left``
            (checked with an assert).
        suffixes: forwarded to merge_links_and_nodes. It stays a list for
            backward compatibility and is not mutated here.
        how (str): join type used for both nodes and links.
        reindex (bool): forwarded to merge_links_and_nodes.
        clear (bool): if True, start from a new ``left.__class__`` built with
            left's epsg and coordinates_unit; otherwise start from
            ``left.copy()``.

    Returns:
        A model whose ``links`` and ``nodes`` are the merged tables.

    NOTE(review): merge_links_and_nodes is neither defined nor imported in
    the visible part of this module. Confirm where it comes from, otherwise
    this function raises NameError.
    """
    assert left.epsg == right.epsg
    if clear:
        # fresh object of the same class as left
        model = left.__class__(
            epsg=left.epsg, coordinates_unit=left.coordinates_unit)
    else:
        model = left.copy()
    merged_links, merged_nodes = merge_links_and_nodes(
        left_links=left.links,
        left_nodes=left.nodes,
        right_links=right.links,
        right_nodes=right.nodes,
        suffixes=suffixes,
        join_nodes=how,
        join_links=how,
        reindex=reindex,
    )
    model.links = merged_links
    model.nodes = merged_nodes
    return model
def obj_size_fmt(num):
    """Format a byte count as a human-readable string.

    Binary multiples are used consistently for both the thresholds and the
    divisors: 1 KB = 1024 B, 1 MB = 1024**2 B, 1 GB = 1024**3 B. GB is the
    largest unit. The previous version mixed decimal thresholds with
    divisors of 1.024e3 / 1.024e6 / 1.024e9. That printed e.g. "0.98KB" for
    1000 bytes and misreported MB and GB sizes.

    Args:
        num (int or float): size in bytes.

    Returns:
        str: the size with two decimals and its unit, e.g. "1.50KB".

    >>> obj_size_fmt(1536)
    '1.50KB'
    """
    # largest unit first: the first factor not exceeding num wins
    for unit, factor in (("GB", 1024 ** 3), ("MB", 1024 ** 2), ("KB", 1024)):
        if num >= factor:
            return "{:.2f}{}".format(num / factor, unit)
    return "{:.2f}{}".format(num, "B")
class Model(IntegrityModel):
    """Quetzal model: tables and parameters stored as instance attributes.

    On top of IntegrityModel, this class provides the persistence layer of
    the model and a few utilities:

    - persistence: hdf, zipped hdf, zip, zipped pickles, json folder,
      json database, excel;
    - utilities: plot, reprojection, memory usage, description.
    """

    def __init__(
        self,
        json_database=None,
        json_folder=None,
        hdf_database=None,
        zip_database=None,
        zippedpickles_folder=None,
        omitted_attributes=(),
        only_attributes=None,
        *args,
        **kwargs
    ):
        """
        Initialization function, optionally loading the model from a source.

        At most one source is read. They are tried in this order:
        json_database, json_folder, hdf_database, zip_database,
        zippedpickles_folder.

        Args:
            json_database (str): a json_database representation of the model
                (see to_json_database). Default None.
            json_folder (str): path of a folder written by to_json.
                Default None.
            hdf_database (str): path of an hdf file written by to_hdf.
                Default None.
            zip_database (str): path of a file written by to_zip.
                Default None.
            zippedpickles_folder (str): path of a folder written by
                to_zippedpickles. Default None.
            omitted_attributes: names of attributes not to load
                (hdf, zip and zippedpickles sources only).
            only_attributes: if not None, only these attributes are loaded
                (hdf, zip and zippedpickles sources only).
            *args, **kwargs: forwarded to IntegrityModel.__init__.

        Raises:
            Exception: if both json_database and json_folder are given.

        Examples:
            >>> sm = stepmodel.Model(json_database=json_database_object)
            >>> sm = stepmodel.Model(json_folder=folder_path)
        """
        super().__init__(*args, **kwargs)
        if json_database and json_folder:
            raise Exception(
                'Only one argument should be given to the init function.')
        elif json_database:
            self.read_json_database(json_database)
        elif json_folder:
            self.read_json(json_folder)
        elif hdf_database:
            self.read_hdf(
                hdf_database,
                omitted_attributes=omitted_attributes,
                only_attributes=only_attributes
            )
        elif zip_database:
            self.read_zip(
                zip_database,
                omitted_attributes=omitted_attributes,
                only_attributes=only_attributes
            )
        elif zippedpickles_folder:
            self.read_zippedpickles(
                zippedpickles_folder,
                omitted_attributes=omitted_attributes,
                only_attributes=only_attributes
            )

        self.debug = True

        # Default coordinates unit and epsg.
        # NOTE(review): assumes IntegrityModel (or the loaded data) defines
        # epsg and coordinates_unit, possibly as None.
        if self.epsg is None:
            self.epsg = 4326
        if self.coordinates_unit is None:
            self.coordinates_unit = 'degree'

    def memory_usage(model, head=10):
        """Return the size of every model attribute, largest first.

        Sizes are measured with sys.getsizeof. It only counts what each
        object reports for itself; for instance, the items of a plain dict
        or list are not included.

        Args:
            head (int): currently unused; every attribute is listed.
                Kept for backward compatibility.

        Returns:
            pd.DataFrame: indexed by attribute name, with a single 'Size'
            column of human-readable strings (see obj_size_fmt).
        """
        sizes = pd.DataFrame(
            {k: sys.getsizeof(v) for (k, v) in model.__dict__.items()},
            index=['Size']
        ).T
        # sort on raw byte counts, before they are turned into strings
        sizes = sizes.sort_values(by='Size', ascending=False)
        sizes['Size'] = sizes['Size'].apply(obj_size_fmt)
        return sizes

    def plot(
        self,
        attribute,
        ticks=False,
        basemap_url=None,
        zoom=12,
        title=None,
        fontsize=24,
        fname=None,
        *args,
        **kwargs
    ):
        """Plot one geometric attribute of the model with geopandas.

        Args:
            attribute (str): name of the table attribute to plot.
            ticks (bool): keep the axis ticks if True.
            basemap_url: tile url passed to add_basemap. When given (and
                the model epsg is not 3857), the layer is first reprojected
                to epsg 3857.
            zoom (int): basemap zoom level.
            title (str): optional plot title.
            fontsize (int): title font size.
            fname (str): if given, the figure is saved to this path.
            *args, **kwargs: forwarded to GeoDataFrame.plot.

        Returns:
            The axes object returned by GeoDataFrame.plot.
        """
        gdf = gpd.GeoDataFrame(self.__dict__[attribute])
        if self.epsg != 3857 and basemap_url is not None:
            gdf.crs = {'init': 'epsg:{}'.format(self.epsg)}
            gdf = gdf.to_crs(epsg=3857)
        plot = gdf.plot(*args, **kwargs)
        if ticks is False:
            plot.set_xticks([])
            plot.set_yticks([])
        if title:
            plot.set_title(title, fontsize=fontsize)
        if basemap_url is not None:
            add_basemap(plot, zoom=zoom, url=basemap_url)
        if fname:
            fig = plot.get_figure()
            fig.savefig(fname, bbox_inches='tight')
        return plot

    def split_attribute(self, attr, by=None, nchunks=None, drop=True):
        """Split the table attribute ``attr`` into several attributes.

        Example: self.pt_los -> self.pt_los_0, self.pt_los_1, ...
        The original attribute is deleted; merge_attribute reverses the
        operation.

        Args:
            attr (str): name of the table attribute to split.
            by: column(s) defining the groups. One attribute
                '<attr>_<value>' is created per value of the first index
                level after set_index/swaplevel.
            nchunks (int): used when ``by`` is None. Split the rows into
                ``nchunks`` contiguous chunks named '<attr>_0' ...
                '<attr>_<nchunks - 1>'.
            drop (bool): forwarded to set_index; drop the ``by`` column(s)
                from the chunks.
        """
        if by is not None:
            pool = self.__getattribute__(attr).set_index(
                by, append=True, drop=drop).swaplevel()
            self.__delattr__(attr)
            for k in set(pool.index.levels[0]):
                self.__setattr__('%s_%s' % (attr, str(k)), pool.loc[k])
        elif nchunks is not None:
            pool = self.__getattribute__(attr)
            self.__delattr__(attr)
            length = len(pool)
            for i in range(nchunks):
                # Boundaries i * length // nchunks give chunk sizes differing
                # by at most one row. No chunk is empty when
                # length >= nchunks, unlike the former
                # `length // nchunks + 1` chunk size.
                start = i * length // nchunks
                stop = (i + 1) * length // nchunks
                self.__setattr__(
                    '%s_%s' % (attr, str(i)), pool.iloc[start:stop])

    def merge_attribute(self, attr, keys=None):
        """Concatenate attributes '<attr>_<key>' back into ``attr``.

        Example: self.pt_los_0, self.pt_los_1 ... -> self.pt_los.
        The partial attributes are deleted.

        Args:
            attr (str): name of the attribute to rebuild.
            keys: suffixes to merge, in order. Non-string keys are converted
                with str. Defaults to every attribute whose name starts
                with '<attr>_'.
        """
        prefix = attr + '_'
        if keys is None:
            # slice rather than split: the prefix may reappear in the suffix
            keys = [
                name[len(prefix):]
                for name in self.__dict__
                if name.startswith(prefix)
            ]
        to_concat = []
        for key in keys:
            name = prefix + str(key)
            to_concat.append(self.__getattribute__(name))
            self.__delattr__(name)
        self.__setattr__(attr, pd.concat(to_concat))

    def to_zippedpickles(
        self,
        folder,
        omitted_attributes=(),
        only_attributes=None,
        max_workers=1,
        complevel=-1,
        remove_first=True
    ):
        """Write every attribute to '<folder>/<name>.zippedpickle'.

        Args:
            folder (str): destination folder, created if needed.
            omitted_attributes: names of attributes not to write.
            only_attributes: if not None, only these attributes are written.
            max_workers (int): if greater than 1, files are written in
                parallel by a ProcessPoolExecutor.
            complevel (int): compression level passed to
                hdf_io.to_zippedpickle.
            remove_first (bool): delete ``folder`` and its content first.

        Raises:
            Any exception raised while writing an attribute, including in a
            worker process.
        """
        if remove_first:
            shutil.rmtree(folder, ignore_errors=True)
        os.makedirs(folder, exist_ok=True)

        if max_workers == 1:
            iterator = tqdm(self.__dict__.items())
            for key, value in iterator:
                iterator.desc = key
                if key in omitted_attributes:
                    continue
                if only_attributes is not None and key not in only_attributes:
                    continue
                hdf_io.to_zippedpickle(
                    value,
                    '%s/%s.zippedpickle' % (folder, key),
                    complevel=complevel
                )
        else:
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(
                        hdf_io.to_zippedpickle,
                        value,
                        r'%s/%s.zippedpickle' % (folder, key),
                        complevel=complevel
                    )
                    for key, value in self.__dict__.items()
                    if key not in omitted_attributes and (
                        only_attributes is None or key in only_attributes)
                ]
            # Leaving the with-block waited for every task; result()
            # re-raises a worker's exception instead of silently losing it.
            for future in futures:
                future.result()

    def read_zippedpickles(
        self, folder, omitted_attributes=(), only_attributes=None
    ):
        """Load the '<name>.zippedpickle' files of ``folder`` as attributes.

        Each file is zlib-decompressed then unpickled.
        SECURITY: pickle.loads can execute arbitrary code; only read folders
        from a trusted source.

        Args:
            folder (str): folder written by to_zippedpickles.
            omitted_attributes: names of attributes not to load.
            only_attributes: if not None, only these attributes are loaded.
        """
        suffix = '.zippedpickle'
        keys = [
            filename[:-len(suffix)]
            for filename in os.listdir(folder)
            if filename.endswith(suffix)
        ]
        iterator = tqdm(keys)
        for key in iterator:
            if key in omitted_attributes:
                continue
            if only_attributes is not None and key not in only_attributes:
                continue
            iterator.desc = key
            with open('%s/%s%s' % (folder, key, suffix), 'rb') as file:
                buffer = file.read()
            bigbuffer = zlib.decompress(buffer)
            self.__setattr__(key, pickle.loads(bigbuffer))

    def read_hdf(self, filepath, omitted_attributes=(), only_attributes=None):
        """Load the attributes stored in an hdf file written by to_hdf.

        Tables with a 'geometry' column are converted to GeoDataFrames.
        Non-table attributes, json-dumped in the 'jsons' series, are
        restored too.

        Args:
            filepath (str): path of the hdf file.
            omitted_attributes: names of attributes not to load.
            only_attributes: if not None, only these attributes are loaded.

        Raises:
            An error from pandas / PyTables if ``filepath`` does not exist.
            The store is opened read-only, so a wrong path no longer
            silently creates an empty file.
        """
        with pd.HDFStore(filepath, mode='r') as hdf:
            keys = [k.split('/')[1] for k in hdf.keys()]
            iterator = tqdm(keys, desc='read_hdf: ')
            for key in iterator:
                if key in omitted_attributes:
                    continue
                if only_attributes is not None and key not in only_attributes:
                    continue
                # read from the open store instead of reopening the file
                value = hdf.select(key)
                if isinstance(value, pd.DataFrame) and 'geometry' in value.columns:
                    value = gpd.GeoDataFrame(value)
                self.__setattr__(key, value)

        # some attributes may have been stored in the json series
        try:
            json_dict = self.jsons.to_dict()
            for key, value in json_dict.items():
                self.__setattr__(key, json.loads(value))
        except AttributeError:
            print('could not read json attributes')

    def read_zip(self, filepath, omitted_attributes=(), only_attributes=None):
        """Load the attributes stored in a file written by to_zip.

        The file is zlib-decompressed in memory and opened as an HDFStore
        with PyTables' H5FD_CORE driver, with the backing store disabled.
        Tables with a 'geometry' column are converted to GeoDataFrames.
        Non-table attributes, json-dumped in the 'jsons' series, are
        restored too.

        Args:
            filepath (str): path of the zip file.
            omitted_attributes: names of attributes not to load.
            only_attributes: if not None, only these attributes are loaded.
        """
        # store keys look like '/name': accept 'name' and '/name' in filters
        if only_attributes is not None:
            only_attributes = {
                '/' + a for a in only_attributes}.union(only_attributes)
        omitted_attributes = {
            '/' + a for a in omitted_attributes}.union(omitted_attributes)

        # read the zip in a buffer
        with open(filepath, 'rb') as file:
            data = file.read()
        bigbyte = zlib.decompress(data)

        # build a store from the buffer
        with pd.HDFStore(
            "quetzal-%s.h5" % str(uuid.uuid4()),
            mode="r",
            driver="H5FD_CORE",
            driver_core_backing_store=0,
            driver_core_image=bigbyte
        ) as store:
            iterator = tqdm(store.keys())
            for key in iterator:
                skey = key.split(r'/')[-1]
                iterator.desc = skey
                if key in omitted_attributes:
                    continue
                if only_attributes is not None and key not in only_attributes:
                    continue
                value = store.select(key)
                if isinstance(value, pd.DataFrame) and 'geometry' in value.columns:
                    value = gpd.GeoDataFrame(value)
                self.__setattr__(skey, value)

        # some attributes may have been stored in the json series
        try:
            json_dict = self.jsons.to_dict()
            for key, value in json_dict.items():
                self.__setattr__(key, json.loads(value))
        except AttributeError:
            print('could not read json attributes')

    def to_excel(self, filepath, prefix='stack'):
        """Write the attributes whose name starts with ``prefix`` to excel.

        One sheet is written per attribute. The sheet name is the attribute
        name without ``prefix`` and the separator character that follows it
        (e.g. 'stack_volumes' -> 'volumes'). Each table is written with its
        index reset into regular columns.

        Args:
            filepath (str): path of the excel file.
            prefix (str): prefix of the attributes to export.
        """
        length = len(prefix)
        stacks = {
            name[length + 1:]: attr
            for name, attr in self.__dict__.items()
            if name[:length] == prefix
        }
        with pd.ExcelWriter(filepath) as writer:
            for name, stack in stacks.items():
                stack.reset_index().to_excel(
                    writer, sheet_name=name, index=False)

    def to_frames(self, omitted_attributes=(), only_attributes=None):
        """Export the full model to a dict of pandas objects.

        - GeoSeries are stored as object-dtype Series.
        - Other tables (DataFrame, GeoDataFrame, Series) are stored as
          DataFrames, with any 'geometry' column cast to object dtype.
        - Every other attribute is json-dumped into the Series
          ``frames['jsons']``. Attributes that cannot be json-dumped are
          skipped and reported with log().

        Args:
            omitted_attributes: names of attributes not to export.
            only_attributes: if not None, only these attributes are exported.

        Returns:
            dict: attribute name -> pandas object, plus the 'jsons' entry.
        """
        jsons = {}
        attributeerrors = []
        frames = {}
        for key, attribute in self.__dict__.items():
            if key in omitted_attributes:
                continue
            if only_attributes is not None and key not in only_attributes:
                continue
            if isinstance(attribute, gpd.GeoSeries):
                frames[key] = pd.Series(attribute).astype(object)
            elif isinstance(attribute, (pd.DataFrame, pd.Series, gpd.GeoDataFrame)):
                df = pd.DataFrame(attribute)
                if 'geometry' in df.columns:
                    df['geometry'] = df['geometry'].astype(object)
                frames[key] = df
            else:
                try:
                    jsons[key] = json.dumps(attribute)
                except TypeError:
                    attributeerrors.append(key)
        for key in attributeerrors:
            log('could not save attribute: ' + key, self.debug)
        frames['jsons'] = pd.Series(jsons)
        return frames

    def to_zip(self, filepath, complevel=None, *args, **kwargs):
        """Write the model to a single zlib-compressed file (see read_zip).

        The frames from to_frames are serialized with
        hdf_io.write_hdf_to_buffer. read_zip opens the result as an
        in-memory hdf image. The buffer is then zlib-compressed and written
        to ``filepath``.

        Args:
            filepath (str): destination path.
            complevel: forwarded to hdf_io.write_hdf_to_buffer.
            *args, **kwargs: forwarded to to_frames (omitted_attributes,
                only_attributes).
        """
        frames = self.to_frames(*args, **kwargs)
        buffer = hdf_io.write_hdf_to_buffer(frames, complevel=complevel)
        smallbuffer = zlib.compress(buffer)
        with open(filepath, 'wb') as file:
            file.write(smallbuffer)

    @track_args
    def to_hdf(self, filepath, omitted_attributes=(), only_attributes=None):
        """Export the full model to an hdf file; an existing file is replaced.

        - Each table attribute is written under its own key. Geometry
          columns are cast to object dtype; for plain DataFrame attributes
          this cast is done in place on the model's table.
        - Other attributes are json-dumped into the 'jsons' key.
        - Attributes that cannot be json-dumped are skipped and reported
          with log().

        Args:
            filepath (str): destination path.
            omitted_attributes: names of attributes not to export.
            only_attributes: if not None, only these attributes are exported.
        """
        try:
            os.remove(filepath)
            status = 'overwriting'
        except FileNotFoundError:
            status = 'new file'

        jsons = {}
        iterator = tqdm(self.__dict__.items(), desc='to_hdf(%s)' % status)
        attributeerrors = []
        for key, attribute in iterator:
            if key in omitted_attributes:
                continue
            if only_attributes is not None and key not in only_attributes:
                continue
            if isinstance(attribute, gpd.GeoDataFrame):
                df = pd.DataFrame(attribute)
                df['geometry'] = df['geometry'].astype(object)
                df.to_hdf(filepath, key=key, mode='a')
            elif isinstance(attribute, gpd.GeoSeries):
                pd.Series(attribute).astype(object).to_hdf(
                    filepath, key=key, mode='a')
            elif isinstance(attribute, (pd.DataFrame, pd.Series)):
                df = attribute
                if isinstance(df, pd.DataFrame) and 'geometry' in df.columns:
                    df['geometry'] = df['geometry'].astype(object)
                df.to_hdf(filepath, key=key, mode='a')
            else:
                try:
                    jsons[key] = json.dumps(attribute)
                except TypeError:
                    attributeerrors.append(key)

        for key in attributeerrors:
            log('could not save attribute: ' + key, self.debug)
        json_series = pd.Series(jsons)
        # mode='a': append to the file written above, do not overwrite it
        json_series.to_hdf(filepath, key='jsons', mode='a')

    def to_zipped_hdf(self, filepath, *args, **kwargs):
        """Write the model with to_hdf and pack it in a zip archive.

        The hdf file is written as 'model.hdf' in a temporary folder next to
        ``filepath``. That folder is zipped to ``filepath``, with '.zip'
        appended if missing, and then removed, even if writing fails.

        Args:
            filepath (str): path of the zip archive.
            *args, **kwargs: forwarded to to_hdf.
        """
        # a bare file name has no directory: use the current one, not '/'
        filedir = ntpath.dirname(filepath) or '.'
        tempdir = os.path.join(filedir, 'quetzal_temp-' + str(uuid.uuid4()))
        os.mkdir(tempdir)
        try:
            self.to_hdf(os.path.join(tempdir, 'model.hdf'), *args, **kwargs)
            # make_archive appends '.zip' itself
            if filepath.endswith('.zip'):
                base_name = filepath[:-len('.zip')]
            else:
                base_name = filepath
            shutil.make_archive(base_name, 'zip', tempdir)
        finally:
            shutil.rmtree(tempdir, ignore_errors=True)

    @track_args
    def to_json(
        self, folder, omitted_attributes=(), only_attributes=None, verbose=False
    ):
        """Export the full model to a folder of json / geojson files.

        For each table attribute:

        - '<key>.geojson' holds its 'index', 'geometry' and str/int/float
          columns;
        - '<key>_quetzaldata.json' holds its other columns, if any;
        - '<key>.json' is written instead, without geometry, when the
          geojson cannot be written.

        Other attributes are json-dumped into 'jsons.json'. read_json
        reverses the operation.

        Args:
            folder (str): destination folder, created if needed.
            omitted_attributes: names of attributes not to export.
            only_attributes: if not None, only these attributes are exported.
            verbose (bool): print the exceptions that trigger a fallback.
        """
        status = 'overwriting' if os.path.isdir(folder) else 'new file'
        os.makedirs(folder, exist_ok=True)

        jsons = {}
        iterator = tqdm(self.__dict__.items(), desc='to_json(%s)' % status)
        attributeerrors = []
        for key, attribute in iterator:
            if key in omitted_attributes:
                continue
            if only_attributes is not None and key not in only_attributes:
                continue
            root_name = folder + '/' + key
            geojson_file = root_name + '.geojson'
            json_file = root_name + '.json'
            quetzaldata_file = root_name + '_quetzaldata.json'
            # Remove files of a previous export. A stale quetzaldata file
            # would otherwise be merged back by read_json.
            for filename in (geojson_file, json_file, quetzaldata_file):
                try:
                    os.remove(filename)
                except OSError:
                    pass

            if isinstance(attribute, (pd.DataFrame, pd.Series)):
                msg = 'datframe attributes must have unique index:' + key
                assert attribute.index.is_unique, msg
                attribute = pd.DataFrame(attribute)  # copy and type conversion
                attribute.drop('index', axis=1, errors='ignore', inplace=True)
                attribute.index.name = 'index'
                attribute = attribute.reset_index()  # loss of index name
                attribute.rename(
                    columns={x: str(x) for x in attribute.columns},
                    inplace=True
                )
                df = attribute
                geojson_columns = [
                    c for c in df.columns
                    if authorized_column(df, c) or c in ('index', 'geometry')
                ]
                json_columns = [
                    c for c in df.columns if c not in geojson_columns]
                try:
                    gpd.GeoDataFrame(attribute[geojson_columns]).to_file(
                        geojson_file,
                        driver='GeoJSON'
                    )
                    if len(json_columns):
                        attribute[json_columns + ['index']].to_json(
                            quetzaldata_file)
                # e.g. "['geometry'] not in index"
                except (AttributeError, KeyError, ) as e:
                    if verbose:
                        print(e)
                    df = pd.DataFrame(attribute).drop(
                        'geometry', axis=1, errors='ignore')
                    df.to_json(root_name + '.json')
                except ValueError as e:
                    if verbose:
                        print(e)
                    # Geometry column cannot contain multiple geometry types
                    # when writing to file.
                    print('could not save geometry from table ' + key)
                    df = pd.DataFrame(attribute).drop(
                        'geometry', axis=1, errors='ignore')
                    df.to_json(root_name + '.json')
            else:
                try:
                    jsons[key] = json.dumps(attribute)
                except TypeError:
                    attributeerrors.append(key)

        for key in attributeerrors:
            print('could not save attribute: ' + key)
        json_series = pd.Series(jsons)
        json_series.name = 'json'
        json_series = json_series.reset_index()
        json_series.to_json(folder + '/' + 'jsons.json')

    def read_json(self, folder):
        """Load a model folder written by to_json.

        - '*.json' and '*.geojson' files become attributes indexed by their
          'index' column.
        - Attributes json-dumped in 'jsons.json' are restored.
        - '<key>_quetzaldata.json' tables are merged, on the index, into the
          '<key>' table and then dropped.

        Args:
            folder (str): folder written by to_json.
        """
        files = os.listdir(folder)
        geojson_attributes = [
            f[:-len('.geojson')] for f in files if f.endswith('.geojson')]
        json_attributes = [
            f[:-len('.json')] for f in files if f.endswith('.json')]

        for key in json_attributes:
            value = pd.read_json('%s/%s.json' % (folder, key))
            value.set_index('index', inplace=True)
            self.__setattr__(key, value)

        for key in geojson_attributes:
            value = gpd.read_file('%s/%s.geojson' % (folder, key))
            value.set_index('index', inplace=True)
            self.__setattr__(key, pd.DataFrame(value))

        # some attributes may have been stored in the json series
        try:
            json_dict = self.jsons['json'].to_dict()
            for key, value in json_dict.items():
                self.__setattr__(key, json.loads(value))
        except AttributeError:
            print('could not read json attributes')

        # some columns have been stored separately in quetzaldata.json files
        to_delete = []
        # iterate over a snapshot: attributes are reassigned in the loop
        for key, data in list(self.__dict__.items()):
            if '_quetzaldata' in key:
                key_to_set = key.split('_quetzaldata')[0]
                left = self.__getattribute__(key_to_set)
                merged = pd.merge(
                    left, data, left_index=True, right_index=True)
                self.__setattr__(key_to_set, merged)
                to_delete.append(key)
        for key in to_delete:
            self.__delattr__(key)

    @track_args
    def to_json_database(self):
        """
        Dumps the model into a single json organized as follows:

        json_database = {
            'geojson': {  # tables that could be dumped as geojson
                key: value
            },
            'pd_json': {  # other tables, dumped without geometry
                key: value
            },
            'json': {  # all other objects (model parameters)
                key: value
            }
        }

        Attributes that cannot be json-dumped are skipped and reported with
        print.

        Returns:
            str: the json representation of the model.
        """
        iterator = tqdm(self.__dict__.items(), desc='to_json_database')
        json_database = {
            'geojson': {},
            'pd_json': {},
            'json': {}
        }
        attributeerrors = []
        for key, attribute in iterator:
            if isinstance(attribute, (pd.DataFrame, pd.Series)):
                assert attribute.index.is_unique, \
                    'DataFrame attributes must have unique index'
                attribute = pd.DataFrame(attribute)  # copy and type conversion
                attribute.drop('index', axis=1, errors='ignore', inplace=True)
                attribute.index.name = 'index'
                attribute = attribute.reset_index()  # loss of index name
                try:
                    geojson_to_store = gpd.GeoDataFrame(attribute).to_json()
                    json_database['geojson'].update({key: geojson_to_store})
                except KeyError:
                    df = pd.DataFrame(attribute).drop(
                        'geometry', axis=1, errors='ignore')
                    json_to_store = df.to_json()
                    json_database['pd_json'].update({key: json_to_store})
            else:
                try:
                    json_database['json'][key] = json.dumps(attribute)
                except TypeError:
                    attributeerrors.append(key)

        for key in attributeerrors:
            print('could not save attribute: ' + key)
        return json.dumps(json_database)

    def read_json_database(self, json_database):
        """
        Load the model from its json_database representation.

        Args:
            json_database (str): the representation returned by
                to_json_database.
        """
        json_database = json.loads(json_database)

        # Geojson objects
        for key, attr in json_database['geojson'].items():
            value = gpd.GeoDataFrame.from_features(json.loads(attr))
            value.set_index('index', inplace=True)
            self.__setattr__(key, pd.DataFrame(value))

        # Dataframe objects
        for key, attr in json_database['pd_json'].items():
            value = pd.read_json(attr)
            value.set_index('index', inplace=True)
            self.__setattr__(key, pd.DataFrame(value))

        # Parameters
        for key, attr in json_database['json'].items():
            self.__setattr__(key, json.loads(attr))

    def copy(self):
        """Return a deep copy of the model."""
        return deepcopy(self)

    def merge(self, *args, **kwargs):
        """Merge this model (as ``left``) with another; see module merge().

        ``self`` is passed positionally, so ``model.merge(other)`` works.
        The former ``merge(left=self, *args)`` raised TypeError whenever a
        positional argument was given.
        """
        return merge(self, *args, **kwargs)

    @track_args
    def change_epsg(self, epsg, coordinates_unit):
        """Return a copy of the model reprojected to ``epsg``.

        - GeoDataFrames, GeoSeries and DataFrames with a 'geometry' column
          are reprojected from self.epsg.
        - Other Series are reprojected when they can be read as a GeoSeries.
        - Attributes that fail to reproject are left unchanged and listed
          at the end.

        Args:
            epsg (int): target epsg code.
            coordinates_unit (str): coordinates unit of the target epsg.

        Returns:
            Model: the reprojected copy; ``self`` is not modified.
        """
        projected_model = self.copy()
        iterator = tqdm(
            projected_model.__dict__.items(),
            desc='Reprojecting model from epsg {} to epsg {}'.format(
                self.epsg, epsg)
        )
        failed = []
        for key, attribute in iterator:
            if isinstance(attribute, (gpd.GeoDataFrame, gpd.GeoSeries)):
                try:
                    attribute.crs = {'init': 'epsg:{}'.format(self.epsg)}
                    attribute = attribute.to_crs(epsg=epsg)
                    projected_model.__setattr__(key, attribute)
                except RuntimeError:
                    # b'tolerance condition error',
                    # b'latitude or longitude exceeded limits'
                    failed.append(key)
            elif isinstance(attribute, pd.DataFrame):
                if 'geometry' in attribute.columns:
                    try:
                        temp = gpd.GeoDataFrame(attribute)
                        temp.crs = {'init': 'epsg:{}'.format(self.epsg)}
                        attribute = pd.DataFrame(temp.to_crs(epsg=epsg))
                        projected_model.__setattr__(key, attribute)
                    except RuntimeError:
                        failed.append(key)
            elif isinstance(attribute, pd.Series):
                try:
                    temp = gpd.GeoSeries(attribute)
                    temp.crs = {'init': 'epsg:{}'.format(self.epsg)}
                    attribute = pd.Series(temp.to_crs(epsg=epsg))
                    projected_model.__setattr__(key, attribute)
                except Exception:
                    # most Series are not geometric: only report the ones
                    # named 'geometry'
                    if attribute.name == 'geometry':
                        failed.append(key)

        projected_model.epsg = epsg
        projected_model.coordinates_unit = coordinates_unit
        if len(failed) > 0:
            print('could not change epsg for the following attributes: ')
            print(failed)
        return projected_model

    def describe(self):
        """Summarize the model attributes.

        Each attribute gets its type and a short description:

        - the value itself for bools, strings, ints, and lists or dicts with
          fewer than 4 items;
        - otherwise its len(), or None if it has no len().

        Empty DataFrames are skipped, and so are GeoDataFrames whose
        centroids equal a single Point(0, 0).

        Returns:
            pd.DataFrame: columns 'type' and 'desc', indexed by attribute
            name and sorted.
        """
        data = []
        for attr, value in self.__dict__.items():
            if isinstance(value, bool):
                desc = value
            elif isinstance(value, str):
                desc = value
            elif isinstance(value, int):
                desc = value
            elif isinstance(value, gpd.GeoDataFrame) and value.centroid.equals(
                    gpd.GeoSeries({0: Point(0, 0)})):
                continue
            elif isinstance(value, pd.DataFrame) and value.empty:
                continue
            elif isinstance(value, list) and len(value) < 4:
                desc = value
            elif isinstance(value, dict) and len(value) < 4:
                desc = value
            else:
                try:
                    desc = len(value)
                except TypeError:
                    desc = None
            data.append([attr, type(value), desc])
        return pd.DataFrame(
            data=data, columns=['name', 'type', 'desc']
        ).set_index('name').sort_index()