Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
137b1fa
add report module and notebook
sam-watttime Jan 22, 2025
7278440
add tests for report module
sam-watttime Jan 22, 2025
f5ce3f0
add .gitkeep for analysis dir
sam-watttime Jan 22, 2025
f1d3f21
update watttime module init
sam-watttime Jan 22, 2025
59c1d8c
add extras to setup.py
sam-watttime Jan 22, 2025
ed2d362
update .gitignore to exclude contents of analysis dir
sam-watttime Jan 22, 2025
c94c399
make report.py a script
sam-watttime Jan 22, 2025
8f1f0b9
add multithreading to historical forecast requests, with rate limitin…
sam-watttime Jan 30, 2025
5760157
Revert "make report.py a script"
sam-watttime Jan 30, 2025
29b9fd3
Revert "update .gitignore to exclude contents of analysis dir"
sam-watttime Jan 30, 2025
00be9b2
Revert "add extras to setup.py"
sam-watttime Jan 30, 2025
ef367a1
Revert "update watttime module init"
sam-watttime Jan 30, 2025
3b26e13
Revert "add .gitkeep for analysis dir"
sam-watttime Jan 30, 2025
cb7a204
Revert "add tests for report module"
sam-watttime Jan 30, 2025
d49526e
Revert "add report module and notebook"
sam-watttime Jan 30, 2025
ac1d434
add bool switch for forecast historical multithreading
sam-watttime Feb 4, 2025
4e6c1f3
add test for multithreaded code path
sam-watttime Feb 4, 2025
820d143
remove unnecessary print statement
sam-watttime Feb 4, 2025
257dbe9
remove processing statement
sam-watttime Feb 4, 2025
9a525f1
abstract out mulitthreading into mixin; add methods to pull list of (…
sam-watttime Feb 11, 2025
19615a4
bump setup.py preparing for release v1.3.3
sam-watttime Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
setup(
name="watttime",
description="An officially maintained python client for WattTime's API providing access to electricity grid emissions data.",
long_description=open('README.md').read(),
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
version="v1.3.2",
version="v1.3.3",
packages=["watttime"],
python_requires=">=3.8",
install_requires=["requests", "pandas>1.0.0", "holidays", "python-dateutil"],
Expand Down
56 changes: 51 additions & 5 deletions tests/test_sdk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest
import unittest.mock as mock
from datetime import datetime, timedelta
from datetime import datetime, timedelta, date
from dateutil.parser import parse
from pytz import timezone, UTC
import os
Expand All @@ -9,12 +9,11 @@
WattTimeHistorical,
WattTimeMyAccess,
WattTimeForecast,
WattTimeMaps
WattTimeMaps,
)
from pathlib import Path

import pandas as pd
import requests

REGION = "CAISO_NORTH"

Expand Down Expand Up @@ -295,6 +294,7 @@ def test_access_pandas(self):
class TestWattTimeForecast(unittest.TestCase):
def setUp(self):
self.forecast = WattTimeForecast()
self.forecast_mt = WattTimeForecast(multithreaded=True)

def test_get_current_json(self):
json = self.forecast.get_forecast_json(region=REGION)
Expand All @@ -313,23 +313,69 @@ def test_get_current_pandas(self):
self.assertIn("value", df.columns)

def test_historical_forecast_jsons(self):
start = "2023-01-01 00:00Z"
end = "2023-01-03 00:00Z"
start = "2024-01-01 00:00Z"
end = "2024-01-07 00:00Z"
json_list = self.forecast.get_historical_forecast_json(
start, end, region=REGION
)
first_json = json_list[0]
self.assertIsInstance(json_list, list)
self.assertIn("meta", first_json)
self.assertEqual(len(first_json["data"]), 288)
self.assertIn("generated_at", first_json["data"][0])

def test_historical_forecast_jsons_multithreaded(self):
    """Fetch a multi-week historical forecast through the multithreaded client.

    Uses ``self.forecast_mt`` and checks that the result is a list whose
    first response carries ``meta`` and 288 ``data`` entries, each tagged
    with ``generated_at``.
    """
    responses = self.forecast_mt.get_historical_forecast_json(
        "2024-01-01 00:00Z", "2024-01-30 00:00Z", region=REGION
    )
    head = responses[0]

    self.assertIsInstance(responses, list)
    self.assertIn("meta", head)
    self.assertEqual(len(head["data"]), 288)
    self.assertIn("generated_at", head["data"][0])

def test_historical_forecast_json_list(self):
    """Request historical forecasts for an explicit list of dates.

    Checks that one response is returned per requested date, that every
    response has the expected structure, and that the ``generated_at``
    date of each response's first data entry matches a requested date.
    """
    list_of_dates = [date(2024, 1, 1), date(2024, 2, 1), date(2024, 3, 1)]
    json_list = self.forecast.get_historical_forecast_json_list(
        list_of_dates, region=REGION
    )

    # Expect one JSON response per date provided.
    self.assertIsInstance(json_list, list)
    self.assertEqual(len(json_list), len(list_of_dates))

    # Verify the structure of each JSON response.
    for j in json_list:
        self.assertIsInstance(j, dict)
        self.assertIn("meta", j)
        self.assertIn("data", j)
        # The date comparison below indexes data[0] for every response, so
        # an empty payload must fail here with a readable message instead
        # of surfacing later as an IndexError.
        self.assertTrue(j["data"], "expected at least one forecast entry")
        self.assertIn("generated_at", j["data"][0])

    gen_ats = {parse(j["data"][0]["generated_at"]).date() for j in json_list}
    # assertEqual reports the differing elements and is not stripped by -O.
    self.assertEqual(gen_ats, set(list_of_dates))

def test_historical_forecast_pandas(self):
    """Fetch a two-day historical forecast as a DataFrame.

    The frame must be non-empty and expose the ``point_time``, ``value``
    and ``generated_at`` columns.
    """
    frame = self.forecast.get_historical_forecast_pandas(
        "2023-01-01 00:00Z", "2023-01-03 00:00Z", region=REGION
    )

    self.assertIsInstance(frame, pd.DataFrame)
    self.assertGreaterEqual(len(frame), 1)
    for column in ("point_time", "value", "generated_at"):
        self.assertIn(column, frame.columns)

def test_historical_forecast_pandas_list(self):
"""
Test the new method get_historical_forecast_pandas_list which accepts a list of dates.
"""
list_of_dates = [date(2024, 1, 1), date(2024, 2, 1)]
df = self.forecast.get_historical_forecast_pandas_list(
list_of_dates, region=REGION
)
self.assertIsInstance(df, pd.DataFrame)
self.assertGreaterEqual(len(df), 1)
self.assertIn("point_time", df.columns)
Expand Down
3 changes: 2 additions & 1 deletion watttime/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from watttime.api import *
from watttime.tcy import TCYCalculator
from watttime.tcy import TCYCalculator
from watttime.util import RateLimitedRequesterMixin
178 changes: 130 additions & 48 deletions watttime/api.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import os
import time
from datetime import date, datetime, timedelta
import threading
from datetime import date, time, datetime, timedelta
from functools import cache
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests
from dateutil.parser import parse
from pytz import UTC

from watttime.util import RateLimitedRequesterMixin


class WattTimeBase:
url_base = "https://api.watttime.org"
Expand Down Expand Up @@ -368,7 +372,37 @@ def get_access_pandas(self) -> pd.DataFrame:
return out


class WattTimeForecast(WattTimeBase):
class WattTimeForecast(WattTimeBase, RateLimitedRequesterMixin):
def __init__(
    self,
    username: Optional[str] = None,
    password: Optional[str] = None,
    multithreaded: bool = False,
):
    """
    Creates a forecast client.

    Args:
        username (Optional[str]): Forwarded to the base class initializer. Defaults to None.
        password (Optional[str]): Forwarded to the base class initializer. Defaults to None.
        multithreaded (bool): Stored on the instance; the historical forecast
            methods read it to choose between `_fetch_data_multithreaded` and
            sequential `_make_rate_limited_request` calls. Defaults to False.
    """
    # Credentials go through the regular super() chain; the mixin's
    # initializer is then invoked explicitly, in this order.
    super().__init__(username=username, password=password)
    RateLimitedRequesterMixin.__init__(self)

    self.multithreaded = multithreaded

def _parse_historical_forecast_json(
    self, json_list: List[Dict[str, Any]]
) -> pd.DataFrame:
    """
    Parses the JSON response from the historical forecast API into a pandas DataFrame.

    Every entry under a response's "data" key contributes one row per item
    of its "forecast" list, tagged with that entry's "generated_at"
    timestamp. Responses without a "data" key are skipped.

    Args:
        json_list (List[Dict[str, Any]]): A list of JSON responses from the API.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the parsed historical forecast data,
            with a fresh 0..n-1 index. Empty when there is nothing to parse.

    Raises:
        KeyError: If a data entry lacks "forecast" or "generated_at".
    """
    # Collect the per-entry frames and concatenate once at the end:
    # concatenating inside the loop re-copies the accumulated frame on
    # every iteration, which is quadratic in the number of entries.
    frames = []
    for response in json_list:
        for entry in response.get("data", []):
            frame = pd.json_normalize(entry, record_path=["forecast"])
            frames.append(
                frame.assign(generated_at=pd.to_datetime(entry["generated_at"]))
            )

    # pd.concat raises ValueError on an empty list, so handle it explicitly.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

def get_forecast_json(
self,
region: str,
Expand Down Expand Up @@ -450,59 +484,86 @@ def get_historical_forecast_json(
] = "co2_moer",
model: Optional[Union[str, date]] = None,
horizon_hours: int = 24,
) -> List[Dict[str, Any]]:
if not self._is_token_valid():
self._login()

url = f"{self.url_base}/v3/forecast/historical"
headers = {"Authorization": f"Bearer {self.token}"}
params = {
"region": region,
"signal_type": signal_type,
"horizon_hours": horizon_hours,
}

start, end = self._parse_dates(start, end)
chunks = self._get_chunks(start, end, chunk_size=timedelta(days=1))

if model is not None:
params["model"] = model

param_chunks = [{**params, "start": c[0], "end": c[1]} for c in chunks]

if self.multithreaded:
return self._fetch_data_multithreaded(url, headers, param_chunks)
else:
return [
self._make_rate_limited_request(url, headers, p) for p in param_chunks
]

def get_historical_forecast_json_list(
self,
list_of_dates: List[date],
region: str,
signal_type: Optional[
Literal["co2_moer", "co2_aoer", "health_damage"]
] = "co2_moer",
model: Optional[Union[str, date]] = None,
horizon_hours: int = 24,
) -> List[Dict[str, Any]]:
"""
Retrieves the historical forecast data from the API as a list of dictionaries.
Fetches historical forecast data for a list of specific dates.

Args:
start (Union[str, datetime]): The start date of the historical forecast. Can be a string or a datetime object.
end (Union[str, datetime]): The end date of the historical forecast. Can be a string or a datetime object.
region (str): The region for which to retrieve the forecast data.
signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]]): The type of signal to retrieve. Defaults to "co2_moer".
model (Optional[Union[str, date]]): The date of the model version to use. Defaults to None.
horizon_hours (int, optional): The number of hours to forecast. Defaults to 24. Minimum of 0 provides a "nowcast" created with the forecast, maximum of 72.
list_of_dates (List[date]): A list of dates to retrieve historical forecasts for.
region (str): The region for which the forecast is needed.
signal_type (Optional[str]): The type of signal ("co2_moer", "co2_aoer", or "health_damage").
model (Optional[Union[str, date]]): Model version or date.
horizon_hours (int): Forecast horizon in hours.

Returns:
List[Dict[str, Any]]: A list of dictionaries representing the forecast data.

Raises:
Exception: If there is an API response error.
List[Dict[str, Any]]: A list of JSON responses for each requested date.
"""
if not self._is_token_valid():
self._login()
url = "{}/v3/forecast/historical".format(self.url_base)
headers = {"Authorization": "Bearer " + self.token}
responses = []

url = f"{self.url_base}/v3/forecast/historical"
headers = {"Authorization": f"Bearer {self.token}"}
params = {
"region": region,
"signal_type": signal_type,
"horizon_hours": horizon_hours,
}

start, end = self._parse_dates(start, end)
chunks = self._get_chunks(start, end, chunk_size=timedelta(days=1))

# No model will default to the most recent model version available
if model is not None:
params["model"] = model

for c in chunks:
params["start"], params["end"] = c
rsp = requests.get(url, headers=headers, params=params)
try:
rsp.raise_for_status()
j = rsp.json()
responses.append(j)
except Exception as e:
raise Exception(
f"\nAPI Response Error: {rsp.status_code}, {rsp.text} [{rsp.headers.get('x-request-id')}]"
)

if len(j["meta"]["warnings"]):
print("\n", "Warnings Returned:", params, j["meta"])
time.sleep(1) # avoid rate limit

return responses
param_chunks = [
# add timezone to dates
{
**params,
"start": datetime.combine(d, time(0, 0)).isoformat() + "Z",
"end": datetime.combine(d, time(23, 59)).isoformat() + "Z",
}
for d in list_of_dates
]

if self.multithreaded:
return self._fetch_data_multithreaded(url, headers, param_chunks)
else:
return [
self._make_rate_limited_request(url, headers, p) for p in param_chunks
]

def get_historical_forecast_pandas(
self,
Expand All @@ -522,24 +583,45 @@ def get_historical_forecast_pandas(
start (Union[str, datetime]): The start date or datetime for the historical forecast.
end (Union[str, datetime]): The end date or datetime for the historical forecast.
region (str): The region for which the historical forecast data is retrieved.
signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]], optional):
The type of signal for the historical forecast data. Defaults to "co2_moer".
model (Optional[Union[str, date]], optional): The model date for the historical forecast data. Defaults to None.
horizon_hours (int, optional): The number of hours to forecast. Defaults to 24. Minimum of 0 provides a "nowcast" created with the forecast, maximum of 72.
signal_type (Optional[str]): The type of signal for the historical forecast data.
model (Optional[Union[str, date]]): The model date for the historical forecast data.
horizon_hours (int): The number of hours to forecast.

