import os
import warnings
from abc import abstractmethod, ABCMeta
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error
from fdsim.helpers import progress
class BaseIncidentPredictor(object):
    """Base class for incident predictors. Not useful to instantiate on its own.

    Subclasses are expected to set ``self.file_name`` (the name of the csv file
    inside ``fc_dir`` that holds the forecast) before calling this initializer
    and to implement ``fit`` and ``predict``.

    Parameters
    ----------
    load_forecast: boolean, optional (default: True)
        Whether to load a pre-existing forecast from ``fc_dir``.
    fc_dir: str, optional (default: 'data/forecasts')
        Directory in which forecasts are stored.
    verbose: boolean, optional (default: True)
        Passed on to ``progress`` for every status message.
    """

    # NOTE: this is the Python 2 way of declaring a metaclass and has no effect
    # on Python 3, so the abstract methods below are not enforced. It is kept
    # unchanged because switching to ``metaclass=ABCMeta`` would prevent callers
    # from instantiating this class directly.
    __metaclass__ = ABCMeta

    def __init__(self, load_forecast=True, fc_dir="data/forecasts", verbose=True):
        self.verbose = verbose
        self.fc_dir = fc_dir
        self.forecast = None
        # Subclasses may have set this already; make sure it always exists so
        # that get_sampling_dict() can test it.
        if not hasattr(self, "sampling_dict"):
            self.sampling_dict = None

        if load_forecast:
            try:
                self.forecast = pd.read_csv(os.path.join(self.fc_dir, self.file_name))
                self.types = [c for c in self.forecast.columns if c != "ds"]
            except FileNotFoundError:
                warnings.warn("No forecast found, check if 'fc_dir' specifies"
                              " the right directory. If you didn't mean to "
                              "load a forecast, initialize with "
                              "'load_forecast=False'. Instance initialized "
                              "without forecast. Create one by running "
                              "self.fit() and then .predict(). "
                              "Given directory: {}.".format(self.fc_dir))

    @abstractmethod
    def fit(self, data):
        """ Fit the model on the data. """

    @abstractmethod
    def predict(self, data):
        """ Predict using the fitted model. """

    @staticmethod
    def evaluate(y_true, y_predict, metric="RMSE"):
        """Evaluate a given prediction.

        Parameters
        ----------
        y_true: array,
            The ground truth values.
        y_predict: array
            The predicted labels / forecasted values.
        metric: str, one of ['MAE', 'RMSE'], optional (default: 'RMSE'),
            The evaluation metric. Uses the Root Mean Squared Error for 'RMSE' and the
            Mean Absolute Error for 'MAE'.

        Returns
        -------
        score: float,
            The error score(s) of the prediction. If y is multi-output, outputs a list of
            scores (one score per variable).

        Raises
        ------
        ValueError
            If 'metric' is not one of the supported values.
        """
        assert len(y_true) == len(y_predict), \
            "Values and predictions must have same length"

        if metric == "RMSE":
            return np.sqrt(mean_squared_error(y_true, y_predict, multioutput="raw_values"))
        if metric == "MAE":
            return mean_absolute_error(y_true, y_predict, multioutput="raw_values")
        raise ValueError("{} is not a supported value for 'metric'. "
                         "Must be one of ['RMSE', 'MAE'].".format(metric))

    def create_sampling_dict(self, start_time=None, end_time=None, incident_types=None):
        """Create a dictionary that can conveniently be used for
        sampling random incidents based on the forecast.

        Parameters
        ----------
        start_time: Timestamp or str convertible to Timestamp
            The earliest time that should be included in the dictionary.
            If None, uses the earliest time in the forecast.
        end_time: Timestamp or str convertible to Timestamp
            The latest time that should be included in the dictionary.
            If None, uses the latest time in the forecast.
        incident_types: array-like of strings
            The incident types to forecast for. Defaults to None. If None,
            uses all incident types in the forecast.

        Returns
        -------
        sampling_dict: dict,
            The sampling dictionary as described below.

        Notes
        -----
        Stores three results:
            -self.sampling_dict, a dictionary like:
             `{t -> {'type_distribution' -> probs,
                     'beta' -> expected interarrival time in minutes,
                     'lambda' -> total arrival rate (sum over the types),
                     'time' -> the timestamp corresponding to start_time+t}}`
             where t is an integer representing the time_units since the
             start_time.
            -self.sampling_start_time, timestamp of earliest time
             in the dictionary.
            -self.sampling_end_time, timestamp of the latest time
             in the dictionary.
        """
        assert self.forecast is not None, \
            ("No forecast available, initiate with load_forecast=True "
             "or use .fit() and .predict() to create one.")

        # determine incident types
        if incident_types is not None:
            fc = self.forecast[["ds"] + list(incident_types)].copy()
        else:
            fc = self.forecast.copy()

        # determine start and end times
        fc["ds"] = pd.to_datetime(fc["ds"], dayfirst=True)
        if start_time is None:
            start_time = fc["ds"].min()
        if end_time is None:
            end_time = fc["ds"].max()

        msg = "Creating a sampling dictionary from {} to {}.".format(start_time, end_time)
        progress(msg, verbose=self.verbose)

        # restrict to the requested period and split timestamps from rates
        fc = fc[(fc["ds"] >= start_time) & (fc["ds"] <= end_time)]
        timestamps = fc["ds"]
        rates = fc.drop(columns="ds").to_numpy()

        # create the dictionary: one entry per forecasted time step
        self.sampling_dict = {}
        for i, row in enumerate(rates):
            total = np.sum(row)  # summed once instead of once per entry
            self.sampling_dict[i] = {"type_distribution": row / total,
                                     "beta": 1 / total * 60,
                                     "lambda": total,
                                     "time": timestamps.iloc[i]}

        # save start and end time for future reference
        self.sampling_start_time = start_time
        self.sampling_end_time = end_time
        progress("Sampling dictionary created.", verbose=self.verbose)
        return self.sampling_dict

    @staticmethod
    def _infer_time_unit(time_sequence):
        """Return the average spacing between the first and last timestamp.

        Raises ValueError when fewer than two timestamps are given, because no
        spacing can be inferred in that case.
        """
        x = pd.to_datetime(pd.Series(time_sequence))
        if len(x) < 2:
            raise ValueError("Need at least two timestamps to infer the time unit.")
        return (x.max() - x.min()) / (len(x) - 1)

    def save_forecast(self):
        """ Save forecasted incident rate to csv. """
        path = os.path.join(self.fc_dir, self.file_name)
        self.forecast.to_csv(path, index=False)
        progress("Forecast saved to {}.".format(path), verbose=self.verbose)

    def get_forecast(self):
        """ Return the DataFrame with the forecast. """
        return self.forecast

    def get_sampling_dict(self):
        """ Return the dictionary from which to sample.

        Creates the dictionary with default arguments if it does not exist yet.
        """
        if getattr(self, "sampling_dict", None) is None:
            self.create_sampling_dict()
        return self.sampling_dict

    @staticmethod
    def _create_date_hour_column(data, datetime_col="dim_incident_start_datumtijd"):
        """ Create a datetime column with hourly precision to facilitate counting incidents
        per hour. """
        # flooring to the hour is the vectorized equivalent of rebuilding every
        # timestamp from its year, month, day, and hour
        return pd.to_datetime(data[datetime_col]).dt.floor("h")

    @staticmethod
    def _create_complete_hourly_index(start_datetime, end_datetime=None, n_hours=None):
        """ Create an array of datetime values of hourly precision from start to end or from
        start until a specified number of values. The array can be used to reindex a dataframe
        that counts incidents per hour, so that values of zero will be listed as well.

        If both 'end_datetime' and 'n_hours' are given, 'end_datetime' takes precedence.

        Raises
        ------
        ValueError
            If neither 'end_datetime' nor 'n_hours' is given.
        """
        start_datetime = pd.to_datetime(start_datetime)
        if end_datetime is not None:
            result = pd.date_range(start=start_datetime, end=pd.to_datetime(end_datetime),
                                   freq="h")
        elif n_hours is not None:
            result = pd.date_range(start=start_datetime, periods=n_hours, freq="h")
        else:
            raise ValueError("One of 'end_datetime' or 'n_hours' must be given")
        return pd.Index(result, name="hourly_index")

    def ts_cross_validate(self, data, n_splits=5, types=None, last_n_years=True, metric="MAE"):
        """ Perform n-fold time series cross validation to evaluate the forecast method.

        Calls ``self.fit(train_data, types=types)`` and ``self.predict(future=future)``
        for every fold, so the predictor must support those keyword arguments.

        Parameters
        ----------
        data: pd.DataFrame
            The incident data.
        n_splits: int, optional (default: 5)
            The number of folds.
        types: Sequence(str), optional (default: None)
            The incident types to evaluate. If None, uses all types in the data
            except 'nan', 'NVT', and missing values.
        last_n_years: boolean, optional (default: True)
            Whether to test on whole years (using YearSplitter) or to use
            classic equal-sized folds (TimeSeriesSplit).
        metric: str, one of ['MAE', 'RMSE'], optional (default: 'MAE')
            The evaluation metric, see 'evaluate'.

        Returns
        -------
        scores: np.array
            One row of scores per fold, as returned by 'evaluate'.
        """
        type_col = "dim_incident_incident_type"
        if types is not None:
            data = data[data[type_col].isin(types)].copy()
        else:
            # Series.isin also matches real missing values, unlike np.in1d
            data = data[~data[type_col].isin(["nan", "NVT", np.nan])].copy()
            types = list(data[type_col].unique())
        progress("Shape of the data: {}".format(data.shape), verbose=self.verbose)

        data["datetime"] = self._create_date_hour_column(data)
        first, last = data["datetime"].min(), data["datetime"].max()
        progress("min, max time: {}, {}".format(first, last), verbose=self.verbose)
        times = self._create_complete_hourly_index(first, end_datetime=last)

        # count incidents per hour and type, including hours without incidents
        ts = pd.pivot_table(
            data,
            values="dim_incident_id",
            index="datetime",
            columns=type_col,
            aggfunc="count",
            fill_value=0
        )
        ts = ts.reindex(times, fill_value=0, axis=0)
        assert len(ts) == len(times), "Pivot went wrong, length pivot = {}, " \
            "while length times = {}".format(len(ts), len(times))

        if last_n_years:
            splitter = YearSplitter(n_splits=n_splits, obs_per_year=365*24)
        else:  # use classic equal-sized folds
            splitter = TimeSeriesSplit(n_splits=n_splits)

        scores = []
        for train_index, test_index in splitter.split(times):
            future = pd.DataFrame({"ds": times[test_index]})
            # include the last training hour: train_index[-1] is part of the fold
            in_train = ((data["datetime"] >= times[train_index[0]]) &
                        (data["datetime"] <= times[train_index[-1]]))
            self.fit(data[in_train], types=types)
            self.predict(future=future)
            y_predict = self.get_forecast().drop("ds", axis=1)
            # 'evaluate' compares columns by position, so put the true counts in
            # the same column order as the forecast (pivot columns are sorted)
            y_true = ts.iloc[test_index, :].reindex(columns=y_predict.columns, fill_value=0)
            scores.append(self.evaluate(y_true, y_predict, metric=metric))

        return np.array(scores)

    def set_custom_forecast(self, forecast):
        """ Use the given DataFrame as forecast instead of a fitted or loaded one. """
        self.forecast = forecast
