Source code for responsetimefitting

import numpy as np
import pandas as pd

from scipy.stats import lognorm, gamma, bayes_mvs
from sklearn import linear_model
import copy

from fdsim.helpers import lonlat_to_xy, xy_to_lonlat


[docs]def prepare_data_for_response_time_analysis(incidents, deployments, stations, vehicles): """ Prepare data for fitting dispatch, turnout, and travel times. Parameters ---------- incidents: pd.DataFrame Contains the log of incidents. deployments: pd.DataFrame Contains the log of deployments. stations: pd.DataFrame Contains information on the fire stations. vehicles: array-like of strings The vehicles to keep in the resulting data. Must correspond to entries in the 'voertuig_groep' column in 'incidents'. Returns ------- The merged and preprocessed DataFrame. """ # specify duplicate column names before merging incidents.rename({"kazerne_groep": "incident_kazerne_groep"}, axis="columns", inplace=True) deployments.rename({"kazerne_groep": "inzet_kazerne_groep"}, axis="columns", inplace=True) # merge incidents and deployments merged = pd.merge(deployments, incidents, how="inner", left_on="hub_incident_id", right_on="dim_incident_id") # preprocess station name merged['inzet_kazerne_groep'] = merged['inzet_kazerne_groep'].str.upper() # rename x, y coordinate columns to avoid confusion merged.rename({"st_x": "incident_xcoord", "st_y": "incident_ycoord"}, axis="columns", inplace=True) # add longitude, latitude coordinates as well merged[["incident_longitude", "incident_latitude"]] = \ merged[["incident_xcoord", "incident_ycoord"]] \ .apply(lambda x: xy_to_lonlat(x[0], x[1]), axis=1) \ .apply(pd.Series) # add x, y coordinates to station location data stations["station_xcoord"], stations["station_ycoord"] = \ [list(l) for l in zip(*list(stations.apply( lambda x: lonlat_to_xy(x["lon"], x["lat"]), axis=1)))] # preprocess station name in same way as for the deployments stations["kazerne"] = stations["kazerne"].str.upper() # and rename to avoid confusion stations.rename({"lon": "station_longitude", "lat": "station_latitude"}, axis="columns", inplace=True) # create the merged dataset df = pd.merge(merged, stations, left_on="inzet_kazerne_groep", right_on="kazerne", how="inner") # ensure data types df["inzet_rijtijd"] = df["inzet_rijtijd"].astype(float) # remove NaNs in location and cast to string df = df[~df["hub_vak_bk"].isnull()].copy() df["hub_vak_bk"] = df["hub_vak_bk"].astype(int).astype(str) df = df[df["hub_vak_bk"].str[0:2] == "13"] # filter vehicles df = df[np.isin(df["voertuig_groep"], vehicles)] return df
[docs]def get_osrm_distance_and_duration(longlat_origin, longlat_destination, osrm_host="http://192.168.56.101:5000"): """ Calculate distance over the road and normal travel duration from one point to the other. Parameters ---------- longlat_origin: tuple(float, float) coordinates of the start location in decimal longitude and latitude (in that order). longlat_destination: tuple(float, float) coordinates of the destination in decimal longitude and latitude. osrm_host: str The URL to the OSRM API. Returns ------- Tuple of ('distance', 'duration') according to OSRM. Notes ----- Requires OSRM to be installed (an optional dependency of fdsim). """ try: import osrm except ImportError: raise ImportError("Please install the OSRM Python package to calculate travel" "distances and durations.") osrm.RequestConfig.host = osrm_host result = osrm.simple_route(longlat_origin, longlat_destination, output="route", geometry="wkt")[0] return result["distance"], result["duration"]
[docs]def add_osrm_distance_and_duration(df, osrm_host="http://192.168.56.101:5000"): """ Calculate distance and duration over the road from station to incident for every incident in the data. Parameters ---------- df: DataFrame The merged data of incidents and deployments. Must contain the following columns: {station_longitude, station_latitude, incident_longitude, incident_latitude}.If not present, call 'prepare_data_for_response_time_analysis' first. osrm_host: str The URL to the OSRM API, defaults to 'http://192.168.56.101:5000', which is the default if running OSRM locally. Returns ------- The DataFrame with two added columns 'osrm_distance' (meters) and 'osrm_duration' (seconds). Notes ----- Requires OSRM to be installed (an optional dependency of fdsim). """ try: import osrm except ImportError: raise ImportError("Please install the OSRM Python package to calculate travel" "distances and durations.") osrm.RequestConfig.host = osrm_host df[["osrm_distance", "osrm_duration"]] = \ df.apply(lambda x: get_osrm_distance_and_duration( (x["station_longitude"], x["station_latitude"]), (x["incident_longitude"], x["incident_latitude"])), axis=1).apply(pd.Series) return df
[docs]def fit_simple_linear_regression(data, xcol, ycol, fit_intercept=False): """ Fit simple linear regression on the data. Parameters ---------- data: DataFrame The data to fit a model on. xcol: str The name of the column that acts as a predictor. ycol: str The name of the column acting as the dependent variable. fit_intercept: boolean If true, also fits the intercept. If false, forces intercept of the resulting model to 0. NOTE: Defaults to false. Returns ------- Parameters of fitted model, i.e., a tuple of (intercept, coefficient) """ lm = linear_model.LinearRegression(fit_intercept=fit_intercept) lm.fit(data[xcol].values.reshape(-1, 1), data[ycol]) return lm.intercept_, lm.coef_[0]
[docs]def fit_lognorm_rv(x, **kwargs): """ Fit a lognormal distribution to data and return a fitted random variable that can be used for sampling. Parameters ---------- x: array-like The data to fit. **kwargs: additional arguments passed to scipy.stats.lognorm.fit() Returns ------- The fitted scipy.stats.lognorm object. """ shape, loc, scale = lognorm.fit(x, **kwargs) return lognorm(shape, loc, scale)
[docs]def fit_gamma_rv(x, **kwargs): """ Fit a Gamma distribution to data and return a fitted random variable that can be used for sampling. Parameters ------ x: array-like The data to fit. **kwargs: additional arguments passed to scipy.stats.gamma.fit() Returns ------- The fitted scipy.stats.gamma object. """ shape, loc, scale = gamma.fit(x, **kwargs) return gamma(shape, loc, scale)
[docs]def safe_bayes_mvs(x, alpha=0.9): """ Bayesian confidence intervals for mean and standard deviation. Parameters ---------- x: array-like The data to compute confidence intervals over alpha: float The confidence level, defaults to 0.9 (90% confidence) Returns ------- Tuple of (lower bound mean, upper bound mean, lower bound std, upper bound std). """ if len(x) > 1: mean, _, std = bayes_mvs(x, alpha=alpha) mean_lb = mean[1][0] mean_ub = mean[1][1] std_lb = std[1][0] std_ub = std[1][1] return mean_lb, mean_ub, std_lb, std_ub else: return 0, np.inf, 0, np.inf
[docs]def sample_size_sufficient(x, alpha=0.95, max_mean_range=30, max_std_range=25): """ Determines if sample size is sufficient based on Bayesian confidence intervals and tresholds on the maximum range. Parameters ---------- x: array-like The data to evaluate. alpha: float in range (0,1) The confidence level, defaults to 0.95 (95%). max_mean_range: float or int the maximum range of the confidence interval for the mean to classify the sample size as sufficient. max_std_range: float or int the maximum range of the confidence interval for the standard deviation to classify the sample size as sufficient. Returns ------- True if the size of the confidence intervals are within the specified tresholds, False otherwise. """ mean_lb, mean_ub, std_lb, std_ub = safe_bayes_mvs(x, alpha=alpha) if (mean_ub - mean_lb < max_mean_range) & (std_ub - std_lb < max_std_range): return True else: False
[docs]def robust_remove_travel_time_outliers(data): """ Remove outliers in travel time in a robust way by looking at the 50% most reliable data points. Performs two one-way outlier detection methods. It determines tresholds average speed and on time per distance unit (the inverse of speed) and cuts the values falling off on the high side. Tresholds are computed as follows: $$limit = 75% quantile + 1.5*(75% quantile - 25% quantile)$$ Thus only data points between the 25% and 75% quantiles are used, which are likely to be reliable points. This makes the method robust against unreliable data. Parameters ---------- data: DataFrame The data to remove outliers from. Assumes the columns 'osrm_distance' and 'inzet_rijtijd' to be present. Returns ------- tuple of (filtered DataFrame, minimum speed, maximum speed), where speed is in kilometers per hour. """ # add km/h and seconds/meter as columns data["km_h"] = data["osrm_distance"] / data["inzet_rijtijd"] * 3.6 data["s_m"] = data["inzet_rijtijd"] / data["osrm_distance"] # calculate tresholds speed_treshold = data[["km_h", "s_m"]].describe() \ .apply(lambda x: x["75%"] + 1.5*(x["75%"]-x["25%"])) max_speed = speed_treshold.loc["km_h"] min_speed = 1 / speed_treshold.loc["s_m"] * 3.6 # filter data and return df_filtered = data[(data["km_h"] > min_speed) & (data["km_h"] < max_speed)].copy() df_filtered.drop(["km_h", "s_m"], axis=1, inplace=True) return df_filtered, min_speed, max_speed
[docs]def model_noise_travel_time(y, x, a, b): """ Fit a random variable to the residual of simple linear regression on the travel time. Parameters ---------- y: array-like The values to predict / simulate. x: array-like, same shape as y The independent variable that partially explains y. a: float, The intercept of the linear model $y ~ a + b*x$. b: float The coefficient of the linear model $y ~ a + b*x$. Returns ------- A Lognormally distributed random variable (scipy.stats.lognorm) fitted on the residual: $y - (a + bx)$. """ noise_factor = (np.array(y) - a) / (b*np.array(x)) shape, loc, scale = lognorm.fit(noise_factor, loc=np.min(noise_factor), scale=1) rv = lognorm(shape, loc, scale) return rv
[docs]def model_travel_time(data): """ Model the travel time as a function of the estimated travel time from OSRM. Parameters ---------- data: DataFrame Output of 'prepare_data_for_response_time_analysis'. Returns ------- Tuple of (intercept, coefficient, residual random variable), where the intercept and coefficient form a linear model predicting the travel time based on the OSRM estimated travle duration and the random variable is a scipy.stats.lognorm object explaining the residual after prediction. The results can be used to simulate travel times for arbitrary incidents. """ # remove records without travel time data = data[~data["inzet_rijtijd"].isnull()].copy() # remove unreliable data points data, _, _ = robust_remove_travel_time_outliers(data) # fit linear model: osrm duration -> realized duration intercept, coefficient = \ fit_simple_linear_regression(data, "osrm_duration", "inzet_rijtijd") # model the residuals as a lognormal random variable residual_rv = model_noise_travel_time(data["inzet_rijtijd"], data["osrm_duration"], intercept, coefficient) return intercept, coefficient, residual_rv
[docs]def model_travel_time_per_vehicle(data): """ Model the travel time for every vehicle type separately. Parameters ---------- data: pd.DataFrame Output of 'prepare_data_for_response_time_analysis'. Returns ------- Dictionary like: {'vehicle' -> {'a' -> intercept, 'b' -> coefficient, 'noise_rv' -> random variable for noise}} See 'model_travel_time' for details on those variables. """ backup_a, backup_b, backup_rv = model_travel_time(data) travel_time_dict = {} travel_time_dict["overall"] = {"a": backup_a, "b": backup_b, "noise_rv": backup_rv} for v in data["voertuig_groep"].unique(): df_vehicle = data[data["voertuig_groep"] == v] if len(df_vehicle) > 1000: a, b, rv = model_travel_time(df_vehicle) travel_time_dict[v] = {"a": a, "b": b, "noise_rv": rv} else: travel_time_dict[v] = {"a": backup_a, "b": backup_b, "noise_rv": backup_rv} return travel_time_dict
[docs]def fit_dispatch_times(data, rough_upper_bound=600): """ Fit a lognormal random variable to the dispatch time per incident type. Parameters ---------- data: DataFrame Merged log of deployments and incidents. All deployments in the data will be used for fitting, so any filtering (e.g., on priority or 'volgnummer') must be done in advance. rough_upper_bound: int Number of seconds to use as a rough upper bound filter, dispatch times above this value are considered unrealistic/unreliable and are removed before fitting. DEfaults to 600 seconds (10 minutes). Returns ------- A dictionary like {'incident type' -> 'scipy.stats.lognorm object'}. """ # calculate dispatch times data["dispatch_time"] = \ (pd.to_datetime(data["inzet_gealarmeerd_datumtijd"], dayfirst=True) - pd.to_datetime(data["dim_incident_start_datumtijd"], dayfirst=True)).dt.seconds # filter out unrealistic values data = data[data["dispatch_time"] <= rough_upper_bound].copy() # fit one random variable on all data for the types with too few # observations backup_rv = fit_lognorm_rv(data["dispatch_time"], loc=np.min(data["dispatch_time"]), scale=100) # fit variables per incident type (use backup rv if not enough samples) rv_dict = {} for type_ in data["dim_incident_incident_type"].unique(): X = data[data["dim_incident_incident_type"] == type_]["dispatch_time"] if sample_size_sufficient(X): rv_dict[type_] = fit_lognorm_rv(X, loc=np.min(X), scale=100) else: rv_dict[type_] = copy.deepcopy(backup_rv) return rv_dict
[docs]def add_parttime_fulltime_indicator(data, station_col="inzet_kazerne_groep", volunteer_stations=None): """ Add a column to the data, indicating whether it is a fulltime manned station or a parttime (volunteer) station. Parameters ---------- data: pd.DataFrame The data to add the column to. station_col: str, optional (default: "inzet_kazerne_groep") The column indicating the station responsible for the deployment. volunteer_stations: array-like of strings, optional, The station names (all uppercases) of the stations that are parttime. Returns ------- data: pd.DataFrame The data with an added boolean column "fulltime". """ stations = data[station_col].unique() is_full_time_station = {station: True for station in stations} if volunteer_stations is not None: for station in volunteer_stations: is_full_time_station[station] = False data["fulltime"] = data[station_col].apply(lambda x: is_full_time_station[x]) return data
[docs]def fit_turnout_times(data, prios=[1, 2, 3], vehicle_types=["TS", "RV", "HV", "WO"], rough_lower_bound=30, rough_upper_bound=600, stations_to_exclude=None, station_col="inzet_kazerne_groep", volunteer_stations=None): """ Fit a lognormal random variable to the turn-out time per appointment (fulltime/parttime), priority, and vehicle type. Parameters ---------- data: DataFrame Merged log of deployments and incidents. All deployments in the data will be used for fitting, so any filtering (e.g., on priority or 'volgnummer') must be done in advance. prios: array-like of int, optional (default: [1, 2, 3]) The priority levels to fit turnout times for. rough_lower_bound, rough_upper_bound: int Number of seconds to use as a rough lower and upper bound filter, turn-out times outside these value are considered unrealistic/unreliable and are removed before fitting. Defaults to 600 seconds (10 minutes) and 30 seconds. stations_to_exclude: array-like of str, optional (default: None) Stations to remove from the data before fitting. Some stations may imply invalid deployments (e.g, "Regio", "Onbekend"), which could influence the turnout times. station_col: str, optional (default: "inzet_kazerne_groep") The column in data that holds the station responsible for the deployment. volunteer_stations: array-like of str, optional (default: None) The names of the stations that are run by volunteers. These are fitted separately from full time stations. Returns ------- A dictionary like {'prio' -> {'parttime' -> 'scipy.stats.gamma object', 'fulltime' -> 'scipy.stats.gamma object'}}. """ # filter stations data[station_col] = data[station_col].str.upper() if stations_to_exclude: data = data[~np.isin(data[station_col], stations_to_exclude)].copy() # add full time or part time indicator data = add_parttime_fulltime_indicator(data, station_col=station_col, volunteer_stations=volunteer_stations) # calculate dispatch times data["turnout_time"] = (pd.to_datetime(data["inzet_uitgerukt_datumtijd"], dayfirst=True) - pd.to_datetime(data["inzet_gealarmeerd_datumtijd"], dayfirst=True) ).dt.seconds # filter out unrealistic values data = data[(data["turnout_time"] > rough_lower_bound) & (data["turnout_time"] <= rough_upper_bound)].copy() # fit variables per incident type (use backup rv if not enough samples) rv_dict = {} for appointment in ["fulltime", "parttime"]: df = data[data["fulltime"] == (appointment == "fulltime")].copy() backup_rv = fit_gamma_rv(df["turnout_time"], scale=100) df_rv_dict = {} for prio in prios: df_prio = df[df["dim_prioriteit_prio"] == prio] prio_backup_rv = fit_gamma_rv(df["turnout_time"], scale=100) prio_rv_dict = {} for vtype in vehicle_types: X = df[df["voertuig_groep"] == vtype]["turnout_time"] if sample_size_sufficient(X): prio_rv_dict[vtype] = fit_gamma_rv(X, scale=100) else: prio_rv_dict[vtype] = copy.deepcopy(prio_backup_rv) df_rv_dict[prio] = prio_rv_dict.copy() rv_dict[appointment] = df_rv_dict.copy() return rv_dict
[docs]def fit_onscene_times(data, vehicles=["TS", "HV", "RV", "WO"], rough_lower_bound=60, rough_upper_bound=24*60*60): """ Fit a lognormal random variable to the dispatch time per incident type. Parameters ---------- data: DataFrame Merged log of deployments and incidents. All deployments in the data will be used for fitting, so any filtering (e.g., on priority or 'volgnummer') must be done in advance. vehicles: array-like of strings The vehicles to fit on-scene times for. Optional, defaults to ["TS", "HV", "RV", "WO"]. rough_lower_bound: int Number of seconds to use as a rough lower bound filter, on-scene times below this value are considered unrealistic/unreliable and are removed before fitting. Defaults to 60 seconds. rough_upper_bound: int Number of seconds to use as a rough upper bound filter, on-scene times above this value are considered unrealistic/unreliable and are removed before fitting. Defaults to 24*60*60 seconds (24 hours). Returns ------- A dictionary like {'incident type' -> {'vehicle type' -> 'scipy.stats.gamma object'}}. """ # filter vehicles data = data[np.in1d(data["voertuig_groep"], vehicles)].copy() # set arrival time of NaN values to the start of incident (conservative alternative) nan_arrival = data["inzet_terplaatse_datumtijd"].isnull() data.loc[nan_arrival, "inzet_terplaatse_datumtijd"] = data.loc[nan_arrival, "inzet_start_inzet_datumtijd"].copy() # calculate on-scene times data["inzet_duration"] = \ (pd.to_datetime(data["inzet_eind_inzet_datumtijd"], dayfirst=True) - pd.to_datetime(data["inzet_terplaatse_datumtijd"], dayfirst=True)).dt.seconds # filter out unrealistically small and large values data = data[(data["inzet_duration"] > rough_lower_bound) & (data["inzet_duration"] < rough_upper_bound)].copy() # fit variables per incident type (use backup rv if not enough samples) overall_backup_rv = fit_gamma_rv(data["inzet_duration"], floc=np.min(data["inzet_duration"])-1, scale=100) types = data["dim_incident_incident_type"].unique() rv_dict = {} for type_ in types: # backup random variable per station df_type = data[data["dim_incident_incident_type"] == type_].copy() if sample_size_sufficient(df_type["inzet_duration"], max_mean_range=20*60, max_std_range=20*60): type_backup_rv = fit_gamma_rv(df_type["inzet_duration"], floc=np.min(df_type["inzet_duration"]) - 1, scale=100) else: type_backup_rv = copy.deepcopy(overall_backup_rv) # create dict with entry for every type with vehicle-specific data type_rv_dict = {} for v in vehicles: X = df_type[df_type["voertuig_groep"] == v]["inzet_duration"] if sample_size_sufficient(X, max_mean_range=20*60, max_std_range=20*60): type_rv_dict[v] = fit_gamma_rv(X, floc=np.min(X) - 1, scale=100) else: type_rv_dict[v] = copy.deepcopy(type_backup_rv) rv_dict[type_] = type_rv_dict.copy() return rv_dict
[docs]def get_coordinates_locations_stations(data, location_col="hub_vak_bk"): """ Obtain the coordinates of the demand locations and stations. Parameters ---------- data: pd.DataFrame Merged and preprocessed data (result from 'prepare_data_for_response_time_analysis'). location_col: str, column name of data The column to use as identifier for the (demand) location. custom_station_locations: array-like of strings Identifiers of the locations (in data[location_col]) of the stations in case the custom station locations should be used. If provided, does not use the stations in the data. Notes ----- Assumes data has the following columns for coordinates: 'incident_longitude', 'incident_latitude', 'station_longitude', 'station_latitude'. Data must be in the desired coordinate system already. Returns ------- Two dictionaries. The first contains demand location coordinates: {'location id' -> (longitude, latitude)} The second holds the coordinates of the stations: {'station name' -> (longitude, latitude)} In case of custom station locations, the station name is replaced with an arbitraty identifier. """ location_coords = (data.groupby(location_col).apply( lambda x: tuple([x["incident_longitude"].mean(), x["incident_latitude"].mean()])) .to_dict()) station_coords = (data.groupby("inzet_kazerne_groep") .apply(lambda x: tuple([x["station_longitude"].iloc[0], x["station_latitude"].iloc[0]])) .to_dict()) return location_coords, station_coords
[docs]def fit_big_incident_duration(big_incidents): """Fit a Gamma random variable on the duration of big incidents. Parameters ---------- big_incidents: pd.DataFrame The incident data, filtered to only big incidents / output of `fdsim.incidentfitting.get_big_incident_data`. Returns ------- duration_rv: scipy.stats.gamma frozen distribution A random variable describing the distribution of big incident durations. """ big_incidents["duration"] = (pd.to_datetime(big_incidents["dim_incident_eind_datumtijd"], dayfirst=True) - pd.to_datetime(big_incidents["dim_incident_start_datumtijd"], dayfirst=True) ).dt.seconds / 60 big_incidents = big_incidents[~big_incidents["duration"].isnull()] rv = fit_gamma_rv(big_incidents["duration"], floc=np.min(big_incidents["duration"]) - 1, scale=100) return rv