Source code for incidentfitting

import numpy as np
import pandas as pd


def prepare_incidents_for_spatial_analysis(incidents):
    """Clean the incident log so that it can be used for spatial fitting.

    Parameters
    ----------
    incidents: pd.DataFrame
        The incident data to prepare. Must contain the columns 'hub_vak_bk'
        (demand location) and 'inc_dim_object_functie' (building function).

    Notes
    -----
    The following steps are applied, in this order:

    1. Drop rows that have no location or no building function.
    2. Cast the location column to int and then to string.
    3. Keep only rows whose location string starts with "13"
       (Amsterdam-Amstelland).

    Returns
    -------
    The prepared DataFrame. It is a copy; the input is not modified.
    """
    # 1. rows need both a location and a building function
    has_location = incidents["hub_vak_bk"].notnull()
    has_function = incidents["inc_dim_object_functie"].notnull()
    prepared = incidents[has_location & has_function].copy()

    # 2. location ids become strings via int, so 13001.0 turns into "13001"
    prepared["hub_vak_bk"] = prepared["hub_vak_bk"].astype(int).astype(str)

    # 3. only keep locations in Amsterdam-Amstelland (prefix "13")
    in_region = prepared["hub_vak_bk"].str[0:2] == "13"
    return prepared[in_region].copy()
def get_prio_probabilities_per_type(incidents):
    """Create dictionary with the priority probabilities for every incident type.

    Parameters
    ----------
    incidents: pd.DataFrame
        Contains the log of incidents from which the probabilities should be
        obtained. Must contain the columns 'dim_incident_incident_type',
        'dim_prioriteit_prio' and 'dim_incident_id'.

    Returns
    -------
    Dictionary with incident type names as keys and lists of probabilities as
    elements. Each list has one entry per priority value that remains in the
    data after filtering, in ascending order of priority. When the data
    contains exactly the priorities 1, 2 and 3, their probabilities are in
    position 0, 1, 2 respectively. Priorities that never occur for a type get
    probability 0.
    """
    # filter out null values and prio 5
    incidents = incidents[~incidents["dim_prioriteit_prio"].isnull()]
    incidents = incidents[incidents["dim_prioriteit_prio"] != 5]

    # number of incidents per (type, priority) combination
    prio_per_type = (
        incidents.groupby(["dim_incident_incident_type", "dim_prioriteit_prio"])["dim_incident_id"]
        .count()
        .reset_index()
    )

    # transform() returns a result aligned with the rows of prio_per_type, so
    # it can be assigned as a column directly. apply() may prepend the group
    # key to the index, which breaks the column assignment.
    prio_per_type["prio_probability"] = (
        prio_per_type.groupby("dim_incident_incident_type")["dim_incident_id"]
        .transform(lambda x: x / x.sum())
    )

    # rows: priorities, columns: incident types; missing combinations become 0
    prio_probabilities = pd.pivot_table(
        prio_per_type,
        columns="dim_incident_incident_type",
        values="prio_probability",
        index="dim_prioriteit_prio",
    ).fillna(0)

    return {col: list(prio_probabilities[col]) for col in prio_probabilities.columns}
