1919import geocoder .location
2020import typing
2121
22- CO2_SIGNAL_API_ENDPOINT : str = "https://api.co2signal.com/v1/latest"
22+ CO2_SIGNAL_API_ENDPOINT : str = (
23+ "https://api.electricitymap.org/v3/carbon-intensity/latest"
24+ )
2325
2426
2527class CO2SignalData (pydantic .BaseModel ):
2628 datetime : datetime .datetime
2729 carbon_intensity : float
28- fossil_fuel_percentage : float
2930
3031
3132class CO2SignalResponse (pydantic .BaseModel ):
32- disclaimer : str
3333 country_code : str
34- status : str
3534 data : CO2SignalData
3635 carbon_intensity_units : str
3736
3837 @classmethod
3938 def from_json_response (cls , json_response : dict ) -> "CO2SignalResponse" :
40- _data : dict [str , typing .Any ] = json_response ["data" ]
4139 _co2_signal_data = CO2SignalData (
4240 datetime = datetime .datetime .fromisoformat (
43- _data ["datetime" ].replace ("Z" , "+00:00" )
41+ json_response ["datetime" ].replace ("Z" , "+00:00" )
4442 ),
45- carbon_intensity = _data ["carbonIntensity" ],
46- fossil_fuel_percentage = _data ["fossilFuelPercentage" ],
43+ carbon_intensity = json_response ["carbonIntensity" ],
4744 )
4845 return cls (
49- disclaimer = json_response ["_disclaimer" ],
50- country_code = json_response ["countryCode" ],
51- status = json_response ["status" ],
46+ country_code = json_response ["zone" ],
5247 data = _co2_signal_data ,
53- carbon_intensity_units = json_response [ "units" ][ "carbonIntensity" ] ,
48+ carbon_intensity_units = "gCO2e/kWh" ,
5449 )
5550
5651
@@ -82,18 +77,15 @@ def __init__(self, *args, **kwargs) -> None:
8277 co2_api_endpoint : str
8378 endpoint for CO2 signal API
8479 co2_api_token: str
85- RECOMMENDED. The API token for the CO2 Signal API, default is None.
80+ The API token for the ElectricityMaps API, default is None.
8681 timeout : int
8782 timeout for API
8883 """
8984 super ().__init__ (* args , ** kwargs )
9085 self ._logger = logging .getLogger (self .__class__ .__name__ )
9186
9287 if not self .co2_api_token :
93- self ._logger .warning (
94- "⚠️ No API token provided for CO2 Signal, "
95- "use of a token is strongly recommended."
96- )
88+ raise ValueError ("API token is required for ElectricityMaps API." )
9789
9890 self ._get_user_location_info ()
9991
@@ -109,16 +101,14 @@ def _get_user_location_info(self) -> None:
109101 def get (self ) -> CO2SignalResponse :
110102 """Get the current data"""
111103 _params : dict [str , float | str ] = {
112- "lat" : self ._latitude ,
113- "lon" : self ._longitude ,
114- "countryCode" : self ._two_letter_country_code ,
104+ "zone" : self ._two_letter_country_code ,
115105 }
116106
117107 if self .co2_api_token :
118108 _params ["auth-token" ] = self .co2_api_token .get_secret_value ()
119109
120110 self ._logger .debug (f"🍃 Retrieving carbon intensity data for: { _params } " )
121- _response = requests .get (f"{ self .co2_api_endpoint } " , params = _params )
111+ _response = requests .get (f"{ self .co2_api_endpoint } " , headers = _params )
122112
123113 if _response .status_code != http .HTTPStatus .OK :
124114 try :
0 commit comments