class ProphetIncidentPredictor(BaseIncidentPredictor):
    """ Class that forecasts incident rate for different incident types.

    Uses Facebook's Prophet to create a forecast of the incident rate.
    It does so by calculating the hourly arrivals per incident type, then
    treating this as a signal/time series and decomposing it into trend,
    yearly pattern, weekly pattern, and daily pattern.

    Example
    -------
    >>> predictor = ProphetIncidentPredictor(load_forecast=False)
    >>> predictor.fit(incident_data)
    >>> predictor.predict(periods=365*24, freq="H", save=True)
    >>> forecast = predictor.get_forecast()
    >>> forecast.head()

    Parameters
    ----------
    load_forecast: boolean
        Whether to load a pre-existing forecast from disk.
        Defaults to True, since recomputing forecasts is costly.
    fc_dir: str
        The directory in which forecasts should be saved and
        from which they should be loaded if applicable. Defaults
        to './data/forecasts/'.
    verbose: boolean
        Whether to print what is happening, defaults to True.
    """
    __name__ = "ProphetIncidentPredictor"

    def __init__(self, **kwargs):
        try:
            from fbprophet import Prophet
        except ImportError as err:
            raise ImportError("the ProphetIncidentPredictor requires the fbprophet package."
                              " Install with 'pip install fbprophet'.") from err

        # The import above is local to this method, so keep a handle on the
        # model class for use in fit().
        self._prophet_cls = Prophet
        self.fitted = False
        self.file_name = "prophet_forecast.csv"
        self.sampling_dict = None
        super().__init__(**kwargs)

    def fit(self, data, types=None):
        """ Perform time series decomposition using Prophet.

        This function first prepares the data and saves the prepared data
        as 'self.incidents'. then it creates a dictionary of Prophet() objects,
        where the keys equal the incident types and the corresponding model
        is fitted to the data of that type. The dictionary of models is stored
        as 'self.models_dict' and used when predict is called.

        Notes
        -----
        This function does not return anything.

        Parameters
        ----------
        data: pd.DataFrame
            The incidents to train the models on. The DataFrame is not modified.
        types: Sequence(str)
            The incident types to fit models for. If None, uses
            all incident types in the data, except 'nan' and 'NVT'.
            Defaults to None.
        """
        if types is not None:
            self.types = types
        else:
            progress("No incident types given, using all types in data.", verbose=self.verbose)
            # pd.isna catches missing values regardless of which NaN object is stored
            self.types = [t for t in data["dim_incident_incident_type"].unique()
                          if not pd.isna(t) and t not in ("nan", "NVT")]

        progress("Preparing incident data for analysis...", verbose=self.verbose)
        self.incidents = self._prep_data_for_prediction(data)
        self.incidents["hourly_datetime"] = self._create_date_hour_column(
            self.incidents,
            datetime_col="dim_incident_start_datumtijd"
        )

        # complete hourly range, so hours without incidents count as zero
        start = self.incidents["hourly_datetime"].min()
        end = self.incidents["hourly_datetime"].max()
        self.time_index = self._create_complete_hourly_index(start, end_datetime=end)

        self.models_dict = {}
        for type_ in self.types:
            progress("Fitting model for type {}...".format(type_), verbose=self.verbose)
            model = self._prophet_cls()
            dfprophet = self._create_prophet_data(self.incidents, self.time_index, type_=type_)
            model.fit(dfprophet)
            self.models_dict[type_] = model

        self.fitted = True
        progress("Models fitted.", verbose=self.verbose)

    def predict(self, periods=365*24, freq="H", save=False, future=None):
        """ Forecast the incident rate using Prophet.

        Notes
        -----
        Can only be called after calling '.fit()', throws assertion error
        otherwise. Does not return anything, since it's main use cases are
        sampling from directly from this predictor and saving predictions to
        file. The result of this method can be obtained by calling
        'get_forecast()' afterwards.

        Parameters
        ----------
        periods: int
            The number of periods to forecast. Ignored when 'future' is given.
        freq: str,
            The frequency to predict the incident rates at. Accepts any valid frequency
            for pd.date_range, such as 'H' (default), 'D', or 'M'. Ignored when
            'future' is given.
        save: boolean
            Whether to save the forecast to a csv file. Optional, defaults to false.
        future: pd.DataFrame, optional (default: None)
            DataFrame with a 'ds' column holding the timestamps to forecast for.
            If None, the timestamps are generated from 'periods' and 'freq',
            starting after the end of the training data.
        """
        assert self.fitted, "First use 'fit()' to fit a model before predicting."

        if future is None:
            future = self.models_dict[self.types[0]].make_future_dataframe(
                periods=periods, freq=freq, include_history=False)

        forecast_dict = dict(ds=future["ds"].tolist())
        for type_ in self.types:
            progress("Predicting incident rates for {}".format(type_), verbose=self.verbose)
            yhat = self.models_dict[type_].predict(future)["yhat"].tolist()
            # arrival rates cannot be negative, so clip the forecast at zero
            forecast_dict[type_] = np.maximum(0.0, yhat)

        self.forecast = pd.DataFrame(forecast_dict)
        progress("Forecast made.", verbose=self.verbose)

        if save:
            self.save_forecast()

    def _prep_data_for_prediction(self, incidents):
        """ Format time columns.

        Parameters
        ----------
        incidents: pd.DataFrame
            The incident data to prepare. It is copied, not modified.

        Returns
        -------
        A sorted copy of the DataFrame with 'dim_tijd_uur' as zero-padded
        two-character strings and 'dim_datum_datum' as '%Y-%m-%d' strings.
        """
        # work on a copy so the caller's DataFrame is left untouched
        incidents = incidents.copy()
        incidents["dim_tijd_uur"] = (incidents["dim_tijd_uur"].astype(float)
                                                              .astype(int)
                                                              .astype(str)
                                                              .str.zfill(2))
        incidents["dim_datum_datum"] = pd.to_datetime(
            incidents["dim_datum_datum"]).dt.strftime("%Y-%m-%d")

        # make sure the data is sorted chronologically
        incidents.sort_values(["dim_datum_datum", "dim_tijd_uur"],
                              ascending=True, inplace=True)
        return incidents

    def _create_prophet_data(self, incidents, new_index, type_=None,
                             groupby_col="hourly_datetime"):
        """ Create a DataFrame in the format required by Prophet.

        Parameters
        ----------
        incidents: pd.DataFrame
            The incident data.
        new_index: pandas.Index object
            Specifies the times that the resulting DataFrame should contain.
            Must be a named index.
        type_: str or None (default: None)
            The incident type to make a DataFrame for. If None, ignore incident types.
        groupby_col: str (default: 'hourly_datetime')
            The column holding the time unit to count incidents per.

        Returns
        -------
        A DataFrame with two columns: 'ds' and 'y', where 'ds' is the timestamp
        and 'y' is the number of incidents per time_unit. This DataFrame can be
        used directly as input for Prophet.fit().
        """
        cols = ["dim_incident_id", groupby_col, "dim_incident_incident_type"]
        dfprophet = incidents[cols].copy()
        if type_ is not None:
            dfprophet = dfprophet[dfprophet["dim_incident_incident_type"] == type_]

        # count incidents per time unit and add the time units without incidents
        counts = dfprophet.groupby(groupby_col)["dim_incident_id"].size()
        dfprophet = counts.reindex(new_index, fill_value=0).reset_index()
        dfprophet.rename(columns={"dim_incident_id": "y"}, inplace=True)
        dfprophet["ds"] = dfprophet[new_index.name]
        return dfprophet[["ds", "y"]]