Returns:
pd.DataFrame: A pandas DataFrame containing the historical forecast data.
"""
json_list = self.get_historical_forecast_json(
start, end, region, signal_type, model, horizon_hours
)
out = pd.DataFrame()
for json in json_list:
for entry in json["data"]:
_df = pd.json_normalize(entry, record_path=["forecast"])
_df = _df.assign(generated_at=pd.to_datetime(entry["generated_at"]))
out = pd.concat([out, _df])
return out
return self._parse_historical_forecast_json(json_list)

def get_historical_forecast_pandas_list(
    self,
    list_of_dates: List[date],
    region: str,
    signal_type: Optional[
        Literal["co2_moer", "co2_aoer", "health_damage"]
    ] = "co2_moer",
    model: Optional[Union[str, date]] = None,
    horizon_hours: int = 24,
) -> pd.DataFrame:
    """
    Builds a pandas DataFrame of historical forecasts for an explicit list of dates.

    Delegates the fetching to `get_historical_forecast_json_list` and the
    conversion to `_parse_historical_forecast_json`.

    Args:
        list_of_dates (List[date]): The dates to retrieve historical forecasts for.
        region (str): The region for which the forecast is needed.
        signal_type (Optional[str]): The type of signal. Defaults to "co2_moer".
        model (Optional[Union[str, date]]): The model version or date. Defaults to None.
        horizon_hours (int): Forecast horizon in hours. Defaults to 24.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the historical forecast data.
    """
    return self._parse_historical_forecast_json(
        self.get_historical_forecast_json_list(
            list_of_dates, region, signal_type, model, horizon_hours
        )
    )


class WattTimeMaps(WattTimeBase):
Expand Down
Loading