Skip to content
244 changes: 183 additions & 61 deletions jetstream/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import dask
import dask.delayed
import pytz
from dask.distributed import Client, LocalCluster
from dask.distributed import Client, LocalCluster, as_completed
from dask.graph_manipulation import bind
from google.api_core.exceptions import BadRequest
from google.api_core.exceptions import BadRequest, GoogleAPICallError
from google.cloud import bigquery
from google.cloud.bigquery.job import WriteDisposition
from google.cloud.exceptions import Conflict
Expand Down Expand Up @@ -59,6 +59,24 @@
_dask_cluster = None


@dask.delayed
def _successful_metrics_dict(
    metric_table_results: list[str],
    all_metrics_by_ds: dict[str, list[str]],
) -> dict[str, list[str]]:
    """Return only the data-source entries whose metric table was successfully computed.

    ``metric_table_results`` holds, in the same order as ``all_metrics_by_ds``,
    the BigQuery table each data-source's metric query wrote to, or "" if that
    query failed (see ``calculate_metric_for_ds``). Entries with a falsy
    result are dropped so the published view only references tables that exist.

    Returns an empty dict when the inputs cannot be paired up (``strict=True``
    raises ``ValueError`` on a length mismatch), which makes the downstream
    ``publish_view`` skip publishing entirely.
    """
    try:
        return {
            name: metrics
            for (name, metrics), result in zip(
                all_metrics_by_ds.items(), metric_table_results, strict=True
            )
            if result
        }
    except Exception as e:
        # Best-effort fallback kept, but log instead of swallowing silently so
        # we can tell why the metrics view publication was skipped.
        logger.exception(
            str(e),
            extra={"metrics_by_ds": list(all_metrics_by_ds)},
        )
        return {}


@attr.s(auto_attribs=True)
class Analysis:
"""Wrapper for analysing experiments."""
Expand Down Expand Up @@ -244,6 +262,12 @@ def publish_view(
metrics_dict: dict[str, list[str]] | None = None,
):
assert self.config.experiment.normandy_slug is not None
if metrics_dict is not None and not metrics_dict:
logger.warning(
f"all metrics queries failed for {window_period.value} {analysis_basis};"
"skipping publish view..."
)
return
normalized_slug = bq_normalize_name(self.config.experiment.normandy_slug)
view_name = "_".join([normalized_slug, window_period.table_suffix])
wildcard_expr = normalized_slug
Expand Down Expand Up @@ -348,7 +372,17 @@ def publish_view(
)

logger.debug(f"View ({view_name}) SQL: {sql}")
self.bigquery.execute(sql)
try:
self.bigquery.execute(sql)
except GoogleAPICallError as e:
logger.exception(
str(e),
extra={
"experiment": self.config.experiment.normandy_slug,
"analysis_basis": analysis_basis,
},
)
raise

