"""
:code:`fdsim` provides an easy and flexible experimentation interface that takes care of
using the appropriate statistical tests and number of simulation runs, so that you don't
have to.
There are three usable experiment setups available: :code:`ABTest` to compare two alternative
scenarios, :code:`MultiScenarioExperiment` to compare more than two alternatives, and the
:code:`MultiFactorExperiment` for experiments with multiple independent decision variables.
All you have to do is choose the right experiment for your question, set up the corresponding
experiment class and run it.
Setting up the experiments differs somewhat between the three cases, because they are of
different complexity.
"""
import os
import pandas as pd
import numpy as np
import itertools
import copy
import pickle
from abc import abstractmethod, ABCMeta
import statsmodels.api as sm
import statsmodels.formula.api as smf
import matplotlib.pyplot as plt
from math import gcd
from functools import reduce
from fdsim.helpers import progress, quick_load_simulator
import fdsim
[docs]class BaseExperiment():
"""Base class for experimentation with fdsim. Not useful to instantiate on its own.
Parameters
----------
forced_runs: int or None, optional, default=None
A fixed number of simulation runs to use for the entire experiment (so this number
will be divided over the number of scenarios.
max_runs: int, optional, default=1000
The maximum number of runs to use in the experiment.
effect_size: float, optional, default=0.5
The effect size, normalized by the standard deviation, that you want to be able to
detect in the experiment. The lower the value, the more runs will be used (up to
:code:`max_runs` and provided that :code:`forced_runs=None`).
alpha: float, optional, default=0.05
The allowed probability of falsely discovering a significant effect. One minus the
significance level. Using :math:`\\alpha=0.05` is conventional in statistical
experiments.
power: float, optional, default=0.8
The statistical power, i.e., the probability of finding a significant effect if it
exists. One minus the power is the probability of a Type II error (false negatives).
name: str, optional, default=None
The name of the experiment. Will be used when printing results.
description: str, optional, default=None
Description of the experiment. Will be used when printing results.
to_minutes: bool, optional, default=True
Whether to translate the results to minutes (mm:ss) or to leave it in full seconds.
with_std: bool, optional, default=True
Whether to include the standard deviation of a metric among several runs with the same
settings in the results tables.
pvalue_zero_threshold: float, optional, default=1e-8
From which value to print p-values as zeros in results tables.
verbose: bool, optional, default=True
Whether to print progress during experiment runs.
"""
__metaclass__ = ABCMeta
not_to_minutes = ["count", "prop_missing"]
def __init__(self, forced_runs=None, max_runs=1000, effect_size=0.5, alpha=0.05, power=0.8,
name=None, description=None, to_minutes=True, with_std=True, pvalue_zero_threshold=1e-8, verbose=True):
self.name = name
self.description = description
self.forced_runs = forced_runs
self.max_runs = max_runs
self.effect_size = effect_size
self.alpha = alpha
self.power = power
self.to_minutes = to_minutes
self.with_std = with_std
self.pvalue_zero_threshold = pvalue_zero_threshold
self.verbose = verbose
[docs] @abstractmethod
def run(self, n_runs=None):
"""Run the specified experiment."""
[docs] @abstractmethod
def analyze(self):
"""Analyze the simulation output by performing statistical tests."""
[docs] @abstractmethod
def save_simulation_logs(self, dirpath=".", prefix="experiment1"):
"""Save the raw simulation logs to disk."""
[docs] @abstractmethod
def print_results(self):
"""Print the results of the last experiment."""
@abstractmethod
def _determine_sample_size(self):
"""Determine the required number of observations per variant to ensure
sufficient statistical power."""
def set_evaluator(self, evaluator):
assert isinstance(evaluator, fdsim.evaluation.Evaluator), "Expected an instance" \
" of type fdsim.evaluation.Evaluator. Got {}.".format(type(evaluator))
self.evaluator = evaluator
@staticmethod
def _reorganize_results_by_metric(results_dict):
"""Reorganize the data by measure and add scenario as column."""
metric_data = {}
for scenario, scenario_results in results_dict.items():
for metric_set, results in scenario_results.items():
results["scenario"] = scenario
if metric_set in metric_data.keys(): # concat on previous data
metric_data[metric_set] = metric_data[metric_set].append(results, ignore_index=True)
else: # create new entry in dictionary
metric_data[metric_set] = results
return metric_data
def _lcm_two_numbers(self, x, y):
"""This function takes two integers and returns the Least Common Multiple."""
lcm = (x * y) // gcd(x, y)
return lcm
def _lcm_n_numbers(self, *values):
"""Takes arbitrary number of integers and returns the Least Common Multiple."""
return reduce(self._lcm_two_numbers, values)
def _find_lcm_in_range(self, values, minimum, maximum):
"""Find the least common multiple (LCM) of an arbitrary number of integers
within a given range."""
lcm = self._lcm_n_numbers(*values)
x = lcm
while x <= maximum:
if (x >= minimum) & (x <= maximum):
return x
x += lcm
return None
@staticmethod
def _find_closest_higher_multiple(x, multiple, maximum=None):
while True:
if x % multiple == 0:
break
x += 1
while x > maximum:
x -= multiple
return x
def _solve_power(self, k):
n = sm.stats.FTestAnovaPower().solve_power(
effect_size=self.effect_size,
alpha=self.alpha,
power=self.power,
k_groups=k,
nobs=None
)
return n
def _get_final_n_runs(self, n, k_groups=2, find_lcm=True):
"""make sure number of runs does not exceed maximum and is an integer."""
n = int(np.ceil(n))
# calculate least common multiple if specified and multiple group sizes given
if (not isinstance(k_groups, (int, float))) and (find_lcm is True):
common_multiple = self._find_lcm_in_range(k_groups, n, self.max_runs)
if common_multiple is not None:
progress("Set number of runs to common multiple of group sizes: {}."
.format(common_multiple))
return common_multiple
else:
final_n = int(np.minimum(n, self.max_runs))
progress("No common multiple exists in the given range. Ignoring common "
"multiples. Using minimum of desired n and maximum: {}."
.format(final_n))
elif (isinstance(k_groups, (int, float))) and (find_lcm is True):
final_n = self._find_closest_higher_multiple(n, k_groups, maximum=self.max_runs)
progress("Set number of runs to multiple of number of groups: {}."
" Number of groups: {}.".format(final_n, k_groups))
# print what just happened
elif n > self.max_runs:
final_n = self.max_runs
progress("Statistically desired number of runs larger than maximum ({} < {})."
" Using the maximum.".format(final_n, n))
else:
final_n = n
progress("Desired total number of runs is {}.".format(final_n))
return final_n
[docs] @staticmethod
def anova_table(aov):
"""Add effect sizes $\\eta^2 and \\omega^2$$ to an ANOVA table from statsmodels."""
aov['mean_sq'] = aov[:]['sum_sq']/aov[:]['df']
aov['eta_sq'] = aov[:-1]['sum_sq']/sum(aov['sum_sq'])
aov['omega_sq'] = ((aov[:-1]['sum_sq'] - (aov[:-1]['df'] * aov['mean_sq'][-1])) /
(sum(aov['sum_sq']) + aov['mean_sq'][-1]))
cols = ['sum_sq', 'df', 'mean_sq', 'F', 'PR(>F)', 'eta_sq', 'omega_sq']
aov = aov[cols]
aov.columns = ['Sum of Squares', 'Degrees of Freedom', 'Mean Sum of Squares',
'F-statistic', 'P-value', 'Effect (eta^2)', 'Effect (omega^2)']
return aov
[docs]class ABTest(BaseExperiment):
"""Class that performs A/B test to evaluate two scenarios against each other (usually
one is the current situation (A) and one is a considered alternative (B)).
Parameters
----------
sim_A, sim_B: fdsim.simulation.Simulator, optional, default=None
The simulators for scenario A and B. If None, these must be provided later, using
the :code:`set_scenario_A` and :code:`set_scenario_B` methods.
evaluator: fdsim.evaluation.Evaluator, optional, default=None
The :code:`Evaluator` object that should be used to analyze the simulation
runs. This class specifies which metrics will be compared between the two
scenarios.
**kwargs: key-value pairs
Any parameters that should be passed to the BaseSimulator (see :code:`BaseExperiment`)
for the available parameters.
"""
def __init__(self, sim_A=None, sim_B=None, evaluator=None, **kwargs):
self.simulator_A = sim_A
self.simulator_B = sim_B
self.evaluator = evaluator
self.ready = False
self._check_readiness()
super().__init__(**kwargs)
def set_scenario_A(self, simulator, description=None):
assert isinstance(simulator, fdsim.simulation.Simulator), \
"Expected an instance of fdsim.simulation.Simulator as input."
self.simulator_A = simulator
self.description_A = description
self._check_readiness()
def set_scenario_B(self, simulator, description=None):
assert isinstance(simulator, fdsim.simulation.Simulator), \
"Expected an instance of fdsim.simulation.Simulator as input."
self.simulator_B = simulator
self.description_B = description
self._check_readiness()
def set_evaluator(self, evaluator):
assert isinstance(evaluator, fdsim.evaluation.Evaluator), \
"Expected an instance of fdsim.evaluation.Evaluator as input."
self.evaluator = evaluator
[docs] def run(self):
if not self.ready:
raise ValueError("Simulators A and B and an evaluator must be provided before"
" running.")
self.n_runs = self.forced_runs or self._determine_sample_size()
progress("Running scenario A for {} runs.".format(self.n_runs / 2))
self.simulator_A.simulate_period(n=self.n_runs / 2)
progress("Running scenario B for {} runs.".format(self.n_runs / 2))
self.simulator_B.simulate_period(n=self.n_runs / 2)
progress("Simulation completed. Evaluating results.")
results_A = self.evaluator.evaluate(self.simulator_A.results)
results_B = self.evaluator.evaluate(self.simulator_B.results)
self.test_results = self.analyze(results_A, results_B, to_minutes=self.to_minutes,
with_std=self.with_std)
progress("Statistical tests performed and results obtained.")
[docs] def analyze(self, results_A, results_B, to_minutes=True, with_std=True):
"""Analyze the output of the evaluator on the two simulation logs."""
def seconds_to_minutes(*args):
return ["{}{:02.0f}:{:02.0f}"
.format("-" if x < 0 else "", abs(x) // 60, abs(x) % 60)
for x in args]
def round(value, measure, descriptor):
if (measure == "on_time") and (descriptor != "count"):
return np.round(value, 4)
elif descriptor == "prop_missing":
return np.round(value, 5)
else:
return int(np.round(value))
metric_set_names = list(results_A.keys())
test_results = {}
for set_name in metric_set_names:
dfA = results_A[set_name]
dfB = results_B[set_name]
test_results[set_name] = {}
set_measure = self.evaluator.metric_set_measures[set_name]
if set_measure == "on_time":
digits = 4
else:
digits = 0
for col in dfA.columns:
tstat, pvalue, degrees = sm.stats.ttest_ind(dfA[col], dfB[col])
mean_A, mean_B = round(dfA[col].mean(), set_measure, col), round(dfB[col].mean(), set_measure, col)
# add standard deviation to the estimates
if with_std:
std_A, std_B = round(dfA[col].std(), set_measure, col), round(dfB[col].std(), set_measure, col)
if to_minutes and (col not in self.not_to_minutes) and (set_measure is not "on_time"):
estimate_A, estimate_B, std_A, std_B, effect = \
seconds_to_minutes(mean_A, mean_B, std_A, std_B, (mean_B - mean_A))
else:
estimate_A, estimate_B = mean_A, mean_B
effect = estimate_B - estimate_A
# format as "mean (std)"
estimate_A = "{} ({})".format(estimate_A or mean_A, std_A)
estimate_B = "{} ({})".format(estimate_B or mean_B, std_B)
# no standard deviation, but do change to minutes:seconds formatting
elif to_minutes and (col not in self.not_to_minutes) and (set_measure is not "on_time"):
estimate_A, estimate_B, effect = seconds_to_minutes(mean_A, mean_B, (mean_B - mean_A))
# use mean in seconds only
else:
estimate_A, estimate_B, effect = mean_A, mean_B, mean_B - mean_A
test_results[set_name][col] = {"A estimate": estimate_A,
"B estimate": estimate_B,
"effect": effect,
"t-statistic": tstat,
"p-value": pvalue if pvalue > self.pvalue_zero_threshold else 0,
"significant": pvalue < self.alpha / 2,
"df": degrees}
return test_results
[docs] def save_simulation_logs(self, dirpath=".", prefix="experiment1"):
"""Write simulation logs to disk.
Parameters
----------
dirpath: str
Path to directory to save the result in.
prefix: str
String to prepend to '_results_A' and 'results_B' for the two logs.
"""
self.simulator_A.results.to_csv(os.path.join(dirpath, prefix + "_results_A.csv"))
self.simulator_B.results.to_csv(os.path.join(dirpath, prefix + "_results_B.csv"))
[docs] def print_results(self, to_latex=False, plot=True):
"""Print the results of the experiments to stdout.
Parameters
----------
to_latex: bool, optional, default=True
Whether to print tables as LaTeX Tabular environments.
plot: bool, optional, default=True
Whether to print distribution plots as well.
"""
if self.test_results is None:
progress("Nothing to print.")
else:
if self.name is None:
additional = ""
else:
additional = ": " + self.name
print("------------------\nA/B Test Results{}\n------------------"
.format(additional))
if self.description is not None:
print(self.description)
if (self.description_A is not None) and (self.description_B is not None):
print("Scenario A: {}".format(self.description_A))
print("Scenario B: {}".format(self.description_B))
print("------------------", end="\n")
print("Runs: {} ({} per scenario)".format(self.n_runs, self.n_runs / 2))
print("------------------", end="\n\n")
for name, f in self.test_results.items():
print("Measure: '{}'\n{}"
.format(name, self.evaluator.metric_sets[name]["description"]))
print("------------------------------------------------------------")
if to_latex:
print(pd.DataFrame(f).T.to_latex())
else:
print(pd.DataFrame(f).T)
print("------------------------------------------------------------", end="\n\n")
if plot and (self.evaluator.metric_set_measures[name] in ["response_time", "delay"]):
fig = self.evaluator.plot(name, self.simulator_A.results, self.simulator_B.results,
labels=["Scenario A", "Scenario B"], return_fig=True)
plt.plot()
[docs] def plot_distributions(self, metric_set_name, return_fig=True):
"""Plot figures showing the probability of exceeding any given value for a measure.
Parameters
----------
metric_set_name: str
The name of the metric set in the :code:`Evaluator` object that you want to plot.
return_fig: bool, optional, default=True
Whether to return the figure or just print it.
"""
fig = self.evaluator.plot(metric_set_name, self.simulator_A.results, self.simulator_B.results,
labels=["Scenario A", "Scenario B"], return_fig=True)
if return_fig:
return fig
else:
plt.show()
def _check_readiness(self):
if ((self.simulator_A is not None) and (self.simulator_B is not None) and
(self.evaluator is not None)):
self.ready = True
def _determine_sample_size(self):
"""Determine the required number of observations given the power, confidence, and
effect size."""
n = self._solve_power(2)
return self._get_final_n_runs(n*2, find_lcm=True)
[docs]class MultiScenarioExperiment(BaseExperiment):
"""Class that is used to evaluate more than two alternative scenarios against
each other.
Scenarios can be added one by one using the :code:`add_scenario` method. Since this method
takes initialized and configured :code:`fdsim.simulation.Simulator`s as input, no
assumptions are made on the setups of the scenarios. Each is treated as an alternative
situation that is compared against all other situations/scenarios.
Parameters
----------
evaluator: fdsim.evaluation.Evaluator, optional, default=None
The :code:`Evaluator` object that should be used to analyze the simulation
runs. This class specifies which metrics will be compared between the two
scenarios.
**kwargs: key-value pairs
Any parameters that should be passed to the BaseSimulator (see :code:`BaseExperiment`)
for the available parameters.
"""
def __init__(self, evaluator=None, **kwargs):
super().__init__(**kwargs)
self.evaluator = evaluator
self.scenarios = {}
self.descriptions = {}
[docs] def add_scenario(self, simulator, name, description=None):
"""Add an alternative scenario to the possible options.
Parameters
----------
simulator: fdsim.simulation.Simulator
The simulator that is configured to simulate the desired scenario.
name: str
The name for this scenario. It is used to print and distinguish between
scenarios later on and is therefore required.
description: str, optional, default=None
An optional description of this scenario that may provide additional context
to the user.
"""
assert isinstance(name, str), "'name' must be a string. Got {}.".format(type(name))
assert isinstance(simulator, fdsim.simulation.Simulator), ("'simulator' must be an "
"instance of fdsim.simulation.Simulator. Got {}".format(type(simulator)))
assert name not in self.scenarios.keys(), ("Scenario {} already exists"
.format(name))
self.scenarios[name] = simulator
if description is not None:
self.descriptions[name] = description
def _determine_sample_size(self):
"""Determine the required number of observations given the power, confidence, and
effect size."""
# solve power equation for number of observations (nobs)
total_n = self._solve_power(len(self.scenarios))
return self._get_final_n_runs(total_n, k_groups=len(self.scenarios), find_lcm=True)
[docs] def run(self):
"""Run the experiment."""
# determine number of runs per scenario
self.n_runs = self.forced_runs or self._determine_sample_size()
self.scenario_runs = int(np.ceil(self.n_runs / len(self.scenarios)))
# run scenarios
results_dict = {}
for scenario, sim in self.scenarios.items():
progress("Running scenario {} for {} runs.".format(scenario, self.scenario_runs))
sim.simulate_period(n=self.scenario_runs)
progress("Computing performance metrics.")
results_dict[scenario] = self.evaluator.evaluate(sim.results)
progress("Simulation of scenario {} completed.".format(scenario))
progress("Simulation completed. Conducting statistical analysis on results.")
self.test_results = self.analyze(results_dict)
progress("Statistical tests performed and results obtained.")
[docs] def analyze(self, results_dict):
"""Analyze the results using one-way ANOVA.
Parameters
----------
results_dict: dict
Dictionary where keys are scenario names and values are outputs of
:code:`Evaluator.evaluate`.
"""
# reorganize the data by measure and add scenario as column
metric_data = self._reorganize_results_by_metric(results_dict)
# save for post-hoc analysis
self.metric_data = metric_data
# analyze results per metric using one-way ANOVA
anova_results = {}
for measure, data in metric_data.items():
tables = []
for ycol in [c for c in data.columns if c != "scenario"]:
# perform ANOVA
ols_results = smf.ols("Q('{}') ~ C(scenario)".format(ycol), data=data).fit()
table = self.anova_table(sm.stats.anova_lm(ols_results, typ=2))
table.index.rename("predictor", inplace=True)
table["target"] = ycol
table = table.reset_index(drop=False)
tables.append(table)
measure_table = pd.concat(tables, axis=0)
measure_table["predictor"].loc[measure_table["predictor"] == "C(scenario)"] = "scenario"
measure_table.set_index(["target", "predictor"], inplace=True)
measure_table["significant"] = measure_table["P-value"] < self.alpha / 2
anova_results[measure] = measure_table
significant_variable_dict = self.get_significant_results(anova_results)
tukey_results = self.perform_tukey(metric_data, significant_variable_dict)
self.anova_results = anova_results
self.tukey_results = tukey_results
return anova_results, tukey_results
[docs] def get_significant_results(self, anova_results):
"""Find the dependent variables for which the scenario had a significant impact
based on the ANOVA results."""
target_dict = {}
for measure, df in anova_results.items():
df2 = df.reset_index()
df2 = df2[(df2["significant"] == True) & (df2["predictor"] == "scenario")]
targets = df2["target"].unique()
target_dict[measure] = targets
return target_dict
[docs] def save_simulation_logs(self, dirpath=".", prefix="experiment1"):
"""Write simulation logs to disk.
Parameters
----------
dirpath: str
Path to directory to save the result in.
prefix: str
String to prepend to '_results_A' and 'results_B' for the two logs.
"""
for name, sim in self.scenarios.items():
sim.results.to_csv(os.path.join(dirpath, prefix + "_scenario_{}.csv".format(name)))
[docs] def print_results(self, to_latex=False, plot=True):
"""Print the results of the ANOVA and Tukey analysis to give a quick overview of the
results.
Parameters
----------
to_latex: bool, optional, default=True
Whether to print tables as LaTeX Tabular environments.
plot: bool, optional, default=True
Whether to print distribution plots as well.
"""
if (self.anova_results is None) and (self.tukey_results is None):
progress("Nothing to print.")
else:
if self.name is None:
additional = ""
else:
additional = ": " + self.name
print("------------------\nMultiple Scenarios Test Results{}\n------------------"
.format(additional))
if len(self.descriptions) > 0:
for name, description in self.descriptions.items():
print("Scenario '{}': {}".format(name, description))
print("------------------", end="\n\n")
print("Runs: {} ({} per scenario)".format(self.n_runs, self.scenario_runs))
print("------------------", end="\n\n")
for measure, aov_table in self.anova_results.items():
print("\nMeasure: '{}'\n{}"
.format(measure, self.evaluator.metric_sets[measure]["description"]),
end="\n\n")
print("Analysis of Variance (ANOVA)")
print("------------------------------------------------------------")
if to_latex:
print(aov_table.to_latex())
else:
print(aov_table)
print("------------------------------------------------------------", end="\n\n")
print("Tukey HSD post-hoc analysis: pairwise comparison of relevant groups")
print("------------------------------------------------------------")
if to_latex:
print(self.tukey_results[measure].to_latex)
else:
print(self.tukey_results[measure])
print("------------------------------------------------------------", end="\n\n")
if plot and (self.evaluator.metric_set_measures[measure] in ["response_time", "delay"]):
print("Probabilities of exceeding various response times")
self.evaluator.plot(measure, *[sim.results for sim in self.scenarios.values()],\
labels=[scenario for scenario in self.scenarios.keys()])
[docs] def plot_distributions(self, metric_set_name, return_fig=True):
"""Plot figures showing the probability of exceeding any given value for a measure.
Parameters
----------
metric_set_name: str
The name of the metric set in the :code:`Evaluator` object that you want to plot.
return_fig: bool, optional, default=True
Whether to return the figure or just print it.
"""
fig = self.evaluator.plot(metric_set_name, *[sim.results for sim in self.scenarios.values()],
labels=list(self.scenarios.keys()), return_fig=True)
if return_fig:
return fig
else:
plt.show()
[docs]class MultiFactorExperiment(BaseExperiment):
"""Experiment to evaluate combinations of values for multiple decision variables / factors.
This has a slightly more complex API due to the fact that different factor levels should
be applied to a :code:`Simulator` in order to define every possible setting. How this works
is explained in the :code:`add_factor` method.
Parameters
----------
base_simulator: fdsim.simulation.Simulator
The base situation to which different factor levels will be applied. Note that for
every factor in the experiment, a relevant value should be set in this base situation.
evaluator: fdsim.evaluation.Evaluator
The :code:`Evaluator` object to use to calculate performance measures and metrics.
cache_path: str
Directory to store base simulator in, so that it can be reloaded for different runs.
**kwargs: key-value pairs
Parameters passed to :code:`BaseExperiment`.
"""
all_factor_types = ["resources", "location", "station_status", "add_station",
"remove_station", "activate_backup", "remove_backup"]
def __init__(self, base_simulator, evaluator=None, cache_path="cache", **kwargs):
self.evaluator = evaluator
self.factors = {}
self.factor_types = {}
if not os.path.isdir(cache_path):
os.mkdir(cache_path)
self.cache_path = cache_path
self.base_simulator = base_simulator
self.base_simulator.save_simulator_object(os.path.join(cache_path, "base.fdsim"))
super().__init__(**kwargs)
[docs] def add_factor(self, factor_name, factor_type, levels=None):
"""Add a factor to the experiment. A factor is a variable that can take on at least
two levels (categories).
Parameters
----------
factor_name: str,
The name of the factor.
factor_type: str,
What type of factor it concerns. One of ["vehicles", "station_location",
"station_status", "add_station", "remove_station", "crew"].
levels: dict, optional (default: None),
A nested dictionary specifying the parameters for the different levels, like:
{'level_name_1' -> {'parameter 1' -> value1, 'parameter 2' -> value2, etc.},
'level_name_2' -> {'parameter 1' -> value1, 'parameter 2' -> value2, etc.}}.
If None, then it must be specified later by using self.add_factor_level().
Notes
-----
The base_simulator's settings are also considered a level. I.e., make sure that the
base case includes a relevant setting for each factor that is added.
Examples
--------
>>> # add resources factor
>>> experiment = MultiFactorExperiment(base_simulator)
>>> levels = {"extra-ts": {"station_name": "HENDRIK", "TS": 1, "TS_crew_ft": 1}}
>>> experiment.add_factor("TS-Hendrik", "resources", levels=levels)
Change the location of stations:
.. code::
>>> # add station_location factor with two alternatives to the base case
>>> levels = {"at N200": {"new_location": "13781452"}}
"other-loc": {"new_location": "13781363"}}
>>> experiment.add_factor("Osdorp loc", "location", "OSDORP", levels=levels)
Set a `status cycle` such as closing a station part of the day or having part time
crew operate it:
.. code:: python
>>> # set a station status cycle
>>> levels = {"Closed at night":
{"station_name": "VICTOR", "start": 23, "end": 7, "status": "closed"},
"Office hours":
{"station_name": "VICTOR", "start": 18, "end": 8, "status": "closed"}
}
>>> experiment.add_factor("Status Victor", "station_status", levels=levels)
.. code:: python
>>> # add a station
>>> levels = {"Bovenkerk":
{"station_name": "BOVENKERK", location": "13710002", "TS": 1, "TS_ft": 1},
"Harbor":
{"station_name": "HARBOR", location": "13710001", "TS": 1, "TS_ft": 1}
}
>>> experiment.add_factor("New station location", "add_station", levels=levels)
.. code:: python
>>> # remove a station
>>> levels = {"Hendrik": {"station_name": "HENDRIK"},
"Nico": {"station_name": "NICO"}}
>>> experiment.add_factor("Remove station", "remove_station", levels=levels)
"""
if not factor_type in self.all_factor_types:
raise ValueError("'{}' is not a valid factor type. Must be one of {}."
.format(factor_type, self.all_factor_types))
self.factor_types[factor_name] = factor_type
self.factors[factor_name] = {"base": {}}
if levels is not None:
for i, (level_name, level_params) in enumerate(levels.items()):
assert isinstance(level_params, dict), ("'levels' must be a nested dictionary"
" of at least 2 deep. Found a {} on "
"the second level."
.format(type(level_params)))
self.factors[factor_name][level_name] = level_params
[docs] def add_factor_level(self, factor_name, level_name, params):
"""Add a single level to an existing factor.
Parameters
----------
factor_name: str,
The name of the factor to add a level to.
level_name: str,
The name of the new level.
params: dict,
The parameters to pass to the underlying method of fdsim.simulation.Simulator.
"""
assert isinstance(factor_name, str), "'factor_name' must be a string."
assert isinstance(level_name, str), "'factor_name' must be a string."
assert level_name not in self.factors[factor_name].keys(), ("Level {} already exists"
" in factor {}. Please choose another name.".format(level_name, factor_name))
assert isinstance(params, dict), "'params' must be a dictionary."
self.factors[factor_name][level_name] = params
[docs] def run(self):
"""Run the experiment."""
all_levels = [[level for level in factor.keys()] for factor in self.factors.values()]
# determine sample size
self.n_runs = self.forced_runs or self._determine_sample_size()
self.n_scenarios = np.prod([len(levels) for levels in all_levels])
self.scenario_runs = int(np.ceil(self.n_runs / self.n_scenarios))
progress("Simulating {} runs in total ({} per scenario)."
.format(self.n_runs, self.scenario_runs))
results_dict = {}
scenario_dict = {}
self.scenario_logs = {}
# run full factorial experiment design
for k, level_combo in enumerate(itertools.product(*all_levels)):
progress("Setting up combination {} / {}.".format(k + 1, self.n_scenarios))
# load base simulator and apply level values
simulator = quick_load_simulator(os.path.join(self.cache_path, "base.fdsim"))
scenario = {}
for i, level_name in enumerate(level_combo):
factor_name = list(self.factors.keys())[i]
scenario[factor_name] = level_name
if level_name != "base":
simulator = self._apply_factor_level(simulator, factor_name, level_name)
else:
# progress("Using base settings for factor {}.".format(factor_name))
pass
# save experiment setup
scenario_dict[k] = scenario
progress("Factor levels: {}.".format(scenario))
# run experiment
progress("Running simulation for {} runs.".format( self.scenario_runs))
simulator.simulate_period(n=self.scenario_runs)
self.scenario_logs[k] = simulator.results.copy()
progress("Computing performance metrics.")
results_dict[k] = self.evaluator.evaluate(simulator.results)
progress("Simulation of combination {} / {} completed.".format(k + 1, self.n_scenarios))
# for debugging
self.results_dict = results_dict
self.scenario_dict = scenario_dict
# progress("results_dict and scenario_dict saved as attributes for debugging.")
progress("Simulation completed. Conducting statistical analysis on results.")
self.anova_results, self.tukey_results = self.analyze(results_dict, scenario_dict)
[docs] def analyze(self, results_dict, scenario_dict):
# reorganize the data by measure and add scenario as column
metric_data = self._reorganize_results_by_metric(results_dict)
# save for post-hoc analysis
self.metric_data = metric_data
progress("Adding used factor levels to results data.")
# add factor information to data
factor_names = list(self.factors.keys())
for measure, data in metric_data.items():
for fn in factor_names:
data[fn] = data["scenario"].apply(lambda k: scenario_dict[k][fn])
# create right hand side (rhs) for regression formula
predictors = ["C(Q('{}'))".format(v) for v in factor_names]
rhs = " + ".join(predictors)
progress("Right hand side for OLS: {}".format(rhs))
# analyze results per metric using multi-way ANOVA
anova_results = {}
for measure, data in metric_data.items():
data.drop("scenario", axis=1, inplace=True)
tables = []
for ycol in [c for c in data.columns if c not in factor_names]:
# perform ANOVA
ols_results = smf.ols("Q('{}') ~ {}".format(ycol, rhs), data=data).fit()
table = self.anova_table(sm.stats.anova_lm(ols_results, typ=2))
table.index.rename("factor", inplace=True)
table.insert(0, column="metric", value=ycol)
table = table.reset_index(drop=False)
tables.append(table)
measure_table = pd.concat(tables, axis=0)
measure_table["significant"] = measure_table["P-value"] < self.alpha / 2
measure_table["factor"].loc[measure_table["factor"] != "Residual"] = \
measure_table["factor"].loc[measure_table["factor"] != "Residual"].str[5:-3]
anova_results[measure] = measure_table
significant_variable_dict = self.get_significant_results(anova_results)
tukey_results = self.perform_tukey(metric_data, significant_variable_dict)
return anova_results, tukey_results
def _apply_factor_level(self, simulator, factor_name, level_name):
"""Apply a factor level to a Simulator.
Parameters
----------
simulator: fdsim.simulation.Simulator,
The simulator object to apply the settings to.
factor_name: str,
The name of the factor to change the level for.
level_name: str,
The name of the level to apply.
Returns
-------
simulator: fdsim.simulation.Simulator,
The simulator with adjusted settings.
"""
level_params = copy.copy(self.factors[factor_name][level_name])
station_name = level_params.pop("station_name")
if self.factor_types[factor_name] == "resources":
rs = simulator.resource_allocation.copy()
rs.set_index("kazerne", inplace=True)
cols = rs.columns
for param, value in level_params.items():
if param in cols:
rs.loc[station_name, param] = value
rs = rs.astype(int).reset_index(drop=False)
simulator.set_resource_allocation(rs)
elif self.factor_types[factor_name] == "location":
location = level_params.pop("new_location")
simulator.move_station(station_name, location, **level_params)
elif self.factor_types[factor_name] == "station_status":
start = level_params.pop("start_hour")
end = level_params.pop("end_hour")
simulator.set_daily_station_status(station_name, start, end, **level_params)
elif self.factor_types[factor_name] == "add_station":
location = level_params.pop("location")
simulator.add_station(station_name, location, **level_params)
elif self.factor_types[factor_name] == "remove_station":
simulator.remove_station(station_name)
elif self.factor_types[factor_name] == "activate_backup":
simulator.activate_backup_protocol(station_name, **level_params)
elif self.factor_types[factor_name] == "remove_backup":
simulator.remove_backup_protocol(station_name)
return simulator
[docs] def get_significant_results(self, anova_results):
"""Find the combinations of independent and dependent variables for which the scenario
had a significant impact based on the ANOVA results."""
sig_var_dict = {}
for measure, df in anova_results.items():
df2 = df.reset_index()
df2 = df2[(df2["significant"] == True) & (df2["factor"] != "Residual")]
tuples = df2.apply(lambda x: (x["factor"], x["metric"]), axis=1)
tuples = tuples.values.tolist()
sig_var_dict[measure] = tuples
return sig_var_dict
[docs] def print_results(self, to_latex=False, plot=True):
"""Print the results of the ANOVA and Tukey analysis to give a quick overview of the
results.
Parameters
----------
to_latex: bool, optional, default=True
Whether to print tables as LaTeX Tabular environments.
plot: bool, optional, default=True
Whether to print distribution plots as well.
"""
if (self.anova_results is None) and (self.tukey_results is None):
progress("Nothing to print.")
else:
if self.name is None:
additional = ""
else:
additional = ": " + self.name
print("------------------\nMultiple Factors Experiment Results{}\n------------------"
.format(additional))
if len(self.factors) > 0:
print("Factors:\n--------")
for name, levels in self.factors.items():
print("Factor '{}' with levels: {}.".format(name, list(levels.keys())))
print("------------------", end="\n\n")
for measure, aov_table in self.anova_results.items():
print("\nMeasure: '{}'\n{}"
.format(measure, self.evaluator.metric_sets[measure]["description"]),
end="\n\n")
print("Analysis of Variance (ANOVA)")
print("------------------------------------------------------------")
if to_latex:
print(aov_table.to_latex())
else:
print(aov_table)
print("------------------------------------------------------------", end="\n\n")
print("Tukey HSD post-hoc analysis: pairwise comparison of relevant groups")
print("------------------------------------------------------------")
if to_latex:
print(self.tukey_results[measure].to_latex)
else:
print(self.tukey_results[measure])
print("------------------------------------------------------------", end="\n\n")
if plot and (self.evaluator.metric_set_measures[measure] in ["response_time", "delay"]):
print("Probabilities of exceeding various response times")
self.evaluator.plot(measure, *[log for log in self.scenario_logs.values()],
labels=["Scenario {}".format(k) for k in self.scenario_logs.keys()])
[docs] def plot_distributions(self, metric_set_name, return_fig=True):
"""Plot figures showing the probability of exceeding any given value for a measure.
Parameters
----------
metric_set_name: str
The name of the metric set in the :code:`Evaluator` object that you want to plot.
return_fig: bool, optional, default=True
Whether to return the figure or just print it.
"""
fig = self.evaluator.plot(metric_set_name, *[log for log in self.scenario_logs.values()],
labels=[k for k in self.scenario_logs.keys()], return_fig=True)
if return_fig:
return fig
else:
plt.show()
def _determine_sample_size(self):
# solve equation for number of observations (nobs)
group_sizes = [len(self.factors[f].keys()) for f in self.factors.keys()]
n = self._solve_power(np.max(group_sizes))
progress("Statistically desired runs: {}".format(n))
return self._get_final_n_runs(n, k_groups=group_sizes, find_lcm=True)
[docs] def save_simulation_logs(self, dirpath=".", prefix="experiment1"):
"""Write simulation logs to disk.
Parameters
----------
dirpath: str
Path to directory to save the result in.
prefix: str
String to prepend to '_results_A' and 'results_B' for the two logs.
"""
for nr, log in self.scenario_logs.items():
log.to_csv(os.path.join(dirpath, prefix + "_scenario_{}.csv".format(nr)))
with open(os.path.join(dirpath, prefix + "_scenario_explanation.txt"), "w") as f:
print("Scenario setups:", file=f)
print("----------------\n", file=f)
for k, dict_ in self.scenario_dict.items():
print("Scenario {}".format(k), file=f)
for key, value in dict_.items():
print("- " + str(key) + ": " + str(value), file=f)
print("----", file=f)