def get_vehicle_requirements_probabilities(incidents, deployments, vehicles):
    """Calculate the probabilities of needing a combination of vehicles for
    every incident type.

    Parameters
    ----------
    incidents: pd.DataFrame,
        The log of incidents to extract probabilities from. Must contain the
        columns 'dim_incident_id' and 'dim_incident_incident_type'.
    deployments: pd.DataFrame,
        The log of deployments to extract probabilities from. Must contain
        the columns 'hub_incident_id' and 'voertuig_groep'.
    vehicles: list
        The vehicle types to take into account. Deployments of other vehicle
        types are ignored.

    Returns
    -------
    Nested dictionary like {"incident type": {vehicles: prob}}, where
    `vehicles` is a sorted tuple of the vehicle types deployed to a single
    incident, e.g. ("RV", "TS", "TS").
    """
    # local import: only this function needs it
    from collections import Counter

    deployments = deployments[np.isin(deployments["voertuig_groep"], vehicles)]

    # add incident type to the deployment data
    deployments = deployments.merge(
        incidents[["dim_incident_id", "dim_incident_incident_type"]],
        left_on="hub_incident_id",
        right_on="dim_incident_id",
        how="left",
    )

    # filter out missing values and create tuples of needed vehicle types;
    # deployments without a matching incident have NaN keys and are dropped
    # by the groupby
    deployments = deployments[~deployments["voertuig_groep"].isnull()]
    combos = (
        deployments.groupby(["dim_incident_id", "dim_incident_incident_type"])["voertuig_groep"]
        .apply(lambda x: tuple(x.sort_values()))
        .reset_index(name="vehicles")
    )

    # count occurrences of every combination of vehicles per incident type
    # and normalize the counts to probabilities
    probabilities = {}
    for incident_type, group in combos.groupby("dim_incident_incident_type"):
        counts = Counter(group["vehicles"])
        total = sum(counts.values())
        probabilities[incident_type] = {
            combo: count / total for combo, count in counts.items()
        }

    return probabilities
def get_spatial_distribution_per_type(incidents, location_col="hub_vak_bk", locations=None):
    """Obtain the distribution over demand locations for every incident type.

    Parameters
    ----------
    incidents: pd.DataFrame
        The log of incidents to obtain probabilities from.
    location_col: str, default='hub_vak_bk'
        The column in 'incidents' to use as identifier for demand location.
    locations: list(str), default=None
        The locations that should be present in the result. If None, only
        the locations that had incidents of the concerning incident type are
        incorporated. If given, the result holds exactly these locations for
        every type; locations without incidents get a count of zero.

    Returns
    -------
    Dictionary like `{"type": {"location": probability}}`.
    """
    type_col = "dim_incident_incident_type"

    # drop missing values and other irrelevant observations first
    prepared = prepare_incidents_for_spatial_analysis(incidents)

    # number of incidents per (type, location) pair
    counts = prepared.groupby([type_col, location_col])["dim_incident_id"].count()

    # expand to every (type, requested location) pair if locations are given
    if locations is not None:
        full_index = pd.MultiIndex.from_product(
            [prepared[type_col].unique(), locations],
            names=[type_col, location_col],
        )
        counts = counts.reindex(full_index, fill_value=0)

    # normalize the counts within each incident type
    table = counts.reset_index()
    per_type_counts = table.groupby(type_col)["dim_incident_id"]
    table["prob"] = per_type_counts.transform(lambda x: x / x.sum())

    def _location_probabilities(group):
        # {location: probability} for one incident type
        return group.set_index(location_col)["prob"].to_dict()

    return table.groupby(type_col).apply(_location_probabilities).to_dict()
def get_building_function_probabilities(incidents, location_col="hub_vak_bk", locations=None):
    """Find the distribution of building functions per demand location.

    Parameters
    ----------
    incidents: pd.DataFrame
        The log of incidents to obtain building function distributions from.
    location_col: str
        The column name in 'incidents' that identifies the demand location.
    locations: list(str), default=None
        Locations that must be present in the result. Every location in this
        list that does not occur in the prepared incident data gets the
        overall distribution (over all locations) per incident type. If None,
        only locations found in the data are returned.

    Returns
    -------
    A nested dictionary like:
    `{'location id' -> {'incident type' -> {'building function' -> probability}}}`.
    """
    incidents = prepare_incidents_for_spatial_analysis(incidents)

    # number of incidents per (location, type, building function)
    grouped = (
        incidents.groupby([location_col, "dim_incident_incident_type",
                           "inc_dim_object_functie"])["dim_incident_id"]
        .count()
        .reset_index()
    )

    # normalize within every (location, type) pair
    grouped["prob"] = (
        grouped.groupby([location_col, "dim_incident_incident_type"])["dim_incident_id"]
        .transform(lambda x: x / x.sum())
    )

    # innermost level: {building function: prob} per (location, type);
    # reset_index() stores these dictionaries in a column named 0
    partial_dict = (
        grouped.groupby([location_col, "dim_incident_incident_type"])
        .apply(lambda x: x.set_index("inc_dim_object_functie")["prob"].to_dict())
        .reset_index()
    )

    # outer levels: {location: {type: {building function: prob}}}
    building_dict = (
        partial_dict.groupby(location_col)
        .apply(lambda x: x.set_index("dim_incident_incident_type")[0].to_dict())
        .to_dict()
    )

    # add the locations that are not in the incident data
    if locations is not None:
        # distribution over all (prepared) incidents, used as fallback
        overall_dist = get_overall_building_dist(incidents, location_col=location_col)

        for loc in set(locations) - set(building_dict):
            # every location gets its own copy, so that changing the
            # distribution of one location cannot affect the others
            building_dict[loc] = {
                incident_type: dict(dist) for incident_type, dist in overall_dist.items()
            }

    return building_dict
