Source code for geomapviz.aggregator

import numpy as np
import pandas as pd
import warnings
import geopandas as gpd
from typing import Optional, Union, Tuple, List
from .utils import check_list_of_str


def encode_categorical_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Encode categorical columns of a DataFrame as integer codes.

    Every column of dtype ``category`` is replaced by its ``.cat.codes``.
    The input is left untouched: the encoding is applied to a copy, so the
    caller's frame keeps its categorical dtypes.

    Parameters
    ----------
    df :
        Input DataFrame whose categorical columns should be encoded.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with categorical columns replaced by their integer
        codes. Non-categorical columns are copied unchanged.

    Notes
    -----
    pandas assigns the code ``-1`` to missing values of a categorical.

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({'A': pd.Categorical(['a', 'b', 'c', 'a'], categories=['a', 'b', 'c']),
    ...                    'B': pd.Categorical(['b', 'a', 'b', 'c'], categories=['a', 'b', 'c']),
    ...                    'C': [1, 2, 3, 4],
    ...                    'D': [5, 6, 7, 8]})
    >>> encoded_df = encode_categorical_columns(df)
    >>> print(encoded_df)
    """
    # Copy first so the caller's DataFrame is never modified in place.
    encoded = df.copy()
    for col in encoded.select_dtypes("category").columns:
        encoded[col] = encoded[col].cat.codes
    return encoded
def prepare_dataframe(
    df: pd.DataFrame,
    groups: Union[List[str], str],
    target: str,
    other_cols_avg: Optional[List[str]] = None,
    weight: Optional[str] = None,
    verb: int = 0,
    distr: str = "gaussian",
) -> pd.DataFrame:
    """
    Build the working frame used by the averaging / confidence-interval steps.

    Only the group, target, weight and optional extra columns are kept (as a
    copy), a ``count`` column of ones is added, categorical columns are
    encoded and the target / weight columns are renamed to ``"target"`` and
    ``"weight"``.

    Parameters
    ----------
    df :
        Input data.
    groups :
        Column name, or list of column names, defining the groups.
    target :
        Name of the target column.
    other_cols_avg :
        Other columns to average, such as the predicted values of a model.
    weight :
        Name of the weight column. If None, a ``"weight"`` column set to 1
        is created.
    verb :
        Verbosity; a value above 0 enables the missing-weight warning.
    distr :
        Name of the distribution. Default is "gaussian".

    Returns
    -------
    pd.DataFrame
        The prepared copy, with ``"target"``, ``"weight"`` and ``"count"``
        columns.

    Notes
    -----
    A warning is emitted only when no weight is given, ``verb > 0`` and
    ``distr`` is not "gaussian".
    """
    group_cols = [groups] if isinstance(groups, str) else groups
    check_list_of_str(group_cols)
    check_list_of_str(other_cols_avg)

    extra_cols = [] if other_cols_avg is None else other_cols_avg
    leading_cols = group_cols + extra_cols

    if weight is not None:
        weight_col = weight
        prepared = df[leading_cols + [weight_col, target]].copy()
    else:
        # No weight supplied: fall back to unit weights.
        weight_col = "weight"
        prepared = df[leading_cols + [target]].copy()
        prepared[weight_col] = 1
        if verb > 0 and distr != "gaussian":
            warnings.warn(
                "Weight not provided, using the Gaussian approx for "
                "the CI. For the Poisson or Gamma ci, please provide weights (exposure or ncl)"
            )

    # One per row, so that summing by group yields the number of rows.
    prepared["count"] = 1
    prepared = encode_categorical_columns(prepared)

    # Downstream functions rely on these fixed column names.
    return prepared.rename(columns={target: "target", weight_col: "weight"})
def compute_weighted_average(
    df: pd.DataFrame,
    groups: Union[str, List[str]],
    target: str = "target",
    weight: str = "weight",
    other_cols_avg: Optional[List[str]] = None,
):
    r"""
    Compute the weighted arithmetic average of the target, grouped by `groups`.

    The weighted average is :math:`\sum_{i} w_{i} x_{i} / \sum_{i} w_{i}`.
    With unit weights this reduces to the plain arithmetic average
    :math:`\sum_{i} x_{i} / N`.

    The input frame is not modified; the computation runs on a copy.

    Parameters
    ----------
    df :
        The data set. It must contain the `groups`, `target` and `weight`
        columns, a ``"count"`` column, and the `other_cols_avg` columns if
        any.
    groups :
        The predictor(s) to group by.
    target :
        The name of the observed/target column.
    weight :
        The name of the weight column.
    other_cols_avg :
        Other columns to average, such as the predicted values of a model.

    Returns
    -------
    pd.DataFrame
        One row per group with the summed weight, the summed ``"count"``
        and the weighted average of `target` (and of each column in
        `other_cols_avg`). The averages are stored under the original
        column names.
    """
    # check if str --> make a list
    if isinstance(groups, str):
        groups = [groups]
    check_list_of_str(groups)

    if other_cols_avg is None:
        averaged_cols = [target]
        keep_cols = [weight, target, "count"]
    else:
        averaged_cols = list(dict.fromkeys([target] + list(other_cols_avg)))
        keep_cols = list(dict.fromkeys(list(other_cols_avg) + [target, weight, "count"]))

    # Work on a copy: the caller's frame must not be overwritten with the
    # intermediate weighted values.
    weighted = df.copy()

    # Numerator of the weighted average: x_i * w_i for every averaged column.
    weighted[averaged_cols] = weighted[averaged_cols].mul(weighted[weight], axis=0)

    aggregated = weighted.groupby(groups)[keep_cols].sum().reset_index()

    # Denominator: sum(w_i) per group. The result is written back under the
    # actual column names rather than a hard-coded "target".
    aggregated[averaged_cols] = aggregated[averaged_cols].div(
        aggregated[weight], axis=0
    )
    return aggregated
def compute_confidence_interval(
    df: pd.DataFrame,
    groups: Union[str, List[str]],
    target: str = "target",
    weight: str = "weight",
    other_cols_avg: Optional[List[str]] = None,
    distr: str = "gaussian",
    n_std: float = 2.0,
):
    """
    Compute approximate confidence bounds around grouped averages.

    The aggregated frame is reshaped to long format (one row per group and
    per averaged column) and a standard deviation is derived from the
    average according to `distr`:

    - "poisson": ``sqrt(avg / weight)``
    - "gamma": ``avg * sqrt(1 / weight)``
    - "gaussian": ``avg * sqrt(1 / count)``

    Any other value triggers a warning and falls back to the Gaussian
    formula. The bounds are ``avg -/+ n_std * std``; the lower bound is
    clipped at 0 and the upper bound at its own 99.9% quantile.

    Parameters
    ----------
    df :
        Aggregated data, one row per group, with the `groups`, `target`,
        `weight` and ``"count"`` columns (and `other_cols_avg` if given).
    groups :
        The column(s) identifying the groups.
    target :
        The name of the averaged target column.
    weight :
        The name of the (summed) weight column.
    other_cols_avg :
        Other averaged columns to include, such as model predictions.
    distr :
        One of "poisson", "gamma" or "gaussian".
    n_std :
        Number of standard deviations on each side of the average.

    Returns
    -------
    pd.DataFrame
        Long-format frame with the columns ``model``, the group columns,
        ``avg``, ``ci_low``, ``ci_up``, `weight` and ``count``. ``model``
        holds the name of the column the average comes from.
    """
    # check if str --> make a list
    if isinstance(groups, str):
        groups = [groups]
    check_list_of_str(groups)

    id_cols = groups + [weight, "count"]

    # Order-preserving de-duplication: a set would make the column order,
    # and hence the row order of the melted frame, vary between runs.
    selected_cols = list(
        dict.fromkeys(groups + [target, weight, "count"] + list(other_cols_avg or []))
    )

    df_long = pd.melt(
        df[selected_cols],
        id_vars=id_cols,
        var_name="model",
        value_name="avg",
    )

    avg = df_long["avg"]
    if distr == "poisson":
        target_std = np.sqrt(avg / df_long[weight])
    elif distr == "gamma":
        target_std = avg * np.sqrt(1 / df_long[weight])
    else:
        if distr != "gaussian":
            warnings.warn(
                'distr is not in ["poisson", "gamma", "gaussian"], using Gaussian approx. for the conf. int.'
            )
        target_std = avg * np.sqrt(1 / df_long["count"])
    df_long["target_std"] = target_std

    # NOTE(review): clipping at 0 presumes a non-negative target - confirm.
    df_long["ci_low"] = (avg - n_std * target_std).clip(lower=0)

    # Cap extreme upper bounds at the 99.9% quantile of the upper bounds.
    ci_up = avg + n_std * target_std
    df_long["ci_up"] = ci_up.clip(upper=ci_up.quantile(0.999))

    return df_long[["model"] + groups + ["avg", "ci_low", "ci_up", weight, "count"]]
def weighted_average_aggregator(
    df: pd.DataFrame,
    groups: Union[str, List[str]],
    target: str,
    other_cols_avg: Optional[List[str]] = None,
    distr: str = "gaussian",
    weight: Optional[str] = None,
    verb: int = 0,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Compute the weighted average and a confidence interval of a target,
    grouped by one or more columns.

    Parameters
    ----------
    df :
        The input DataFrame.
    groups :
        The name(s) of the column(s) in `df` that define the groups to
        aggregate. A single string is treated as one group column.
    target :
        The name of the column in `df` that contains the target variable.
    other_cols_avg :
        Other columns to average alongside the target, such as the
        predicted values of a model. A list of column names or None.
    distr :
        The distribution used for the confidence interval: 'gaussian'
        (default), 'poisson' or 'gamma'. Any other value falls back to the
        Gaussian approximation with a warning.
    weight :
        The name of the weight column in `df`. If None (default), all rows
        get a weight of 1.
    verb :
        Verbosity level. A value above 0 enables the warning emitted when
        no weight is given for a non-Gaussian distribution.

    Returns
    -------
    Tuple[pandas.DataFrame, pandas.DataFrame]
        - The first DataFrame has one row per group with the weighted
          averages, the summed weight and the number of rows. The target
          and weight columns are named ``"target"`` and ``"weight"``.
        - The second DataFrame is the long-format confidence interval
          (average -/+ 2 standard deviations) per group and averaged
          column.

    Raises
    ------
    Exception
        Whatever `check_list_of_str` raises when `groups` or
        `other_cols_avg` fail its validation.

    Examples
    --------
    >>> import pandas as pd
    >>> from geomapviz.aggregator import weighted_average_aggregator
    >>> data = pd.DataFrame({'color': ['red', 'green', 'red', 'green', 'green'],
    ...                      'size': ['small', 'large', 'medium', 'large', 'small'],
    ...                      'price': [1.0, 2.0, 3.0, 4.0, 5.0]})
    >>> groups = ['color', 'size']
    >>> target = 'price'
    >>> weights = 'weights'
    >>> data[weights] = [1, 2, 3, 4, 5]
    >>> result, conf = weighted_average_aggregator(df=data, groups=groups, target=target, weight=weights)
    """
    # check if str --> make a list
    if isinstance(groups, str):
        groups = [groups]
    check_list_of_str(groups)

    # prepare_dataframe already encodes the categorical columns and renames
    # the target / weight columns to "target" / "weight", so neither step
    # is repeated here.
    df_ = prepare_dataframe(
        df=df,
        groups=groups,
        target=target,
        other_cols_avg=other_cols_avg,
        weight=weight,
        verb=verb,
        distr=distr,
    )

    df_ = compute_weighted_average(
        df=df_,
        groups=groups,
        other_cols_avg=other_cols_avg,
        weight="weight",
        target="target",
    )

    df_long = compute_confidence_interval(
        df=df_,
        groups=groups,
        other_cols_avg=other_cols_avg,
        distr=distr,
        n_std=2.0,
        weight="weight",
        target="target",
    )
    return df_, df_long
