diff --git a/setup.py b/setup.py
index 89710b7b..25977d60 100644
--- a/setup.py
+++ b/setup.py
@@ -3,9 +3,9 @@
 setup(
     name="watttime",
     description="An officially maintained python client for WattTime's API providing access to electricity grid emissions data.",
-    long_description=open('README.md').read(),
+    long_description=open("README.md").read(),
     long_description_content_type="text/markdown",
-    version="v1.3.2",
+    version="v1.3.3",
     packages=["watttime"],
     python_requires=">=3.8",
     install_requires=["requests", "pandas>1.0.0", "holidays", "python-dateutil"],
diff --git a/tests/test_sdk.py b/tests/test_sdk.py
index bef46fab..e3df9e4c 100644
--- a/tests/test_sdk.py
+++ b/tests/test_sdk.py
@@ -1,6 +1,6 @@
 import unittest
 import unittest.mock as mock
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, date
 from dateutil.parser import parse
 from pytz import timezone, UTC
 import os
@@ -9,12 +9,11 @@
     WattTimeHistorical,
     WattTimeMyAccess,
     WattTimeForecast,
-    WattTimeMaps
+    WattTimeMaps,
 )
 
 from pathlib import Path
 import pandas as pd
-import requests
 
 REGION = "CAISO_NORTH"
 
@@ -295,6 +294,7 @@ def test_access_pandas(self):
 class TestWattTimeForecast(unittest.TestCase):
     def setUp(self):
         self.forecast = WattTimeForecast()
+        self.forecast_mt = WattTimeForecast(multithreaded=True)
 
     def test_get_current_json(self):
         json = self.forecast.get_forecast_json(region=REGION)
@@ -313,23 +313,69 @@ def test_get_current_pandas(self):
         self.assertIn("value", df.columns)
 
     def test_historical_forecast_jsons(self):
-        start = "2023-01-01 00:00Z"
-        end = "2023-01-03 00:00Z"
+        start = "2024-01-01 00:00Z"
+        end = "2024-01-07 00:00Z"
         json_list = self.forecast.get_historical_forecast_json(
             start, end, region=REGION
         )
         first_json = json_list[0]
+        self.assertIsInstance(json_list, list)
+        self.assertIn("meta", first_json)
+        self.assertEqual(len(first_json["data"]), 288)
+        self.assertIn("generated_at", first_json["data"][0])
+
+    def test_historical_forecast_jsons_multithreaded(self):
+        start = "2024-01-01 00:00Z"
+        end = "2024-01-30 00:00Z"
+        json_list = self.forecast_mt.get_historical_forecast_json(
+            start, end, region=REGION
+        )
+        first_json = json_list[0]
 
         self.assertIsInstance(json_list, list)
         self.assertIn("meta", first_json)
         self.assertEqual(len(first_json["data"]), 288)
         self.assertIn("generated_at", first_json["data"][0])
 
+    def test_historical_forecast_json_list(self):
+        list_of_dates = [date(2024, 1, 1), date(2024, 2, 1), date(2024, 3, 1)]
+        json_list = self.forecast.get_historical_forecast_json_list(
+            list_of_dates, region=REGION
+        )
+
+        # Expect one JSON response per date provided.
+        self.assertIsInstance(json_list, list)
+        self.assertEqual(len(json_list), len(list_of_dates))
+
+        # Verify the structure of each JSON response.
+        for j in json_list:
+            self.assertIsInstance(j, dict)
+            self.assertIn("meta", j)
+            self.assertIn("data", j)
+            # If data is returned, check for the expected fields.
+            if j["data"]:
+                self.assertIn("generated_at", j["data"][0])
+
+        gen_ats = set([parse(j["data"][0]["generated_at"]).date() for j in json_list])
+        assert gen_ats == set(list_of_dates)
+
     def test_historical_forecast_pandas(self):
         start = "2023-01-01 00:00Z"
         end = "2023-01-03 00:00Z"
         df = self.forecast.get_historical_forecast_pandas(start, end, region=REGION)
+        self.assertIsInstance(df, pd.DataFrame)
+        self.assertGreaterEqual(len(df), 1)
+        self.assertIn("point_time", df.columns)
+        self.assertIn("value", df.columns)
+        self.assertIn("generated_at", df.columns)
+
+    def test_historical_forecast_pandas_list(self):
+        """
+        Test the new method get_historical_forecast_pandas_list which accepts a list of dates.
+        """
+        list_of_dates = [date(2024, 1, 1), date(2024, 2, 1)]
+        df = self.forecast.get_historical_forecast_pandas_list(
+            list_of_dates, region=REGION
+        )
         self.assertIsInstance(df, pd.DataFrame)
         self.assertGreaterEqual(len(df), 1)
         self.assertIn("point_time", df.columns)
diff --git a/watttime/__init__.py b/watttime/__init__.py
index 0cdb9642..15b87ab0 100644
--- a/watttime/__init__.py
+++ b/watttime/__init__.py
@@ -1,2 +1,3 @@
 from watttime.api import *
-from watttime.tcy import TCYCalculator
\ No newline at end of file
+from watttime.tcy import TCYCalculator
+from watttime.util import RateLimitedRequesterMixin
\ No newline at end of file
diff --git a/watttime/api.py b/watttime/api.py
index bc5b18c0..4d451dff 100644
--- a/watttime/api.py
+++ b/watttime/api.py
@@ -1,15 +1,19 @@
 import os
 import time
-from datetime import date, datetime, timedelta
+import threading
+from datetime import date, datetime, timedelta, time as dt_time  # alias avoids shadowing the time module imported above
 from functools import cache
 from pathlib import Path
 from typing import Any, Dict, List, Literal, Optional, Tuple, Union
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 import pandas as pd
 import requests
 from dateutil.parser import parse
 from pytz import UTC
 
+from watttime.util import RateLimitedRequesterMixin
+
 
 class WattTimeBase:
     url_base = "https://api.watttime.org"
@@ -368,7 +372,37 @@ def get_access_pandas(self) -> pd.DataFrame:
         return out
 
 
-class WattTimeForecast(WattTimeBase):
+class WattTimeForecast(WattTimeBase, RateLimitedRequesterMixin):
+    def __init__(
+        self,
+        username: Optional[str] = None,
+        password: Optional[str] = None,
+        multithreaded: bool = False,
+    ):
+        super().__init__(username=username, password=password)
+        RateLimitedRequesterMixin.__init__(self)
+        self.multithreaded = multithreaded
+
+    def _parse_historical_forecast_json(
+        self, json_list: List[Dict[str, Any]]
+    ) -> pd.DataFrame:
+        """
+        Parses the JSON response from the historical forecast API into a pandas DataFrame.
+
+        Args:
+            json_list (List[Dict[str, Any]]): A list of JSON responses from the API.
+
+        Returns:
+            pd.DataFrame: A pandas DataFrame containing the parsed historical forecast data.
+        """
+        out = pd.DataFrame()
+        for json in json_list:
+            for entry in json.get("data", []):
+                _df = pd.json_normalize(entry, record_path=["forecast"])
+                _df = _df.assign(generated_at=pd.to_datetime(entry["generated_at"]))
+                out = pd.concat([out, _df], ignore_index=True)
+        return out
+
     def get_forecast_json(
         self,
         region: str,
@@ -450,59 +484,86 @@ def get_historical_forecast_json(
         ] = "co2_moer",
         model: Optional[Union[str, date]] = None,
         horizon_hours: int = 24,
+    ) -> List[Dict[str, Any]]:
+        if not self._is_token_valid():
+            self._login()
+
+        url = f"{self.url_base}/v3/forecast/historical"
+        headers = {"Authorization": f"Bearer {self.token}"}
+        params = {
+            "region": region,
+            "signal_type": signal_type,
+            "horizon_hours": horizon_hours,
+        }
+
+        start, end = self._parse_dates(start, end)
+        chunks = self._get_chunks(start, end, chunk_size=timedelta(days=1))
+
+        if model is not None:
+            params["model"] = model
+
+        param_chunks = [{**params, "start": c[0], "end": c[1]} for c in chunks]
+
+        if self.multithreaded:
+            return self._fetch_data_multithreaded(url, headers, param_chunks)
+        else:
+            return [
+                self._make_rate_limited_request(url, headers, p) for p in param_chunks
+            ]
+
+    def get_historical_forecast_json_list(
+        self,
+        list_of_dates: List[date],
+        region: str,
+        signal_type: Optional[
+            Literal["co2_moer", "co2_aoer", "health_damage"]
+        ] = "co2_moer",
+        model: Optional[Union[str, date]] = None,
+        horizon_hours: int = 24,
     ) -> List[Dict[str, Any]]:
         """
-        Retrieves the historical forecast data from the API as a list of dictionaries.
+        Fetches historical forecast data for a list of specific dates.
 
         Args:
-            start (Union[str, datetime]): The start date of the historical forecast. Can be a string or a datetime object.
-            end (Union[str, datetime]): The end date of the historical forecast. Can be a string or a datetime object.
-            region (str): The region for which to retrieve the forecast data.
-            signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]]): The type of signal to retrieve. Defaults to "co2_moer".
-            model (Optional[Union[str, date]]): The date of the model version to use. Defaults to None.
-            horizon_hours (int, optional): The number of hours to forecast. Defaults to 24. Minimum of 0 provides a "nowcast" created with the forecast, maximum of 72.
+            list_of_dates (List[date]): A list of dates to retrieve historical forecasts for.
+            region (str): The region for which the forecast is needed.
+            signal_type (Optional[str]): The type of signal ("co2_moer", "co2_aoer", or "health_damage").
+            model (Optional[Union[str, date]]): Model version or date.
+            horizon_hours (int): Forecast horizon in hours.
 
         Returns:
-            List[Dict[str, Any]]: A list of dictionaries representing the forecast data.
-
-        Raises:
-            Exception: If there is an API response error.
+            List[Dict[str, Any]]: A list of JSON responses for each requested date.
         """
         if not self._is_token_valid():
             self._login()
-        url = "{}/v3/forecast/historical".format(self.url_base)
-        headers = {"Authorization": "Bearer " + self.token}
-        responses = []
+
+        url = f"{self.url_base}/v3/forecast/historical"
+        headers = {"Authorization": f"Bearer {self.token}"}
         params = {
             "region": region,
             "signal_type": signal_type,
            "horizon_hours": horizon_hours,
         }
 
-        start, end = self._parse_dates(start, end)
-        chunks = self._get_chunks(start, end, chunk_size=timedelta(days=1))
-
-        # No model will default to the most recent model version available
         if model is not None:
             params["model"] = model
 
-        for c in chunks:
-            params["start"], params["end"] = c
-            rsp = requests.get(url, headers=headers, params=params)
-            try:
-                rsp.raise_for_status()
-                j = rsp.json()
-                responses.append(j)
-            except Exception as e:
-                raise Exception(
-                    f"\nAPI Response Error: {rsp.status_code}, {rsp.text} [{rsp.headers.get('x-request-id')}]"
-                )
-
-            if len(j["meta"]["warnings"]):
-                print("\n", "Warnings Returned:", params, j["meta"])
-            time.sleep(1)  # avoid rate limit
-
-        return responses
+        param_chunks = [
+            # add timezone to dates
+            {
+                **params,
+                "start": datetime.combine(d, dt_time(0, 0)).isoformat() + "Z",
+                "end": datetime.combine(d, dt_time(23, 59)).isoformat() + "Z",
+            }
+            for d in list_of_dates
+        ]
+
+        if self.multithreaded:
+            return self._fetch_data_multithreaded(url, headers, param_chunks)
+        else:
+            return [
+                self._make_rate_limited_request(url, headers, p) for p in param_chunks
+            ]
 
     def get_historical_forecast_pandas(
         self,
@@ -522,10 +583,9 @@
            start (Union[str, datetime]): The start date or datetime for the historical forecast.
            end (Union[str, datetime]): The end date or datetime for the historical forecast.
            region (str): The region for which the historical forecast data is retrieved.
-            signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]], optional):
-                The type of signal for the historical forecast data. Defaults to "co2_moer".
-            model (Optional[Union[str, date]], optional): The model date for the historical forecast data. Defaults to None.
-            horizon_hours (int, optional): The number of hours to forecast. Defaults to 24. Minimum of 0 provides a "nowcast" created with the forecast, maximum of 72.
+            signal_type (Optional[str]): The type of signal for the historical forecast data.
+            model (Optional[Union[str, date]]): The model date for the historical forecast data.
+            horizon_hours (int): The number of hours to forecast.
 
         Returns:
             pd.DataFrame: A pandas DataFrame containing the historical forecast data.
@@ -533,13 +593,35 @@
         json_list = self.get_historical_forecast_json(
             start, end, region, signal_type, model, horizon_hours
         )
-        out = pd.DataFrame()
-        for json in json_list:
-            for entry in json["data"]:
-                _df = pd.json_normalize(entry, record_path=["forecast"])
-                _df = _df.assign(generated_at=pd.to_datetime(entry["generated_at"]))
-                out = pd.concat([out, _df])
-        return out
+        return self._parse_historical_forecast_json(json_list)
+
+    def get_historical_forecast_pandas_list(
+        self,
+        list_of_dates: List[date],
+        region: str,
+        signal_type: Optional[
+            Literal["co2_moer", "co2_aoer", "health_damage"]
+        ] = "co2_moer",
+        model: Optional[Union[str, date]] = None,
+        horizon_hours: int = 24,
+    ) -> pd.DataFrame:
+        """
+        Retrieves the historical forecast data for a list of specific dates as a pandas DataFrame.
+
+        Args:
+            list_of_dates (List[date]): A list of dates to retrieve historical forecasts for.
+            region (str): The region for which the forecast is needed.
+            signal_type (Optional[str]): The type of signal.
+            model (Optional[Union[str, date]]): The model version or date.
+            horizon_hours (int): Forecast horizon in hours.
+
+        Returns:
+            pd.DataFrame: A pandas DataFrame containing the historical forecast data.
+        """
+        json_list = self.get_historical_forecast_json_list(
+            list_of_dates, region, signal_type, model, horizon_hours
+        )
+        return self._parse_historical_forecast_json(json_list)
 
 
 class WattTimeMaps(WattTimeBase):
diff --git a/watttime/util.py b/watttime/util.py
new file mode 100644
index 00000000..876271df
--- /dev/null
+++ b/watttime/util.py
@@ -0,0 +1,88 @@
+import threading
+import time
+import os
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Dict, Any, List
+import requests
+
+
+class RateLimitedRequesterMixin:
+    """
+    Mixin to handle rate-limited multi-threaded requests.
+    """
+
+    def __init__(self, rate_limit: int = 10):
+        """
+        Args:
+            rate_limit (int): Maximum number of requests per second.
+        """
+        self._rate_limit_lock = threading.Lock()
+        self._last_request_times = []
+        self.rate_limit = rate_limit
+
+    def _make_rate_limited_request(
+        self, url: str, headers: Dict[str, str], params: Dict[str, Any]
+    ) -> Dict:
+        """
+        Makes an API request with rate limiting.
+
+        Args:
+            url (str): API endpoint URL.
+            headers (Dict[str, str]): Request headers.
+            params (Dict[str, Any]): Query parameters.
+
+        Returns:
+            Dict: The JSON response.
+        """
+        while True:
+            with self._rate_limit_lock:
+                current_time = time.time()
+                self._last_request_times = [
+                    t for t in self._last_request_times if current_time - t < 1.0
+                ]
+
+                if len(self._last_request_times) < self.rate_limit:
+                    self._last_request_times.append(current_time)
+                    break
+
+            time.sleep(0.1)
+
+        rsp = requests.get(url, headers=headers, params=params)
+        rsp.raise_for_status()
+        return rsp.json()
+
+    def _fetch_data_multithreaded(
+        self, url: str, headers: Dict[str, str], param_chunks: List[Dict[str, Any]]
+    ) -> List[Dict]:
+        """
+        Fetch data using multithreading with rate limiting.
+
+        Args:
+            url (str): API endpoint URL.
+            headers (Dict[str, str]): Request headers.
+            param_chunks (List[Dict[str, Any]]): List of parameter sets.
+
+        Returns:
+            List[Dict]: A list of JSON responses.
+        """
+        responses = []
+        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
+            futures = {
+                executor.submit(
+                    self._make_rate_limited_request, url, headers, params
+                ): params
+                for params in param_chunks
+            }
+
+            for future in as_completed(futures):
+                try:
+                    responses.append(future.result())
+                except Exception as e:
+                    if hasattr(e, "response"):
+                        raise Exception(
+                            f"\nAPI Response Error: {e.response.status_code}, {e.response.text} "
+                            f"[{e.response.headers.get('x-request-id')}]"
+                        )
+                    raise
+
+        return responses
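Usage sketch (illustrative, not part of the patch): the snippet below shows how the new multithreaded and list-of-dates entry points introduced in this diff might be called. The region value is only an example, and credentials are assumed to come from the usual WattTimeBase mechanism (constructor arguments or environment variables).

    from datetime import date
    from watttime import WattTimeForecast

    # Credentials resolved by WattTimeBase (constructor args or environment).
    forecast = WattTimeForecast(multithreaded=True)

    # Date-range fetch: split into one-day chunks and dispatched across threads,
    # throttled by RateLimitedRequesterMixin (default 10 requests per second).
    df_range = forecast.get_historical_forecast_pandas(
        "2024-01-01 00:00Z", "2024-01-07 00:00Z", region="CAISO_NORTH"
    )

    # Non-contiguous dates: one request per date via the new *_list methods.
    df_days = forecast.get_historical_forecast_pandas_list(
        [date(2024, 1, 1), date(2024, 2, 1)], region="CAISO_NORTH"
    )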