def get_overall_building_dist(incidents, location_col="hub_vak_bk"):
    """Get the building function distribution per incident type, aggregated
    over all rows in the data.

    Parameters
    ----------
    incidents: pd.DataFrame
        The incident data. Must contain the columns
        'dim_incident_incident_type', 'inc_dim_object_functie' and
        'dim_incident_id'.
    location_col: str, default='hub_vak_bk'
        Not used in the calculation.

    Returns
    -------
    Dictionary like `{'incident type' -> {'building function' -> probability}}`.
    """
    type_col = "dim_incident_incident_type"
    function_col = "inc_dim_object_functie"

    # non-null counts per (type, building function); only the count of
    # 'dim_incident_id' is used below
    counts = incidents.groupby([type_col, function_col]).count().reset_index()

    # normalize within each incident type
    per_type_counts = counts.groupby(type_col)["dim_incident_id"]
    counts["prob"] = per_type_counts.transform(lambda x: x / x.sum())

    def _function_probabilities(group):
        # {building function: probability} for one incident type
        return group.set_index(function_col)["prob"].to_dict()

    return counts.groupby(type_col).apply(_function_probabilities).to_dict()
def get_big_incident_type_dist(big_incidents, types=None):
    """Get the distribution of big incidents over incident types.

    Parameters
    ----------
    big_incidents: pd.DataFrame
        The incident data, filtered to only big incidents / output of
        `get_big_incident_data`.
    types: list of strings, default=None
        The incident types to use. If None, use all.

    Returns
    -------
    Dictionary with two arrays of equal length: "types" holds the incident
    types, sorted by descending number of incidents, and "probabilities"
    holds the share of each type.

    Notes
    -----
    The probabilities are calculated over all types in the data before the
    `types` filter is applied. They are not re-normalized afterwards, so
    they may sum to less than one when `types` is given.
    """
    # count incidents for each type
    by_type = (
        big_incidents.groupby("dim_incident_incident_type")["dim_incident_id"]
        .count()
        .reset_index()
        .rename(columns={"dim_incident_id": "number of incidents",
                         "dim_incident_incident_type": "incident type"})
    )

    # sort descendingly
    by_type = by_type.sort_values("number of incidents", ascending=False)

    # get the distribution
    by_type["probability"] = by_type["number of incidents"] / by_type["number of incidents"].sum()

    # filter types; np.isin replaces the deprecated np.in1d
    if types is not None:
        by_type = by_type[np.isin(by_type["incident type"], types)]

    return {"types": by_type["incident type"].values,
            "probabilities": by_type["probability"].values}
def get_big_incident_arrival_dist(big_incidents):
    """Get distributions of big incidents over months, days of the week, and hours.

    Parameters
    ----------
    big_incidents: pd.DataFrame
        The incident data, filtered to only big incidents / output of
        `get_big_incident_data`. Must contain 'dim_incident_id' and the
        datetime column 'dim_incident_start_datumtijd'. The DataFrame is
        not modified.

    Returns
    -------
    Dictionary with the keys "hour", "day" and "month". The values are arrays
    of probabilities with length 24, 7 and 12 respectively. Position i of
    "hour" refers to hour i, position i of "day" to weekday i (0 = Monday)
    and position i of "month" to month i + 1. Periods without incidents get
    probability 0. If there are no incidents at all, the arrays contain only
    zeros.
    """
    timestamps = big_incidents["dim_incident_start_datumtijd"]
    incident_ids = big_incidents["dim_incident_id"]

    def _distribution(period, categories):
        # Count incidents per period without adding helper columns to the
        # caller's DataFrame. Reindexing on the full list of categories keeps
        # the array positions meaningful when a period has no incidents.
        counts = incident_ids.groupby(period).count().reindex(categories, fill_value=0)
        total = counts.sum()
        if total == 0:
            return np.zeros(len(categories))
        return (counts / total).values

    return {"hour": _distribution(timestamps.dt.hour, list(range(24))),
            "day": _distribution(timestamps.dt.weekday, list(range(7))),
            "month": _distribution(timestamps.dt.month, list(range(1, 13)))}