class BasicLambdaForecaster(BaseIncidentPredictor):
    """Forecast arrival rates of incidents based on historic averages.

    Arrival rates are obtained for every hour in the week, per month, per type of incident.
    So, different weeks in the same month always get the same arrival rates, but weeks in
    different months have different rates. Rates are determined as the average number of
    arrivals in a similar period.

    For example, the rate for a Monday in January between 8:00 and 9:00 is calculated as the
    average number of incidents between 8:00 and 9:00 of all Mondays in January in the time
    range of the data.

    Parameters
    ----------
    ignore_dates: array-like of datetime objects,
        Dates that are considered 'out of the ordinary' in terms of number of incidents
        and should not be taken into account when calculating average incident rates.
        Typically, this list includes days with storms and impactful events such as
        New Year's Eve and perhaps Kingsday.
    id_col, type_col, date_col, month_col, month_day_col, day_name_col, hour_col: str, optional,
        The column names indicating respectively the id of the incident, the incident type,
        the date, month number, day of the month, name of the week day, hour of day
        in [0, 24).
    file_name: str, optional (default: 'basic_lambda_forecast.csv'),
        Name of the csv file (inside 'fc_dir') to load the forecast from and save it to.
    **kwargs: dict,
        Parameters passed to BaseIncidentPredictor.
    """

    def __init__(self, ignore_dates=None, id_col="dim_incident_id",
                 type_col="dim_incident_incident_type", date_col="dim_datum_datum",
                 month_col="dim_datum_maand_nr", month_day_col="dim_datum_maand_dag_nr",
                 day_name_col="dim_datum_dag_naam_nl", hour_col="dim_tijd_uur",
                 file_name="basic_lambda_forecast.csv", **kwargs):

        # store names of columns for use in multiple methods.
        self.id_col = id_col
        self.type_col = type_col
        self.date_col = date_col
        self.month_col = month_col
        self.day_name_col = day_name_col
        self.hour_col = hour_col
        self.month_day_col = month_day_col

        # save dates to ignore during modeling; an empty collection means 'none'
        self.ignore_dates = None
        if ignore_dates is not None:
            ignore_dates = np.atleast_1d(np.array(ignore_dates))
            if ignore_dates.size > 0:
                if isinstance(ignore_dates[0], str):
                    self.ignore_dates = pd.to_datetime(ignore_dates).values
                    print("Found string values in 'ignore_dates'. I've converted them to datetime,"
                          " but it's safer to provide datetime objects in the first place.")
                else:
                    self.ignore_dates = ignore_dates

        # fixed attributes
        self.lambdas = None
        self.nye_lambdas = None
        self.fitted = False
        self.day_col = "weekday_number"
        self.file_name = file_name

        super().__init__(**kwargs)

    def fit(self, data, last_n_years=8, fit_nye=True):
        """Obtain arrival rates from the data.

        Fits arrival rates per incident type, month, day of the week, and hour of the day.
        Saves the results under self.lambdas and self.nye_lambdas (if fit_nye == True). Sets
        self.fitted = True when fit procedure is completed.

        Parameters
        ----------
        data: pd.DataFrame,
            The incident data. The DataFrame is not modified.
        last_n_years: int, optional (default: 8),
            How many years to use to estimate the arrival rates. It uses the latest
            'last_n_years' years.
        fit_nye: boolean, optional (default: True),
            Whether to fit New Year's Eve separately (True) or to treat it as a regular day.
        """
        progress("Start fitting arrival rates.", verbose=self.verbose)

        # prepare data
        data = self._filter_data(data, last_n_years=last_n_years)
        data[self.day_col] = data[self.day_name_col].map(
            {"Maandag": 1, "Dinsdag": 2, "Woensdag": 3, "Donderdag": 4,
             "Vrijdag": 5, "Zaterdag": 6, "Zondag": 7})
        for col in [self.month_col, self.day_col, self.hour_col, self.month_day_col]:
            data[col] = data[col].astype(float).astype(int)

        # obtain lambdas: one weekday-by-hour table per (type, month) group,
        # where x.name[1] is the month number of the group
        progress("Obtaining lambdas..", verbose=self.verbose)
        lambdas = (data.groupby([self.type_col, self.month_col])
                       .apply(lambda x: self._get_incidents_per_hour_of_week(x, x.name[1])))

        # reindex on a complete set of types, months, and weekdays
        new_index = pd.MultiIndex.from_product(
            [data[self.type_col].unique(), np.arange(1, 13), np.arange(1, 8)],
            names=[self.type_col, self.month_col, self.day_col]
        )
        lambdas = lambdas.reindex(new_index, fill_value=0)

        # stack the hour columns and use types as columns instead
        self.lambdas = lambdas.stack().unstack(self.type_col, fill_value=0)
        progress("Lambdas obtained.", verbose=self.verbose)

        if fit_nye:
            progress("Fitting New Year's Eve.", verbose=self.verbose)
            self.nye_lambdas = self._get_incidents_at_nye(data)
            progress("New Year's Eve arrival rates fitted.", verbose=self.verbose)
        else:
            # do not keep rates from an earlier fit on possibly different data
            self.nye_lambdas = None

        progress("Fit completed.", verbose=self.verbose)
        self.fitted = True

    def predict(self, start, end, predict_nye=True, save=False):
        """Forecast arrival rates for a given future period and save it under 'self.forecast'.

        Parameters
        ----------
        start, end: datetime object,
            The start and end dates and times (rounded to the whole hour) for the period
            to forecast.
        predict_nye: boolean, optional (default: True),
            Whether to predict NYE with high activity like in reality (True) or ignore it
            and forecast a regular day instead (False).
        save: boolean, optional (default: False),
            Whether to save the forecast to a csv file.

        Raises
        ------
        ValueError
            If 'predict_nye' is True, but New Year's Eve was not fitted.
        """
        assert self.fitted, "First use the 'fit' method before making predictions."
        if predict_nye and getattr(self, "nye_lambdas", None) is None:
            raise ValueError("No New Year's Eve rates available. Fit with 'fit_nye=True' "
                             "or predict with 'predict_nye=False'.")

        def replace_with_other(df1, df2, match_cols, fill_cols):
            """Fill one dataframe with values from another, based on specified columns."""
            assert len(match_cols) == 3, "This function needs three columns to match on."
            first, second, third = match_cols
            for i in range(len(df2)):
                mask = ((df1[first] == df2[first].iloc[i]) &
                        (df1[second] == df2[second].iloc[i]) &
                        (df1[third] == df2[third].iloc[i]))
                df1.loc[mask, fill_cols] = df2[fill_cols].iloc[i, :].values
            return df1

        # create dataframe with requested date range
        indx = pd.date_range(start=start, end=end, freq="h")
        df = pd.DataFrame({"ds": pd.Series(indx)})
        df[self.month_col] = df["ds"].dt.month
        df[self.day_col] = df["ds"].dt.dayofweek + 1  # ISO weekday: Monday=1 .. Sunday=7
        df[self.month_day_col] = df["ds"].dt.day
        df[self.hour_col] = df["ds"].dt.hour

        types = self.lambdas.columns
        for type_ in types:
            df[type_] = np.nan

        lambdas = self.lambdas.copy()
        lambdas.reset_index(drop=False, inplace=True)

        # fill with the overall patterns/lambdas
        progress("Filling future DataFrame..", verbose=self.verbose)
        cols = [self.month_col, self.day_col, self.hour_col]
        df = replace_with_other(df, lambdas, cols, types)
        progress("DataFrame filled with general patterns (shape: {}).".format(df.shape),
                 verbose=self.verbose)

        # fill NYEs with high activity if requested
        if predict_nye:
            progress("Filling future New Year's Eves", verbose=self.verbose)
            cols = [self.month_col, self.month_day_col, self.hour_col]
            nye = self.nye_lambdas.copy()
            nye.reset_index(drop=False, inplace=True)
            df = replace_with_other(df, nye, cols, types)
            msg = "New Year's Eve forecasts added to DataFrame (shape: {})".format(df.shape)
            progress(msg, verbose=self.verbose)

        # remove added columns
        df.drop([self.month_col, self.day_col, self.month_day_col, self.hour_col],
                axis=1, inplace=True)

        self.forecast = df
        progress("Forecast created.", verbose=self.verbose)

        if save:
            self.save_forecast()

    def _filter_data(self, data, remove_unfinished_month=True, last_n_years=5):
        """Filter the incidents on the period to use for the analysis.

        Parameters
        ----------
        data: pd.DataFrame,
            The incident data. It is copied, not modified.
        remove_unfinished_month: boolean, optional (default: True),
            Whether to drop all incidents in the month of the latest incident.
        last_n_years: int, optional (default: 5),
            Only keep incidents in the 'last_n_years' years before the cutoff. Falsy
            values (None, 0) keep all years.

        Returns
        -------
        A filtered copy of the data with the date column converted to datetime.
        """
        # work on a copy so the caller's DataFrame is left untouched
        data = data.copy()
        data[self.date_col] = pd.to_datetime(data[self.date_col], dayfirst=True)
        end = data[self.date_col].max()

        if remove_unfinished_month:
            cutoff = pd.Timestamp(year=end.year, month=end.month, day=1, hour=0)
            progress("Cutting off at {}.".format(cutoff), verbose=self.verbose)
            data = data[data[self.date_col] < cutoff].copy()
        else:
            cutoff = end

        if last_n_years:
            # DateOffset maps 29 February onto 28 February in non-leap years,
            # where constructing the timestamp from its parts would fail
            whole_hour = cutoff.replace(minute=0, second=0, microsecond=0, nanosecond=0)
            start = whole_hour - pd.DateOffset(years=int(last_n_years))
            progress("Using incidents after {}.".format(start), verbose=self.verbose)
            data = data[data[self.date_col] >= start].copy()

        progress("Data filtered.", verbose=self.verbose)
        return data

    def _get_incidents_per_hour_of_day(self, data, day_of_week=None, month=None):
        """Get average number of incidents per time (hour) of the day.

        Parameters
        ----------
        data: pd.DataFrame,
            The incident data. The considered period runs from the earliest to
            the latest date in this data.
        day_of_week: int, default: None,
            Number of the weekday in [1, 7] (starting at Monday, ending at Sunday).
            If None, averages over all days. Otherwise, calculates only for the
            requested day of the week.
        month: int, default: None,
            Month number [1, 12]. If None, averages over all months. Otherwise,
            calculates only for the requested month.

        Returns
        -------
        means: pd.Series,
            The mean number of incidents per hour of the day, named 'day_of_week'.
        """
        start, end = data[self.date_col].min(), data[self.date_col].max()
        date_range = pd.Series(pd.date_range(start=start, end=end, freq="h"))

        # filter date range on weekday, month, and outliers
        if day_of_week is not None:
            date_range = date_range[date_range.dt.dayofweek + 1 == day_of_week]
        if month is not None:
            date_range = date_range[date_range.dt.month == month]
        if self.ignore_dates is not None:
            # compare on whole days: matching the hourly timestamps directly
            # would only drop the midnight hour of an ignored date
            ignored_days = pd.DatetimeIndex(pd.to_datetime(self.ignore_dates)).normalize()
            date_range = date_range[~date_range.dt.normalize().isin(ignored_days)]

        # every (date, hour) that should be counted, including hours without incidents
        indx = pd.MultiIndex.from_arrays(
            [date_range.dt.normalize(), date_range.dt.hour.astype("int64")],
            names=(self.date_col, self.hour_col))

        grouped = data.groupby([self.date_col, self.hour_col])[self.id_col].count()
        reindexed = grouped.reindex(indx, fill_value=0).reset_index()
        means = pd.Series(reindexed.groupby(self.hour_col)[self.id_col].mean(),
                          name=day_of_week)
        return means

    def _get_incidents_per_hour_of_week(self, data, month=None):
        """Get the mean number of incidents per every hour in a week.

        Parameters
        ----------
        data: pd.DataFrame
        month: int, default: None,
            The month number in [1, 12]. If None, averages over all months.

        Returns
        -------
        lambdas: pd.DataFrame,
            The arrival rates in a table with a row for every day of the week and a column
            for every hour of the day."""
        per_day = {
            d: self._get_incidents_per_hour_of_day(data, day_of_week=d, month=month)
            for d in np.arange(1, 8)
        }
        grouped = pd.DataFrame(per_day).T
        grouped.index.rename(self.day_col, inplace=True)
        return grouped

    def _get_incidents_at_nye(self, data):
        """Obtain average arrivals around New Year's Eve.

        Averages the number of incidents per type and hour over every 31 December
        and following 1 January between the first and last year in the data.

        Returns
        -------
        rates: pd.DataFrame,
            The mean arrivals, indexed by month, day of the month, and hour.
        """
        start, end = data[self.date_col].min(), data[self.date_col].max()

        # every turn of the year in the data: 31 December plus the next 1 January
        years = np.array(
            [[pd.Timestamp(year=y, month=12, day=31), pd.Timestamp(year=y+1, month=1, day=1)]
             for y in range(start.year, end.year)]).flatten()
        hours = np.arange(0, 24, dtype=np.int8)
        indx = pd.MultiIndex.from_product([years, hours], names=[self.date_col, self.hour_col])

        data = (data.groupby([self.type_col, self.date_col, self.hour_col])[self.id_col]
                    .count()
                    .unstack(self.type_col, fill_value=0)
                    .reindex(indx, fill_value=0)
                    .reset_index())

        data[self.month_col] = data[self.date_col].dt.month
        data[self.month_day_col] = data[self.date_col].dt.day
        rates = data.groupby([self.month_col, self.month_day_col, self.hour_col]).mean()
        return rates
class YearSplitter():
    """ Split data on whole years to provide constant evaluation metric.

    Every fold tests on one block of 'obs_per_year' observations, counted back
    from the end of the data, and trains on everything before that block.

    Parameters
    ----------
    n_splits: int, optional (default: 3)
        The number of folds.
    obs_per_year: int, optional (default: 365*24)
        The number of observations in a test block.
    """

    def __init__(self, n_splits=3, obs_per_year=365*24):
        self.n = n_splits
        self.obs_per_year = obs_per_year

    def split(self, data):
        """Yield (train_indices, test_indices) per fold, earliest test block first.

        Raises
        ------
        ValueError
            If the data is too short to leave a non-empty training set for
            every fold. The error is raised when iteration starts.
        """
        N = len(data)
        if self.n * self.obs_per_year >= N:
            # without this check the first folds would get an empty training
            # set and negative test indices
            raise ValueError(
                "Cannot make {} splits of {} observations from {} observations."
                .format(self.n, self.obs_per_year, N))

        # counting down puts the earliest test block first
        for i in range(self.n, 0, -1):
            test_start = N - self.obs_per_year * i
            train = np.arange(0, test_start)
            test = np.arange(test_start, test_start + self.obs_per_year)
            yield train, test