def merge_zip_df(
    zip_path: str,
    df: pd.DataFrame,
    geoid: str = "geoid",
    cols_to_keep: Optional[List[str]] = None,
) -> pd.DataFrame:
    """
    Merge a DataFrame `df` with a zipcode mapping table holding additional
    geographical information (district name, sub-districts, etc.).

    The mapper is a csv file with a ``geoid`` column, for instance:

    |   | geoid | town       | lat     | long    | postcode | district | borough           |
    | 0 | 21004 | BRUSSEL    | 50.8333 | 4.35    | 1000     | Brussels | Brussel Hoofdstad |
    | 1 | 21015 | SCHAARBEEK | 50.85   | 4.38333 | 1030     | Brussels | Brussel Hoofdstad |

    Keys are compared as strings on both sides and the merge is a left
    join, so every row of `df` is kept. The input `df` is not modified.

    Parameters
    ----------
    zip_path :
        The path to the zipcode mapper, a csv file with additional geo info
        and a ``geoid`` column.
    df :
        The DataFrame to merge with the zipcode mapper.
    geoid :
        The name of the key column in `df`. The key column of the mapper is
        always ``"geoid"``.
    cols_to_keep :
        The list of columns to keep from the zipcode mapper (``"geoid"`` is
        always kept). If None, keep all columns.

    Returns
    -------
    pd.DataFrame
        The merged DataFrame with additional geo information. The key is
        returned in a ``"geoid"`` column; when `geoid` differs from
        ``"geoid"``, the original key column of `df` is dropped.

    Raises
    ------
    TypeError
        If `cols_to_keep` is neither None nor a list.
    """
    if cols_to_keep is not None and not isinstance(cols_to_keep, list):
        raise TypeError("If `cols_to_keep` is not None, it should be a list of strings")

    # Load the zipcode mapper and restrict it to the requested columns.
    zip_map = pd.read_csv(zip_path)
    if cols_to_keep is not None:
        # "geoid" is always selected; drop it (and repeats) from the extras
        # so the selection never produces duplicated column names.
        extra_cols = [c for c in dict.fromkeys(cols_to_keep) if c != "geoid"]
        zip_map = zip_map[["geoid"] + extra_cols]
    zip_map = zip_map.assign(geoid=zip_map["geoid"].astype(str))

    # Cast the key on a copy so the caller's DataFrame keeps its dtype.
    left = df.copy()
    left[geoid] = left[geoid].astype(str)

    merged = pd.merge(left, zip_map, how="left", left_on=[geoid], right_on=["geoid"])

    # With differently named keys both columns survive the merge; keep only
    # the mapper's "geoid".
    if geoid != "geoid":
        merged = merged.drop([geoid], axis=1)
    return merged
