87 changes: 43 additions & 44 deletions jobs/kpi-forecasting/README.md
@@ -1,11 +1,6 @@
# KPI and other Metric Forecasting

This job forecasts [Metric Hub](https://mozilla.acryl.io/glossaryNode/urn:li:glossaryNode:Metric%20Hub/Contents?is_lineage_mode=false) metrics based on YAML configs defined in `.kpi-forecasting/configs`. The output destinations in BigQuery for each config can be found in the `write_results` section. Note that different configs can write to the same table.

Currently the forecasts are all done by Prophet. There are two classes used:
- `models/prophet_forecast.py/ProphetForecast` Fits a single prophet model on the entire dataset, configured as specified in the config file
- `models/funnel_forecast.py/FunnelForecast` Fits multiple models, one per segment. Segments are defined in `metric_hub.segments`, which specifies the columns in the data to be used for segmentation. The data is partitioned into subsets based on all the different combinations of values the specified columns can take. For funnel forecast, the `parameters` section of the config is a list; each element applies its configuration to the partitions whose column values match the keys and values of that element's `parameters.segment` field. The segmentation functionality is defined in `models/base_forecast.py/BaseEnsembleForecast`. Additionally, funnel forecast has automatic hyperparameter tuning, implemented by `models/funnel_forecast.py/ProphetAutotunerForecast`.
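
The segmentation described above can be sketched as follows. This is a minimal illustration, not the repo's implementation; the column names (`device`, `channel`) are hypothetical, and the real logic lives in `models/base_forecast.py/BaseEnsembleForecast`.

```python
# Sketch: partition observed data into one subset per combination of
# segment column values. Column names here are illustrative only.
import pandas as pd


def partition_by_segments(df: pd.DataFrame, segment_cols: list) -> dict:
    """Return one subset of df per unique combination of segment column values."""
    return {
        key: subset.reset_index(drop=True)
        for key, subset in df.groupby(segment_cols)
    }


observed = pd.DataFrame(
    {
        "device": ["desktop", "desktop", "mobile", "mobile"],
        "channel": ["release", "beta", "release", "beta"],
        "value": [1.0, 2.0, 3.0, 4.0],
    }
)
segments = partition_by_segments(observed, ["device", "channel"])
```

Each key is a tuple of segment values (e.g. `("desktop", "beta")`), and a separate model would be fit on each subset.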

This job forecasts [Metric Hub](https://mozilla.acryl.io/glossaryNode/urn:li:glossaryNode:Metric%20Hub/Contents?is_lineage_mode=false) metrics based on YAML configs defined in `.kpi-forecasting/configs`.

# Usage

@@ -43,11 +38,16 @@ Note that if the code changes, `docker compose build` needs to be re-run for `do
## Local Python
### Setup

You can also run the code outside of a Docker container. The code below shows how to create a new environment
You can also run the code outside of a Docker container. The code below creates a new Conda environment called `kpi-forecasting-dev`.
It assumes you have Conda installed. If you'd like to run the code in a Jupyter notebook, it is handy to install Jupyter in your `base` environment.
The `ipykernel` commands below will ensure that the `kpi-forecasting-dev` environment is made available to Jupyter.

```sh
pyenv virtualenv 3.9.17 <name>
pyenv activate <name>
conda create --name kpi-forecasting-dev python=3.10 pip ipykernel
conda activate kpi-forecasting-dev
ipython kernel install --name kpi-forecasting-dev --user
pip install -r requirements.txt
conda deactivate
```

If you're running on an M1 Mac, there are [currently some additional steps](https://github.com/facebook/prophet/issues/2250#issuecomment-1317709209) that you'll need to take to get Prophet running. From within your python environment, run the following (making sure to update the path appropriately):
@@ -108,47 +108,49 @@ metric_hub: # this configures the observed data fed to the model, which is obtained
partner: "partner"
where: "partner = 'Google'" # filter to apply to the metric hub pull

forecast_model: # this section configures the model
forecast_start: NULL
forecast_model: # this section configures the model
model_type: "funnel"
# type of model object to use, current options are "funnel" for FunnelForecast and "prophet" for ProphetForecast
start_date: NULL
  # starting date for the predicted data;
  # if unset, the default depends on predict_historical_dates (see below)
forecast_end: NULL
end_date: NULL
# final date for the predicted data
use_all_us_holidays: False
  # for prophet-based models, when true, calls `model.add_country_holidays(country_name="US")` on the model
predict_historical_dates: True
  # if predict_historical_dates is True, start_date defaults to the first date of the observed data
  # if predict_historical_dates is False, it defaults to the day after the last day in the observed data
number_of_simulations: 1000
  # for prophet-based models, the number of simulations to run
parameters:
  # this section can be a map or a list.
  # If it's a map, these parameters are used for all models
  # (recall multiple models are trained if metric_hub.segments is set)
  # If it's a list, it sets different parameters
  # for different subsets of the partition specified in `metric_hub.segments`.
- segment:
# specifies which subset of the partitions this applies to
# key is a column specified in metric_hub.segments
# value is a value that column can take to which the configuration is applied
- segment:
# specifies which subset of the partitions this applies to
# key is a column specified in metric_hub.segments
# value is a value that column can take to which the configuration is applied
device: desktop
start_date: "2018-01-01"
      # start date specific to a segment, supersedes
      # forecast_start_date
parameters:
holidays: ["easter", "covid_sip11"]
# holidays specified in `configs.model_inputs.holidays` to use.
regressors: ["post_esr_migration", "in_covid"]
# regressors specified in `configs.model_inputs.regressors`
use_all_us_holidays: False
grid_parameters:
# sets grid for hyperparameter tuning
changepoint_prior_scale: [0.001, 0.01, 0.1, 0.2, 0.5]
changepoint_range: [0.8, 0.9]
weekly_seasonality: True
yearly_seasonality: True
cv_settings:
# sets parameters for prophet cross-validation used in FunnelForecast
initial: "1296 days"
period: "30 days"
horizon: "30 days"
parallel: "processes"
start_date: "2018-01-01" # only applies to FunnelForecast, allows one to set start date for each sub-model
end_date: NULL # only applies to FunnelForecast, allows one to set end date for each sub-model
holidays: ["easter", "covid_sip11"] # holidays specified in `configs.model_inputs.holidays` to use.
regressors: ["post_esr_migration", "in_covid", "ad_click_bug"] # regressors specified in `configs.model_inputs.regressors`
grid_parameters:
# sets grid for hyperparameter tuning
changepoint_prior_scale: [0.001, 0.01, 0.1, 0.2, 0.5] # parameter of prior distribution controlling how much the trend fluctuates at changepoints
changepoint_range: [0.8, 0.9] # the proportion of the time series over which the changepoints are distributed
n_changepoints: [25, 50] # number of trend changepoints, equally spaced over the time series
weekly_seasonality: True # if weekly seasonality is included in the model
yearly_seasonality: True # if yearly seasonality is included in the model
cv_settings:
# sets parameters for prophet cross-validation used in FunnelForecast
initial: "1296 days" # the initial training period, used to train the first iteration of the model for CV
period: "30 days" # spacing between cutoff dates, the sliding window over which each round of cross validation is performed
horizon: "30 days" # forecast horizon used to make predictions and calculate model fit metrics for optimization
      parallel: "processes" # how parallelization is performed by Prophet, or None if no parallelization is used
...

summarize:
@@ -173,12 +175,9 @@ write_results:
- `./kpi_forecasting/models` contains the forecasting models.

This repo was designed to make it simple to add new forecasting models in the future. In general, a model needs to inherit
the `models.base_forecast.BaseForecast` class and to implement the `fit` and `predict` methods.

When testing locally, be sure to modify any config files to use non-production `project` and `dataset` values that you have write access to; otherwise the `write_output` step will fail.

## Interface
The forecast objects in this repo implement an interface similar to `sklearn` or `darts`. Every forecast method should have a `fit` method for fitting the forecast and `predict` method for making predictions. The signature of these functions can be seen in `models.base_forecast.BaseForecast`.
the `models.base_forecast.BaseForecast` class and to implement the `_fit` and `_predict` methods. Output from the `_fit` method will automatically be validated by `BaseForecast._validate_forecast_df`.

The `BaseEnsembleForecast` makes it possible to fit multiple models over the data, where different subsets of the data have different models applied to them. These subsets are referred to as "segments" in the code. Only one kind of model is supported, and different instances of this model are fit over the different segments. The type of model is set by the `model_class` argument, which should be a class implementing the same interface as `BaseForecast`. The `fit` and `predict` methods in `BaseEnsembleForecast` determine which segment each row of incoming data belongs to and use the `fit` and `predict` methods of the model class on each segment. This can be seen in the `FunnelForecast` object, which uses `BaseEnsembleForecast` with `ProphetAutotunerForecast` as the `model_class`.
One caveat is that, in order for aggregations over time periods to work (e.g. monthly forecasts), the `_predict` method must generate a number
of simulated timeseries. This enables the measurement of variation across a range of possible outcomes. This number is set by `BaseForecast.number_of_simulations`.
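
A self-contained sketch of the `_fit`/`_predict` contract described above. The real `models.base_forecast.BaseForecast` carries more machinery (output validation, metadata); the stand-in base class and the toy `MeanForecast` model here are illustrative only.

```python
# Sketch of the fit/predict interface; BaseForecast here is a stand-in,
# not the repo's class, so the example runs on its own.
import numpy as np
import pandas as pd


class BaseForecast:  # stand-in for models.base_forecast.BaseForecast
    number_of_simulations = 1000

    def fit(self, observed_df: pd.DataFrame) -> "BaseForecast":
        self._fit(observed_df)
        return self

    def predict(self, dates_df: pd.DataFrame) -> pd.DataFrame:
        return self._predict(dates_df)


class MeanForecast(BaseForecast):
    """Toy model: simulated draws around the observed mean."""

    def _fit(self, observed_df: pd.DataFrame) -> None:
        self.mean_ = observed_df["value"].mean()
        self.std_ = observed_df["value"].std()

    def _predict(self, dates_df: pd.DataFrame) -> pd.DataFrame:
        # one column per simulated timeseries, enabling aggregation over
        # time periods with uncertainty intervals
        rng = np.random.default_rng(0)
        sims = rng.normal(
            self.mean_, self.std_, size=(len(dates_df), self.number_of_simulations)
        )
        return pd.DataFrame(sims, index=dates_df["submission_date"])


observed = pd.DataFrame({"value": [10.0, 12.0, 11.0, 13.0]})
dates = pd.DataFrame({"submission_date": pd.date_range("2024-01-01", periods=7).date})
forecast = MeanForecast().fit(observed).predict(dates)
```

The resulting frame has one row per predicted date and one column per simulation, which is the shape the summarization step needs to compute aggregates and confidence intervals.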

When testing locally, be sure to modify any config files to use non-production `project` and `dataset` values that you have write access to; otherwise the `write_output` step will fail.
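
The per-segment `parameters` matching described in the config example above can be sketched as follows. `select_parameters` is an illustrative helper, not a function in the repo.

```python
# Sketch: choose which entry of a `parameters` list applies to a given
# segment, by matching the entry's `segment` keys/values. Illustrative only.
def select_parameters(parameters_list: list, segment: dict):
    """Return the first entry whose `segment` keys/values all match."""
    for entry in parameters_list:
        spec = entry.get("segment", {})
        if all(segment.get(col) == val for col, val in spec.items()):
            return entry
    return None


config_parameters = [
    {"segment": {"device": "desktop"}, "start_date": "2018-01-01"},
    {"segment": {"device": "mobile"}},
]
chosen = select_parameters(config_parameters, {"device": "desktop"})
```

Here the desktop segment picks up the entry carrying the segment-specific `start_date`, while a mobile segment would fall through to the second entry.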
172 changes: 13 additions & 159 deletions jobs/kpi-forecasting/kpi_forecasting.py
@@ -1,20 +1,6 @@
import pandas as pd
from datetime import datetime, timezone, timedelta
import json
import pickle

from kpi_forecasting.inputs import CLI, load_yaml
from kpi_forecasting.models.prophet_forecast import (
ProphetForecast,
summarize as prophet_summarize,
write_results as prophet_write_results,
summarize_legacy as prophet_summarize_legacy,
)
from kpi_forecasting.models.funnel_forecast import (
FunnelForecast,
summarize as funnel_summarize,
write_results as funnel_write_results,
)
from kpi_forecasting.models.prophet_forecast import ProphetForecast
from kpi_forecasting.models.funnel_forecast import FunnelForecast
from kpi_forecasting.metric_hub import MetricHub


@@ -25,154 +11,22 @@
}


class KPIPipeline:
def __init__(self, config_path):
self.config_data = load_yaml(filepath=config_path)
self.model_type = self.config_data["model_type"]
if self.model_type == "funnel":
self.model_class = FunnelForecast
self.segments = list(self.config_data["metric_hub"]["segments"].keys())
elif self.model_type == "prophet":
self.model_class = ProphetForecast
self.segments = None
else:
raise ValueError(f"Model type {self.model_type} is not supported")

def add_metadata(self, summary_df):
# add Metric Hub metadata columns
summary_df["metric_alias"] = self.metric_hub.alias.lower()
summary_df["metric_hub_app_name"] = self.metric_hub.app_name.lower()
summary_df["metric_hub_slug"] = self.metric_hub.slug.lower()
summary_df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date)
summary_df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date)
summary_df["metric_collected_at"] = self.collected_at

# add forecast model metadata columns
summary_df["forecast_start_date"] = self.start_date
summary_df["forecast_end_date"] = self.end_date
summary_df["forecast_trained_at"] = self.trained_at
summary_df["forecast_predicted_at"] = self.predicted_at

return summary_df

def get_historical_data(self):
self.metric_hub = MetricHub(**self.config_data["metric_hub"])
self.collected_at = datetime.now(timezone.utc).replace(tzinfo=None)
observed_df = self.metric_hub.fetch()
return observed_df

def get_predict_dates(self, observed_df):
self.start_date = pd.to_datetime(
self.config_data["forecast_model"]["forecast_start"]
or self._default_start_date(observed_df)
)
self.end_date = pd.to_datetime(
self.config_data["forecast_model"]["forecast_end"]
or self._default_end_date()
)
        return pd.DataFrame(
            {"submission_date": pd.date_range(self.start_date, self.end_date).date}
        )

def fit(self, observed_df):
        # the model parameters are mixed in with parameters that configure
        # the predict dates, so they need to be separated here

if self.model_type == "funnel":
model_parameters = {
"parameters": self.config_data["forecast_model"]["parameters"],
"segments": self.segments,
}
elif self.model_type == "prophet":
model_parameters = self.config_data["forecast_model"]["parameters"]
model = self.model_class(**model_parameters)
self.trained_at = datetime.now(timezone.utc).replace(tzinfo=None)
return model.fit(observed_df)

def predict_and_summarize(self, model, predict_dates, observed_df):
self.predicted_at = datetime.now(timezone.utc).replace(tzinfo=None)
raw_predictions = model.predict(predict_dates)
if self.model_type == "funnel":
# get filtered observed data
observed_df_filtered = model.get_filtered_observed_data(observed_df)
return funnel_summarize(
raw_predictions,
observed_df_filtered,
segment_cols=self.segments,
**self.config_data["summarize"],
)
elif self.model_type == "prophet":
forecast_parameters = json.dumps(model._get_parameters())
return prophet_summarize(
raw_predictions,
observed_df,
forecast_parameters=forecast_parameters,
**self.config_data["summarize"],
)

def write_results(self, model, summarized, predict_dates):
summarized = self.add_metadata(summarized)
if self.model_type == "funnel":
components_df_list = []
# create the components dataframe
for segment in model.segment_models:
components_df = segment["model"].components_df
for key, val in segment["segment"].items():
components_df[key] = val
components_df_list.append(components_df)
components_df = pd.concat(components_df_list)
components_df["metric_slug"] = self.metric_hub.slug
components_df["forecast_trained_at"] = self.trained_at
funnel_write_results(
summarized,
components_df,
segment_cols=self.segments,
**self.config_data["write_results"],
)
elif self.model_type == "prophet":
forecast_df_legacy = model._predict_legacy(
predict_dates,
self.metric_hub.alias,
model._get_parameters(),
)
summary_df_legacy = prophet_summarize_legacy(summarized)
prophet_write_results(
summarized,
summary_df_legacy,
forecast_df_legacy,
**self.config_data["write_results"],
)

def _default_start_date(self, observed_df) -> str:
"""The first day after the last date in the observed dataset."""
if self.config_data["forecast_model"]["predict_historical_dates"]:
return observed_df["submission_date"].min()
else:
return observed_df["submission_date"].max() + timedelta(days=1)

def _default_end_date(self) -> str:
"""78 weeks (18 months) ahead of the current UTC date."""
return (
datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(weeks=78)
).date()


def main() -> None:
# Load the config
config_path = CLI().args.config
config = load_yaml(filepath=CLI().args.config)
model_type = config["forecast_model"]["model_type"]

pipeline = KPIPipeline(config_path)
if model_type in MODELS:
metric_hub = MetricHub(**config["metric_hub"])
model = MODELS[model_type](metric_hub=metric_hub, **config["forecast_model"])

observed_df = pipeline.get_historical_data()
fit_model = pipeline.fit(observed_df=observed_df)
predict_dates = pipeline.get_predict_dates(observed_df)
summarized = pipeline.predict_and_summarize(
fit_model, predict_dates.copy(), observed_df
)
pipeline.write_results(fit_model, summarized, predict_dates.copy())
model.fit()
model.predict()
model.summarize(**config["summarize"])
model.write_results(**config["write_results"])

with open("main_model.pkl", "wb") as f:
pickle.dump(fit_model, f)
else:
raise ValueError(f"Don't know how to forecast using {model_type}.")


if __name__ == "__main__":
12 changes: 5 additions & 7 deletions jobs/kpi-forecasting/kpi_forecasting/configs/dau_desktop.yaml
@@ -1,5 +1,4 @@
---
model_type: prophet
metric_hub:
app_name: "firefox_desktop"
slug: "daily_active_users_v2"
@@ -8,16 +7,17 @@ metric_hub:
end_date: NULL

forecast_model:
forecast_start: NULL
forecast_end: NULL
model_type: "prophet"
start_date: NULL
end_date: NULL
use_all_us_holidays: False
predict_historical_dates: False
number_of_simulations: 1000
parameters:
seasonality_prior_scale: 0.00825
changepoint_prior_scale: 0.15983
weekly_seasonality: True
yearly_seasonality: True
use_all_us_holidays: False


summarize:
periods: ["day", "month"]
@@ -30,5 +30,3 @@ write_results:
dataset: "telemetry_derived"
dataset_legacy: "telemetry_derived"
table: "kpi_forecasts_v0"
forecast_table_legacy: "kpi_automated_forecast_v1"
confidences_table_legacy: "kpi_automated_forecast_confidences_v1"
11 changes: 5 additions & 6 deletions jobs/kpi-forecasting/kpi_forecasting/configs/dau_mobile.yaml
@@ -1,5 +1,4 @@
---
model_type: prophet
metric_hub:
app_name: "multi_product"
slug: "mobile_daily_active_users_v1"
@@ -8,15 +7,17 @@ metric_hub:
end_date: NULL

forecast_model:
forecast_start: NULL
forecast_end: NULL
model_type: "prophet"
start_date: NULL
end_date: NULL
use_all_us_holidays: True
predict_historical_dates: False
number_of_simulations: 1000
parameters:
seasonality_prior_scale: 0.01
changepoint_prior_scale: 0.01
weekly_seasonality: True
yearly_seasonality: True
use_all_us_holidays: True

summarize:
periods: ["day", "month"]
@@ -29,5 +30,3 @@ write_results:
dataset: "telemetry_derived"
dataset_legacy: "telemetry_derived"
table: "kpi_forecasts_v0"
forecast_table_legacy: "kpi_automated_forecast_v1"
confidences_table_legacy: "kpi_automated_forecast_confidences_v1"