@dask.delayed
def calculate_metrics(
Expand Down Expand Up @@ -428,9 +462,21 @@ def calculate_metrics(
)
logger.info(metrics_sql)

results = self.bigquery.execute(
metrics_sql, res_table_name, experiment_slug=self.config.experiment.normandy_slug
)
try:
results = self.bigquery.execute(
metrics_sql,
res_table_name,
experiment_slug=self.config.experiment.normandy_slug,
)
except GoogleAPICallError as e:
logger.exception(
str(e),
extra={
"experiment": self.config.experiment.normandy_slug,
"analysis_basis": analysis_basis,
},
)
raise
logger.info(
f"Metric query cost: {results.slot_millis * COST_PER_SLOT_MS}",
)
Expand All @@ -451,7 +497,7 @@ def calculate_metric_for_ds(
) -> str:
"""
Calculate individual metric for a specific experiment.
Returns the BigQuery table results are written to.
Returns the BigQuery table results are written to, or empty str on failure.
"""
window = len(time_limits.analysis_windows)
last_analysis_window = time_limits.analysis_windows[-1]
Expand Down Expand Up @@ -517,18 +563,18 @@ def calculate_metric_for_ds(
f"{results.slot_millis * COST_PER_SLOT_MS}"
)
self._write_sql_output(res_table_name, metrics_sql)
except ValueError as e:
except (ValueError, GoogleAPICallError) as e:
for metric in metrics:
# log an exception for each failed metric because this is how we track errors
logger.exception(
str(e),
exc_info=e,
extra={
"experiment": self.config.experiment.normandy_slug,
"metric": metric.name,
"analysis_basis": analysis_basis,
},
)
return ""

return res_table_name

Expand All @@ -545,58 +591,88 @@ def calculate_statistics(
"""
Run statistics on metric.
"""
return (
Summary.from_config(metric, analysis_length_dates, period)
.run(
segment_data,
self.config.experiment,
analysis_basis,
segment,
if segment_data is None:
return StatisticResultCollection.model_validate([])
try:
return (
Summary.from_config(metric, analysis_length_dates, period)
.run(
segment_data,
self.config.experiment,
analysis_basis,
segment,
)
.set_segment(segment)
.set_analysis_basis(analysis_basis)
)
.set_segment(segment)
.set_analysis_basis(analysis_basis)
)
except Exception as e:
logger.exception(
str(e),
extra={
"experiment": self.config.experiment.normandy_slug,
"metric": metric.metric.name,
"statistic": metric.statistic.name(),
"analysis_basis": analysis_basis,
"segment": segment,
},
)
return StatisticResultCollection.model_validate([])

@dask.delayed
def counts(
    self, segment_data: DataFrame, segment: str, analysis_basis: AnalysisBasis
) -> StatisticResultCollection:
    """Compute the per-branch count statistic for a segment.

    Branches that have no rows in ``segment_data`` are padded with an explicit
    zero count so every configured branch appears in the results. Returns an
    empty collection when ``segment_data`` is missing or when any error occurs
    while computing the counts (the error is logged, not raised, so a failure
    here does not abort the rest of the analysis).
    """
    # No metric data was produced for this segment/basis: nothing to count.
    if segment_data is None:
        return StatisticResultCollection.model_validate([])
    try:
        metric = "identity"
        observed = (
            Count()
            .transform(
                segment_data,
                metric,
                "*",
                self.config.experiment.normandy_slug,
                analysis_basis,
                segment,
            )
            .set_segment(segment)
            .set_analysis_basis(analysis_basis)
        )

        # Pad every configured branch that produced no rows with a zero count.
        covered_branches = {c.branch for c in observed}
        zero_rows = []
        for branch in self.config.experiment.branches:
            if branch.slug in covered_branches:
                continue
            zero_rows.append(
                StatisticResult(
                    metric=metric,
                    statistic="count",
                    parameter=None,
                    branch=branch.slug,
                    comparison=None,
                    comparison_to_branch=None,
                    ci_width=None,
                    point=0,
                    lower=None,
                    upper=None,
                    segment=segment,
                    analysis_basis=analysis_basis,
                )
            )

        return StatisticResultCollection.model_validate(observed.root + zero_rows)
    except Exception as e:
        # Log with the structured fields used for error tracking, then fall
        # back to an empty result set rather than failing the dask task.
        logger.exception(
            str(e),
            extra={
                "experiment": self.config.experiment.normandy_slug,
                "metric": "identity",
                "statistic": "count",
                "analysis_basis": analysis_basis,
                "segment": segment,
            },
        )
        return StatisticResultCollection.model_validate([])

@dask.delayed
def subset_metric_table(
Expand All @@ -609,14 +685,27 @@ def subset_metric_table(
discrete_metrics: bool = False,
) -> DataFrame:
"""Pulls the metric data for this segment/analysis basis"""

if not metric_table_name:
return None
query = self._create_subset_metric_table_query(
metric_table_name, segment, summary, analysis_basis, period, discrete_metrics
)

logger.debug(f"subset_metric_table: {metric_table_name}, {summary.metric.name}\n{query}")

results: DataFrame = self.bigquery.execute(query).to_dataframe()
try:
results: DataFrame = self.bigquery.execute(query).to_dataframe()
except GoogleAPICallError as e:
logger.exception(
str(e),
extra={
"experiment": self.config.experiment.normandy_slug,
"metric": summary.metric.name,
"analysis_basis": analysis_basis,
"segment": segment,
},
)
return None

return results

Expand Down Expand Up @@ -1096,7 +1185,21 @@ def save_statistics(
# logger.error(f"Expected schema: {StatisticResult.bq_schema}")
# logger.error(f"Data received: {segment_results}")
ve = ValueError(error_msg)
logger.exception(
str(ve),
extra={
"experiment": self.config.experiment.normandy_slug,
},
)
raise ve from e
except Exception as e:
logger.exception(
str(e),
extra={
"experiment": self.config.experiment.normandy_slug,
},
)
raise

def run(
self,
Expand Down Expand Up @@ -1325,6 +1428,15 @@ def run(
segment_data, segment, analysis_basis
).model_dump(warnings=False)

# done with analysis_basis: publish metrics view
# bind ensures publish_view runs after the metric table is written
results.append(
bind(
self.publish_view(period, analysis_basis=analysis_basis.value),
metrics_results,
)
)

else:
# convert metric configurations to mozanalysis metrics
summary_metrics: list[Summary] = [
Expand Down Expand Up @@ -1489,17 +1601,15 @@ def run(
period,
).model_dump(warnings=False)

# done with analysis_basis: publish metric view
results.append(
bind(
# done with analysis_basis: publish metrics view for successful metrics only
filtered_dict = _successful_metrics_dict(metrics_results, all_metrics_by_ds)
results.append(
self.publish_view(
period,
analysis_basis=analysis_basis.value,
metrics_dict=all_metrics_by_ds,
),
[metrics_results],
metrics_dict=filtered_dict,
)
)
)

# done with period: save statistics results to table
result = self.save_statistics(
Expand All @@ -1517,8 +1627,20 @@ def run(
)
)

# submit all tasks, and log errors for failed tasks
result_futures = client.compute(results)
client.gather(result_futures) # block until futures have finished
for future in as_completed(result_futures):
if future.status != "error":
continue
try:
future.result()
except Exception:
logger.exception(
"A task failed during analysis with an unexpected exception",
extra={
"experiment": self.config.experiment.normandy_slug,
},
)

def enrollments_query(self, time_limits: TimeLimits, use_glean_ids: bool = False) -> str:
"""Returns the enrollments SQL query."""
Expand Down
Loading
Loading