Skip to content
Merged
126 changes: 103 additions & 23 deletions watttime/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,33 @@ class WattTimeBase:
url_base = "https://api.watttime.org"

def __init__(self, username: Optional[str] = None, password: Optional[str] = None):
"""
Initializes a new instance of the class.

Parameters:
username (Optional[str]): The username to use for authentication. If not provided, the value will be retrieved from the environment variable "WATTTIME_USER".
password (Optional[str]): The password to use for authentication. If not provided, the value will be retrieved from the environment variable "WATTTIME_PASSWORD".
"""
self.username = username or os.getenv("WATTTIME_USER")
self.password = password or os.getenv("WATTTIME_PASSWORD")
self.token = None
self.token_valid_until = None

def _login(self):
"""Login to the WattTime API, which provides a JWT valid for 30 minutes."""
"""
Login to the WattTime API, which provides a JWT valid for 30 minutes

Raises:
Exception: If the login fails and the credentials are incorrect.
"""

url = f"{self.url_base}/login"
rsp = requests.get(
url,
auth=requests.auth.HTTPBasicAuth(self.username, self.password),
timeout=20,
)
rsp.raise_for_status()
self.token = rsp.json().get("token", None)
self.token_valid_until = datetime.now() + timedelta(minutes=30)
if not self.token:
Expand All @@ -41,6 +55,16 @@ def _is_token_valid(self) -> bool:
def _parse_dates(
self, start: Union[str, datetime], end: Union[str, datetime]
) -> Tuple[datetime, datetime]:
"""
Parse the given start and end dates.

Args:
start (Union[str, datetime]): The start date to parse. It can be either a string or a datetime object.
end (Union[str, datetime]): The end date to parse. It can be either a string or a datetime object.

Returns:
Tuple[datetime, datetime]: A tuple containing the parsed start and end dates as datetime objects.
"""
if isinstance(start, str):
start = parse(start)
if isinstance(end, str):
Expand All @@ -61,7 +85,17 @@ def _parse_dates(
def _get_chunks(
self, start: datetime, end: datetime, chunk_size: timedelta = timedelta(days=30)
) -> List[Tuple[datetime, datetime]]:
"""Internal function turns a start and end into 30-day chunks"""
"""
Generate a list of tuples representing chunks of time within a given time range.

Args:
start (datetime): The start datetime of the time range.
end (datetime): The end datetime of the time range.
chunk_size (timedelta, optional): The size of each chunk. Defaults to timedelta(days=30).

Returns:
List[Tuple[datetime, datetime]]: A list of tuples representing the chunks of time.
"""
chunks = []
while start < end:
chunk_end = min(end, start + chunk_size)
Expand All @@ -72,10 +106,18 @@ def _get_chunks(
chunks = [(s, e - timedelta(minutes=5)) for s, e in chunks[0:-1]] + [chunks[-1]]
return chunks

def register(
self, email: str, organization: Optional[str] = None
) -> requests.Response:
"""Register for the WattTime API, if you do not already have an account."""
def register(self, email: str, organization: Optional[str] = None) -> None:
"""
Register a user with the given email and organization.

Parameters:
email (str): The email of the user.
organization (Optional[str], optional): The organization the user belongs to. Defaults to None.

Returns:
None: An error will be raised if registration was unsuccessful.
"""

url = f"{self.url_base}/register"
params = {
"username": self.username,
Expand Down Expand Up @@ -122,6 +164,7 @@ def region_from_loc(
"signal_type": signal_type,
}
rsp = requests.get(url, headers=headers, params=params)
rsp.raise_for_status()
return rsp.json()


Expand Down Expand Up @@ -170,10 +213,11 @@ def get_historical_jsons(
for c in chunks:
params["start"], params["end"] = c
rsp = requests.get(url, headers=headers, params=params)
if rsp.status_code == 200:
try:
rsp.raise_for_status()
j = rsp.json()
responses.append(j)
else:
except Exception as e:
raise Exception(f"\nAPI Response Error: {rsp.status_code}, {rsp.text}")

if len(j["meta"]["warnings"]):
Expand All @@ -192,7 +236,8 @@ def get_historical_pandas(
model_date: Optional[Union[str, date]] = None,
include_meta: bool = False,
):
"""Return a pd.DataFrame with point_time, and values.
"""
Return a pd.DataFrame with point_time, and values.

Args:
See .get_hist_jsons() for shared arguments.
Expand Down Expand Up @@ -222,18 +267,18 @@ def get_historical_csv(
model_date: Optional[Union[str, date]] = None,
):
"""
Retrieves historical data from a specified start date to an end date and saves it as a CSV file.
CSV naming scheme is like "CAISO_NORTH_co2_moer_2022-01-01_2022-01-07.csv"
+
Args:
start (Union[str, datetime]): The start date for retrieving historical data. It can be a string in the format "YYYY-MM-DD" or a datetime object.
end (Union[str, datetime]): The end date for retrieving historical data. It can be a string in the format "YYYY-MM-DD" or a datetime object.
region (str): The region for which historical data is requested.
signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]]): The type of signal for which historical data is requested. Default is "co2_moer".
model_date (Optional[Union[str, date]]): The date of the model for which historical data is requested. It can be a string in the format "YYYY-MM-DD" or a date object. Default is None.
Retrieves historical data from a specified start date to an end date and saves it as a CSV file.
CSV naming scheme is like "CAISO_NORTH_co2_moer_2022-01-01_2022-01-07.csv"

Returns:
None
Args:
start (Union[str, datetime]): The start date for retrieving historical data. It can be a string in the format "YYYY-MM-DD" or a datetime object.
end (Union[str, datetime]): The end date for retrieving historical data. It can be a string in the format "YYYY-MM-DD" or a datetime object.
region (str): The region for which historical data is requested.
signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]]): The type of signal for which historical data is requested. Default is "co2_moer".
model_date (Optional[Union[str, date]]): The date of the model for which historical data is requested. It can be a string in the format "YYYY-MM-DD" or a date object. Default is None.

Returns:
None, results are saved to a csv file in the user's home directory.
"""
df = self.get_historical_pandas(start, end, region, signal_type, model_date)

Expand Down Expand Up @@ -263,6 +308,7 @@ def get_access_json(self) -> Dict:
url = "{}/v3/my-access".format(self.url_base)
headers = {"Authorization": "Bearer " + self.token}
rsp = requests.get(url, headers=headers)
rsp.raise_for_status()
return rsp.json()

def get_access_pandas(self) -> pd.DataFrame:
Expand Down Expand Up @@ -334,6 +380,7 @@ def get_forecast_json(
url = "{}/v3/forecast".format(self.url_base)
headers = {"Authorization": "Bearer " + self.token}
rsp = requests.get(url, headers=headers, params=params)
rsp.raise_for_status()
return rsp.json()

def get_forecast_pandas(
Expand All @@ -345,7 +392,8 @@ def get_forecast_pandas(
model_date: Optional[Union[str, date]] = None,
include_meta: bool = False,
) -> pd.DataFrame:
"""Return a pd.DataFrame with point_time, and values.
"""
Return a pd.DataFrame with point_time, and values.

Args:
See .get_forecast_json() for shared arguments.
Expand All @@ -371,6 +419,22 @@ def get_historical_forecast_json(
] = "co2_moer",
model_date: Optional[Union[str, date]] = None,
) -> List[Dict[str, Any]]:
"""
Retrieves the historical forecast data from the API as a list of dictionaries.

Args:
start (Union[str, datetime]): The start date of the historical forecast. Can be a string or a datetime object.
end (Union[str, datetime]): The end date of the historical forecast. Can be a string or a datetime object.
region (str): The region for which to retrieve the forecast data.
signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]]): The type of signal to retrieve. Defaults to "co2_moer".
model_date (Optional[Union[str, date]]): The date of the model version to use. Defaults to None.

Returns:
List[Dict[str, Any]]: A list of dictionaries representing the forecast data.

Raises:
Exception: If there is an API response error.
"""
if not self._is_token_valid():
self._login()
url = "{}/v3/forecast/historical".format(self.url_base)
Expand All @@ -388,10 +452,11 @@ def get_historical_forecast_json(
for c in chunks:
params["start"], params["end"] = c
rsp = requests.get(url, headers=headers, params=params)
if rsp.status_code == 200:
try:
rsp.raise_for_status()
j = rsp.json()
responses.append(j)
else:
except Exception as e:
raise Exception(f"\nAPI Response Error: {rsp.status_code}, {rsp.text}")

if len(j["meta"]["warnings"]):
Expand All @@ -410,6 +475,20 @@ def get_historical_forecast_pandas(
] = "co2_moer",
model_date: Optional[Union[str, date]] = None,
) -> pd.DataFrame:
"""
Retrieves the historical forecast data as a pandas DataFrame.

Args:
start (Union[str, datetime]): The start date or datetime for the historical forecast.
end (Union[str, datetime]): The end date or datetime for the historical forecast.
region (str): The region for which the historical forecast data is retrieved.
signal_type (Optional[Literal["co2_moer", "co2_aoer", "health_damage"]], optional):
The type of signal for the historical forecast data. Defaults to "co2_moer".
model_date (Optional[Union[str, date]], optional): The model date for the historical forecast data. Defaults to None.

Returns:
pd.DataFrame: A pandas DataFrame containing the historical forecast data.
"""
json_list = self.get_historical_forecast_json(
start, end, region, signal_type, model_date
)
Expand Down Expand Up @@ -446,4 +525,5 @@ def get_maps_json(
headers = {"Authorization": "Bearer " + self.token}
params = {"signal_type": signal_type}
rsp = requests.get(url, headers=headers, params=params)
rsp.raise_for_status()
return rsp.json()