def get_big_incident_ids(deployments, min_ts=3):
    """Find the incidents that have at least a given number of TS deployments.

    Parameters
    ----------
    deployments: pd.DataFrame
        The deployment data. Must contain the columns 'voertuig_groep' and
        'hub_incident_id'.
    min_ts: int, default=3
        The minimum number of TS deployments for an incident to be classified
        as big.

    Returns
    -------
    ids: list
        A list of incident IDs that had at least min_ts TS deployments.
    """
    is_ts = deployments["voertuig_groep"] == "TS"

    # number of TS deployments per incident
    ts_per_incident = deployments[is_ts].groupby("hub_incident_id").size()

    enough_ts = ts_per_incident >= min_ts
    return ts_per_incident.loc[enough_ts].index.tolist()
def infer_types(data):
    """Infer incident types from an incident log.

    Parameters
    ----------
    data: pd.DataFrame
        The incident data. Must contain the 'dim_incident_incident_type' column.

    Returns
    -------
    types: list of strings
        The incident types found in the data, in order of first appearance.

    Notes
    -----
    Excludes 'NVT', the string 'nan' and missing values (None / NaN) from
    the resulting list.
    """
    # use the `data` argument; the name `incidents` is not defined here
    return [
        t for t in data["dim_incident_incident_type"].unique()
        if not pd.isnull(t) and t not in ("NVT", "nan")
    ]
def get_big_incident_data(incidents, deployments, types=None, vehicles=("TS",), min_ts=3):
    """Filter incident and deployment data to those instances relating to a
    'big' incident.

    Parameters
    ----------
    incidents: pd.DataFrame
        The incident data.
    deployments: pd.DataFrame
        The deployment data.
    types: list of strings, default=None
        The incident types to include, if None, use all in the data.
    vehicles: str or sequence of strings, default=("TS",)
        The vehicle types to take into account. Deployments of all other
        vehicle types will be dropped.
    min_ts: int, default=3
        The minimum number of TS deployments for an incident to be included
        in the result.

    Returns
    -------
    big_incidents, big_deployments: pd.DataFrame
        The filtered incident and deployment data (as a tuple). Both only
        refer to big incidents that are still in the incident data after the
        preparation and type filters.
    """
    # some basic preparations (returns a copy, the input is not modified)
    incidents = prepare_incidents_for_spatial_analysis(incidents)

    # filter incident types
    if types is None:
        types = infer_types(incidents)
    incidents = incidents[np.isin(incidents["dim_incident_incident_type"], types)]

    # filter vehicle types
    if isinstance(vehicles, str):
        deployments = deployments[deployments["voertuig_groep"] == vehicles].copy()
    else:
        deployments = deployments[np.isin(deployments["voertuig_groep"], list(vehicles))]

    # filter big incidents
    big_ids = get_big_incident_ids(deployments, min_ts=min_ts)

    # The big ids come from the deployments, while the incidents have been
    # filtered on location and type. Ids that were filtered out are skipped
    # here, because .loc raises a KeyError on missing labels.
    indexed = incidents.set_index("dim_incident_id")
    known_ids = [incident_id for incident_id in big_ids if incident_id in indexed.index]

    big_incidents = indexed.loc[known_ids, :].reset_index()
    big_deployments = deployments[np.isin(deployments["hub_incident_id"], known_ids)]

    return big_incidents, big_deployments