def dissolve_and_aggregate(
    df: pd.DataFrame,
    target: str,
    other_cols_avg: Optional[List[str]] = None,
    dissolve_on: Optional[Union[str, List[str]]] = None,
    distr: str = "gaussian",
    geoid: str = "INS",
    weight: Optional[str] = None,
    shp_file: Union[gpd.geodataframe.GeoDataFrame, None] = None,
) -> gpd.GeoDataFrame:
    """
    Aggregate the data per geographical unit and attach the result to the
    (optionally dissolved) polygons of a GeoDataFrame.

    Parameters
    ----------
    df :
        Dataframe with the data to be aggregated.
    target :
        Column with the target variable.
    other_cols_avg :
        Columns with the predicted values or any other columns to average.
    dissolve_on :
        Column name (or list of column names) used to dissolve the
        GeoDataFrame and to group the data. If None (default), no dissolve
        is performed and the data is grouped by `geoid`.
    distr :
        Distribution of the target variable, by default "gaussian".
    geoid :
        Column with the geoid, by default "INS". Used as grouping and merge
        key when `dissolve_on` is not given.
    weight :
        Column with the weights to be used, by default None.
    shp_file :
        The shapefile to use for the map, as a GeoDataFrame. Although the
        default is None, a GeoDataFrame is required.

    Returns
    -------
    geopandas.GeoDataFrame
        The (dissolved) polygons left-merged with the long-format averages
        and confidence intervals; the keys are compared as strings.

    Raises
    ------
    TypeError
        If `shp_file` is not a GeoDataFrame, or if `other_cols_avg` is
        neither None nor a list.
    KeyError
        If a `dissolve_on` column is not a column of `df`.
    """
    # sanity checks
    if not isinstance(shp_file, gpd.geodataframe.GeoDataFrame):
        raise TypeError("The shapefile should be a GeoDataFrame")
    if other_cols_avg and not isinstance(other_cols_avg, list):
        raise TypeError("'other_cols_avg' should be a list of strings or None")

    # Normalise the key(s) to a list so that a single column name and a
    # list of column names follow the same code path.
    if dissolve_on:
        merge_keys = [dissolve_on] if isinstance(dissolve_on, str) else list(dissolve_on)
        missing = [key for key in merge_keys if key not in df.columns]
        if missing:
            raise KeyError(f"{', '.join(map(str, missing))} is not a column in df")
    else:
        merge_keys = [geoid]

    # Only the long-format frame is merged onto the polygons.
    # NOTE(review): categorical key columns are replaced by integer codes
    # during the preparation step, so they would not match the shapefile
    # keys - confirm callers pass non-categorical keys.
    _, df_long = weighted_average_aggregator(
        df=df,
        groups=merge_keys,
        target=target,
        other_cols_avg=other_cols_avg,
        distr=distr,
        weight=weight,
    )

    geom_merc = shp_file.copy()
    if dissolve_on:
        dissolve_by = merge_keys if len(merge_keys) > 1 else merge_keys[0]
        geo_df = geom_merc.dissolve(by=dissolve_by).reset_index()
    else:
        geo_df = geom_merc.reset_index()

    df_long = df_long.fillna(0)

    # Cast the keys to str on BOTH frames taking part in the merge;
    # otherwise numeric keys in the data never match the shapefile keys.
    df_long[merge_keys] = df_long[merge_keys].astype(str)
    geo_df[merge_keys] = geo_df[merge_keys].astype(str)

    geo_df = geo_df.merge(df_long, on=merge_keys, how="left")
    return gpd.GeoDataFrame(geo_df)