diff --git a/.gitignore b/.gitignore index 68bc17f9..f875d659 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md index f63cdc45..2d2801ec 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ We provide two methods to access location data: ```python from watttime import WattTimeMaps -wt = WattTimeMaps(username, password) +wt = WattTimeMaps() # get BA region for a given location wt.region_from_loc( diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py new file mode 100644 index 00000000..2974dd6a --- /dev/null +++ b/tests/test_optimizer.py @@ -0,0 +1,963 @@ +import os +from datetime import datetime, timedelta +import unittest +import pandas as pd +from pytz import UTC +import pytz +from watttime.api import RecalculatingWattTimeOptimizer, WattTimeOptimizer, WattTimeForecast, RecalculatingWattTimeOptimizerWithContiguity + + +def get_usage_plan_mean_power(usage_plan): + usage_plan_when_active = usage_plan[usage_plan["usage"] != 0].copy() + usage_plan_when_active["power_kw"] = ( + usage_plan_when_active["energy_usage_mwh"] + / (usage_plan_when_active["usage"] / 60) + * 1000 + ) + + return usage_plan_when_active["power_kw"].mean() + + +def get_contiguity_info(usage_plan): + """ + Extract contiguous non-zero components from a DataFrame column 'usage' + and compute the sum for each component. + + Args: + usage_plan (pd.DataFrame): DataFrame with a column named 'usage'. + + Returns: + List[Dict]: A list of dictionaries, each containing the indices and sum + of a contiguous non-zero component. + """ + components = [] + current_component = [] + current_sum = 0 + + for index, value in usage_plan["usage"].items(): + if value != 0: + current_component.append(index) + current_sum += value + else: + if current_component: + components.append({"indices": current_component, "sum": current_sum}) + current_component = [] + current_sum = 0 + + # Add the last component if the dataframe ends with a non-zero sequence + if current_component: + components.append({"indices": current_component, "sum": current_sum}) + + return components + + +def pretty_format_usage(usage_plan): + return "".join(["." if usage == 0 else "E" for usage in usage_plan["usage"]]) + + +class TestWattTimeOptimizer(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Initialize WattTimeOptimizer before running any tests.""" + username = os.getenv("WATTTIME_USER") + password = os.getenv("WATTTIME_PASSWORD") + cls.wt_opt = WattTimeOptimizer(username, password) + cls.region = "PJM_NJ" + cls.usage_power_kw = 12 + now = datetime.now(UTC) + cls.window_start_test = now + timedelta(minutes=10) + cls.window_end_test = now + timedelta(minutes=720) + + def test_baseline_plan(self): + """Test the baseline plan.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=240, + usage_power_kw=self.usage_power_kw, + optimization_method="baseline", + ) + print("Using Baseline Plan\n", pretty_format_usage(usage_plan)) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 240) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 240 * self.usage_power_kw / 60 + ) + # Check number of components (1 for baseline) + self.assertEqual(len(get_contiguity_info(usage_plan)), 1) + + def test_simple_plan(self): + """Test the simple plan.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=240, + usage_power_kw=self.usage_power_kw, + optimization_method="simple", + ) + print("Using Simple Plan\n", pretty_format_usage(usage_plan)) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 240) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 240 * self.usage_power_kw / 60 + ) + + def test_dp_fixed_power_rate(self): + """Test the sophisticated plan with a fixed power rate.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=240, + usage_power_kw=self.usage_power_kw, + optimization_method="sophisticated", + ) + print("Using DP Plan w/ fixed power rate\n", pretty_format_usage(usage_plan)) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 240) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 240 * self.usage_power_kw / 60 + ) + + def test_dp_fixed_power_rate_with_uncertainty(self): + """Test the sophisticated plan with fixed power rate and time uncertainty.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=240, + usage_power_kw=self.usage_power_kw, + usage_time_uncertainty_minutes=180, + optimization_method="sophisticated", + ) + print("Using DP Plan w/ fixed power rate and charging uncertainty") + print(usage_plan["emissions_co2e_lb"].sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 240) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 240 * self.usage_power_kw / 60 + ) + + def test_dp_variable_power_rate(self): + """Test the plan with variable power rate.""" + usage_power_kw_df = pd.DataFrame( + [[0, 12], [20, 12], [40, 12], [100, 12], [219, 12], [220, 2.4], [320, 2.4]], + columns=["time", "power_kw"], + ) + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=320, + usage_power_kw=usage_power_kw_df, + optimization_method="auto", + ) + print("Using DP Plan w/ variable power rate") + print(usage_plan["emissions_co2e_lb"].sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 320) + # Check power + usage_plan_nonzero_entries = usage_plan[usage_plan["usage"] > 0] + power_kwh_array = ( + usage_plan_nonzero_entries["energy_usage_mwh"].values * 1e3 * 60 / 5 + ) + self.assertAlmostEqual(power_kwh_array[: 220 // 5].mean(), 12.0) + self.assertAlmostEqual(power_kwh_array[220 // 5 :].mean(), 2.4) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 220 * 12 / 60 + 100 * 2.4 / 60 + ) + + def test_dp_non_round_usage_time(self): + """Test auto mode with non-round usage time minutes.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=7, + usage_power_kw=self.usage_power_kw, + optimization_method="auto", + ) + print("Using auto mode, but with a non-round usage time minutes") + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 7) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 7 * self.usage_power_kw / 60 + ) + + def test_dp_input_time_energy(self): + """Test auto mode with a usage time and energy required.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=120, + energy_required_kwh=17, + optimization_method="auto", + ) + print("Using auto mode, with energy required in kWh") + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 120) + # Check power + self.assertAlmostEqual(get_usage_plan_mean_power(usage_plan), 8.5) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 120 * 8.5 / 60 + ) + + def test_dp_input_constant_power_energy(self): + """Test auto mode with a constant power and energy required.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_power_kw=5, + energy_required_kwh=15, + optimization_method="auto", + ) + print("Using auto mode, with energy required in kWh") + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 180) + # Check power + self.assertAlmostEqual(get_usage_plan_mean_power(usage_plan), 5) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 180 * 5 / 60 + ) + + def test_dp_two_intervals_unbounded(self): + """Test auto mode with two intervals.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(0, 999999), (0, 999999)], + optimization_method="auto", + ) + print( + "Using auto mode with two unbounded intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + # Check number of components + self.assertLessEqual(len(get_contiguity_info(usage_plan)), 2) + + def test_dp_two_intervals_flexible_length(self): + """Test auto mode with two variable length intervals.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(60, 100), (60, 100)], + optimization_method="auto", + ) + print( + "Using auto mode with two flexible intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 2) + if len(contiguity_info) == 2: + # Check first component length + self.assertGreaterEqual(contiguity_info[0]["sum"], 60) + self.assertLessEqual(contiguity_info[0]["sum"], 100) + # Check second component length + self.assertGreaterEqual(contiguity_info[1]["sum"], 60) + self.assertLessEqual(contiguity_info[1]["sum"], 100) + else: + # Check combined component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 160) + + def test_dp_two_intervals_one_sided_length(self): + """Test auto mode with two variable length intervals.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(30, None), (30, None), (30, None), (30, None)], + optimization_method="auto", + ) + print( + "Using auto mode with one-sided intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 4) + for i in range(len(contiguity_info)): + # Check component length + self.assertGreaterEqual(contiguity_info[i]["sum"], 30) + + def test_dp_two_intervals_one_sided_length_use_all_false(self): + """Test auto mode with two variable length intervals.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(40, None), (40, None), (40, None), (40, None)], + use_all_intervals=False, + optimization_method="auto", + ) + print( + "Using auto mode with one-sided intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 4) + for i in range(len(contiguity_info)): + # Check component length + self.assertGreaterEqual(contiguity_info[i]["sum"], 40) + + def test_dp_two_intervals_exact_input_a(self): + """Test auto mode with two intervals.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(60, 60), (100, 100)], + optimization_method="auto", + ) + print( + "Using auto mode with two exact intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 2) + if len(contiguity_info) == 2: + # Check first component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 60) + # Check second component length + self.assertAlmostEqual(contiguity_info[1]["sum"], 100) + else: + # Check combined component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 160) + + def test_dp_two_intervals_exact_input_b(self): + """Test auto mode with two intervals.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[60, 100], + optimization_method="auto", + ) + print("Using auto mode, but with two intervals") + print(pretty_format_usage(usage_plan)) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 2) + if len(contiguity_info) == 2: + # Check first component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 60) + # Check second component length + self.assertAlmostEqual(contiguity_info[1]["sum"], 100) + else: + # Check combined component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 160) + + def test_dp_two_intervals_exact_unround(self): + """Test auto mode with two intervals, specified via list of tuple.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(67, 67), (93, 93)], + optimization_method="auto", + ) + print( + "Using auto mode with two exact unround intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 2) + if len(contiguity_info) == 2: + # Check first component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 67) + # Check second component length + self.assertAlmostEqual(contiguity_info[1]["sum"], 93) + else: + # Check combined component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 160) + + def test_dp_two_intervals_exact_unround_alternate_input(self): + """Test auto mode with two intervals, specified via list of ints.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[67, 93], + optimization_method="auto", + ) + print( + "Using auto mode with two exact unround intervals\n", + pretty_format_usage(usage_plan), + ) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 160) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 160 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertLessEqual(len(contiguity_info), 2) + if len(contiguity_info) == 2: + # Check first component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 67) + # Check second component length + self.assertAlmostEqual(contiguity_info[1]["sum"], 93) + else: + # Check combined component length + self.assertAlmostEqual(contiguity_info[0]["sum"], 160) + + def test_dp_two_intervals_exact_inconsistent_b(self): + """Test auto mode with one interval that is inconsistent with usage_time_required.""" + usage_plan = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=self.window_start_test, + usage_window_end=self.window_end_test, + usage_time_required_minutes=160, + usage_power_kw=self.usage_power_kw, + charge_per_interval=[(65, 65)], + optimization_method="auto", + ) + print("Using auto mode, but with two intervals") + print(pretty_format_usage(usage_plan)) + print(usage_plan.sum()) + + # Check time required + self.assertAlmostEqual(usage_plan["usage"].sum(), 65) + # Check power + self.assertAlmostEqual( + get_usage_plan_mean_power(usage_plan), self.usage_power_kw + ) + # Check energy required + self.assertAlmostEqual( + usage_plan["energy_usage_mwh"].sum() * 1000, 65 * self.usage_power_kw / 60 + ) + + contiguity_info = get_contiguity_info(usage_plan) + # Check number of components + self.assertEqual(len(contiguity_info), 1) + +def convert_to_utc(local_time_str, local_tz_str): + local_time = datetime.strptime( + local_time_str.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S" + ) + local_tz = pytz.timezone(local_tz_str) + local_time = local_tz.localize(local_time) + return local_time.astimezone(pytz.utc) + + +class TestRecalculatingOptimizer(unittest.TestCase): + def setUp(self): + self.region = "PJM_NJ" + self.username = os.getenv("WATTTIME_USER") + self.password = os.getenv("WATTTIME_PASSWORD") + self.static_start_time = convert_to_utc( + datetime(2024, 1, 1, hour=20, second=1), local_tz_str="America/New_York" + ) + self.static_end_time = convert_to_utc( + datetime(2024, 1, 2, hour=8, second=1), local_tz_str="America/New_York" + ) + + self.wth = WattTimeForecast(self.username, self.password) + self.curr_fcst_data = self.wth.get_historical_forecast_pandas( + start=self.static_start_time - timedelta(minutes=5), + end=self.static_end_time, + region=self.region, + signal_type="co2_moer", + horizon_hours=72, + ) + self.data_times = self.curr_fcst_data["generated_at"] + + def test_init_recalculating_optimizer(self) -> None: + """Test init""" + fcst_data = self.curr_fcst_data[ + self.curr_fcst_data["generated_at"] < self.static_start_time + ] + basic_schedule = WattTimeOptimizer( + self.username, self.password + ).get_optimal_usage_plan( + region=self.region, + usage_window_start=self.static_start_time, + usage_window_end=self.static_end_time, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="auto", + moer_data_override=fcst_data, + ) + + recalculating_optimizer = RecalculatingWattTimeOptimizer( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="auto", + ) + + starting_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time, self.static_end_time, curr_fcst_data=fcst_data + ) + + self.assertEqual( + basic_schedule["usage"].tolist(), starting_schedule["usage"].tolist() + ) + self.assertEqual(basic_schedule["usage"].sum(), 240) + + def test_get_single_combined_schedule(self) -> None: + """Test get_combined with single schedule""" + recalculating_optimizer = RecalculatingWattTimeOptimizer( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="auto", + ) + + newest_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time, + self.static_end_time, + ) + combined_schedule = recalculating_optimizer.get_combined_schedule() + + self.assertEqual( + newest_schedule["usage"].tolist(), combined_schedule["usage"].tolist() + ) + self.assertEqual(combined_schedule["usage"].sum(), 240) + + def test_multiple_schedules_combined(self) -> None: + """Test combining two schedules""" + recalculating_optimizer = RecalculatingWattTimeOptimizer( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="auto", + ) + first_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time, + self.static_end_time, + ) + first_combined_schedule = recalculating_optimizer.get_combined_schedule() + second_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time + timedelta(hours=7), + self.static_end_time, + ) + second_combined_schedule = recalculating_optimizer.get_combined_schedule() + + self.assertNotEqual( + first_combined_schedule["usage"].tolist(), + second_combined_schedule["usage"].tolist(), + ) + self.assertEqual( + first_combined_schedule["usage"].tolist()[: 12 * 7], + second_combined_schedule["usage"].tolist()[: 12 * 7], + ) + self.assertEqual(first_combined_schedule["usage"].sum(), 240) + self.assertEqual(second_combined_schedule["usage"].sum(), 240) + + def test_schedule_times(self) -> None: + recalculating_optimizer = RecalculatingWattTimeOptimizer( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=30, + usage_power_kw=2, + optimization_method="auto", + ) + + start_time = self.static_start_time + end_time = self.static_end_time + timedelta(hours=2) + + for i in range(2 * 2): + start_time = start_time + timedelta(minutes=30) + schedule = recalculating_optimizer.get_new_schedule(start_time, end_time) + self.assertTrue(schedule.index.is_unique) + self.assertEquals( + schedule.index[0].to_pydatetime(), + start_time + timedelta(minutes=4, seconds=59), + ) + + self.assertTrue(recalculating_optimizer.get_combined_schedule().index.is_unique) + + def test_override_data_behavior(self) -> None: + """Test combining schedules with overriden data""" + recalculating_optimizer = RecalculatingWattTimeOptimizer( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="auto", + ) + last_data_time = self.data_times[self.data_times < self.static_start_time].max() + first_query_time_data = self.curr_fcst_data[ + self.curr_fcst_data["generated_at"] == last_data_time + ] + first_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time, self.static_end_time, first_query_time_data + ) + first_combined_schedule = recalculating_optimizer.get_combined_schedule() + + last_data_time = self.data_times[ + self.data_times < self.static_start_time + timedelta(hours=7) + ].max() + second_query_time_data = self.curr_fcst_data[ + self.curr_fcst_data["generated_at"] == last_data_time + ] + second_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time + timedelta(hours=7), + self.static_end_time, + second_query_time_data, + ) + + second_combined_schedule = recalculating_optimizer.get_combined_schedule() + self.assertNotEqual( + first_combined_schedule["usage"].tolist(), + second_combined_schedule["usage"].tolist(), + ) + self.assertEqual( + first_combined_schedule["usage"].tolist()[: 12 * 7], + second_combined_schedule["usage"].tolist()[: 12 * 7], + ) + + self.assertEqual(first_combined_schedule["usage"].sum(), 240) + self.assertEqual(second_combined_schedule["usage"].sum(), 240) + + +def check_num_intervals(schedule: pd.DataFrame) -> int: + charging_indicator = schedule["usage"].apply(lambda x: 1 if x > 0 else 0) + intervals = charging_indicator.diff().value_counts().get(1, 0) + if charging_indicator[0] > 0: + intervals += 1 + return intervals + + +class TestRecalculatingOptimizerWithConstraints(unittest.TestCase): + def setUp(self): + self.region = "PJM_NJ" + self.username = os.getenv("WATTTIME_USER") + self.password = os.getenv("WATTTIME_PASSWORD") + + self.static_start_time = convert_to_utc( + datetime(2024, 1, 1, hour=20, second=1), local_tz_str="America/New_York" + ) + self.static_end_time = convert_to_utc( + datetime(2024, 1, 2, hour=8, second=1), local_tz_str="America/New_York" + ) + + self.wth = WattTimeForecast(self.username, self.password) + self.curr_fcst_data = self.wth.get_historical_forecast_pandas( + start=self.static_start_time - timedelta(minutes=5), + end=self.static_end_time, + region=self.region, + signal_type="co2_moer", + horizon_hours=72, + ) + self.data_times = self.curr_fcst_data["generated_at"] + + def test_recalculating_optimizer_adjust_num_intervals(self) -> None: + recalculating_optimizer = RecalculatingWattTimeOptimizerWithContiguity( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="sophisticated", + charge_per_interval=[140, 100], + ) + + initial_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time, + self.static_end_time, + ) + self.assertTrue(check_num_intervals(initial_schedule) <= 2) + + first_interval_end_time = initial_schedule[ + initial_schedule["usage"].diff() < 0 + ].index[0] + + next_schedule = recalculating_optimizer.get_new_schedule( + first_interval_end_time, + self.static_end_time, + ) + + self.assertTrue(check_num_intervals(next_schedule) == 1) + self.assertEqual( + recalculating_optimizer.get_combined_schedule()["usage"].sum(), 240 + ) + + def test_recalculating_optimizer_mid_interval(self) -> None: + recalculating_optimizer = RecalculatingWattTimeOptimizerWithContiguity( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="sophisticated", + charge_per_interval=[120, 120], + ) + + initial_schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time, + self.static_end_time, + ) + self.assertTrue(check_num_intervals(initial_schedule) <= 2) + + mid_interval_time = initial_schedule[ + initial_schedule["usage"].diff() < 0 + ].index[0] - timedelta(minutes=10) + + next_schedule = recalculating_optimizer.get_new_schedule( + mid_interval_time, + self.static_end_time, + ) + + # Check that remaining schedule before interval end is the same + self.assertTrue( + initial_schedule[initial_schedule.index >= mid_interval_time] + .head(2) + .equals(next_schedule.head(2)) + ) + self.assertEqual(next_schedule.index[0], mid_interval_time) + self.assertEqual( + recalculating_optimizer.get_combined_schedule()["usage"].sum(), 240 + ) + + def test_init_recalculating_contiguity_optimizer(self) -> None: + """Test init""" + + recalculating_optimizer = RecalculatingWattTimeOptimizerWithContiguity( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=240, + usage_power_kw=2, + optimization_method="sophisticated", + charge_per_interval=[100, 140], + ) + + for i in range(12): + schedule = recalculating_optimizer.get_new_schedule( + self.static_start_time + timedelta(hours=i), + self.static_end_time, + ) + + self.assertTrue( + check_num_intervals(recalculating_optimizer.get_combined_schedule()) <= 2 + ) + self.assertEqual( + recalculating_optimizer.get_combined_schedule()["usage"].sum(), 240 + ) + + def test_frequent_recalculating_with_contiguity(self) -> None: + recalculating_optimizer = RecalculatingWattTimeOptimizerWithContiguity( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=30, + usage_power_kw=2, + optimization_method="sophisticated", + charge_per_interval=[15, 15], + ) + start_time = self.static_start_time + end_time = self.static_end_time + timedelta(hours=2) + + for i in range(12 * 2): + start_time = start_time + timedelta(minutes=5) + schedule = recalculating_optimizer.get_new_schedule(start_time, end_time) + + self.assertTrue( + check_num_intervals(recalculating_optimizer.get_combined_schedule()) <= 2 + ) + self.assertEqual( + recalculating_optimizer.get_combined_schedule()["usage"].sum(), 30 + ) + + def test_schedule_times(self) -> None: + recalculating_optimizer = RecalculatingWattTimeOptimizerWithContiguity( + region=self.region, + watttime_username=self.username, + watttime_password=self.password, + usage_time_required_minutes=30, + usage_power_kw=2, + optimization_method="sophisticated", + charge_per_interval=[15, 15], + ) + + start_time = self.static_start_time + end_time = self.static_end_time + timedelta(hours=2) + + for i in range(2 * 2): + start_time = start_time + timedelta(minutes=30) + schedule = recalculating_optimizer.get_new_schedule(start_time, end_time) + self.assertTrue(schedule.index.is_unique) + self.assertEqual( + schedule.index[0].to_pydatetime(), + start_time + timedelta(minutes=4, seconds=59), + ) + + self.assertTrue(recalculating_optimizer.get_combined_schedule().index.is_unique) + +if __name__ == "__main__": + unittest.main() + # TestWattTimeOptimizer.setUpClass() + # TestWattTimeOptimizer().test_dp_non_round_usage_time() diff --git a/watttime/api.py b/watttime/api.py index bc5b18c0..1476dbd2 100644 --- a/watttime/api.py +++ b/watttime/api.py @@ -1,5 +1,6 @@ import os import time +import math from datetime import date, datetime, timedelta from functools import cache from pathlib import Path @@ -8,7 +9,10 @@ import pandas as pd import requests from dateutil.parser import parse -from pytz import UTC +from pytz import UTC, timezone +from watttime.optimizer.alg import optCharger, moer +from itertools import accumulate +import bisect class WattTimeBase: @@ -189,19 +193,19 @@ def get_historical_jsons( """ Base function to scrape historical data, returning a list of .json responses. - Args: - start (datetime): inclusive start, with a UTC timezone. - end (datetime): inclusive end, with a UTC timezone. - region (str): string, accessible through the /my-access endpoint, or use the free region (CAISO_NORTH) - signal_type (str, optional): one of ['co2_moer', 'co2_aoer', 'health_damage']. Defaults to "co2_moer". - model (Optional[Union[str, date]], optional): Optionally provide a model, used for versioning models. - Defaults to None. + Args: + start (datetime): inclusive start, with a UTC timezone. + end (datetime): inclusive end, with a UTC timezone. + region (str): string, accessible through the /my-access endpoint, or use the free region (CAISO_NORTH) + signal_type (str, optional): one of ['co2_moer', 'co2_aoer', 'health_damage']. Defaults to "co2_moer". + model (Optional[Union[str, date]], optional): Optionally provide a model, used for versioning models. + Defaults to None. - Raises: - Exception: Scraping failed for some reason + Raises: + Exception: Scraping failed for some reason - Returns: - List[dict]: A list of dictionary representations of the .json response object + Returns: + List[dict]: A list of dictionary representations of the .json response object """ if not self._is_token_valid(): self._login() @@ -224,7 +228,7 @@ def get_historical_jsons( rsp.raise_for_status() j = rsp.json() responses.append(j) - except Exception as e: + except Exception: raise Exception( f"\nAPI Response Error: {rsp.status_code}, {rsp.text} [{rsp.headers.get('x-request-id')}]" ) @@ -493,7 +497,7 @@ def get_historical_forecast_json( rsp.raise_for_status() j = rsp.json() responses.append(j) - except Exception as e: + except Exception: raise Exception( f"\nAPI Response Error: {rsp.status_code}, {rsp.text} [{rsp.headers.get('x-request-id')}]" ) @@ -542,6 +546,411 @@ def get_historical_forecast_pandas( return out +OPT_INTERVAL = 5 +MAX_PREDICTION_HOURS = 72 + + +class WattTimeOptimizer(WattTimeForecast): + """ + This class inherits from WattTimeForecast, with additional methods to generate + optimal usage plans for energy consumption based on various parameters and + constraints. + + Additional Methods: + -------- + get_optimal_usage_plan(region, usage_window_start, usage_window_end, + usage_time_required_minutes, usage_power_kw, + usage_time_uncertainty_minutes, optimization_method, + moer_data_override) + Generates an optimal usage plan for energy consumption. + """ + + OPT_INTERVAL = 5 + MAX_PREDICTION_HOURS = 72 + MAX_INT = 99999999999999999 + + def get_optimal_usage_plan( + self, + region: str, + usage_window_start: datetime, + usage_window_end: datetime, + usage_time_required_minutes: Optional[Union[int, float]] = None, + usage_power_kw: Optional[Union[int, float, pd.DataFrame]] = None, + energy_required_kwh: Optional[Union[int, float]] = None, + usage_time_uncertainty_minutes: Optional[Union[int, float]] = 0, + charge_per_interval: Optional[list] = None, + use_all_intervals: bool = True, + constraints: Optional[dict] = None, + optimization_method: Optional[ + Literal["baseline", "simple", "sophisticated", "auto"] + ] = "baseline", + moer_data_override: Optional[pd.DataFrame] = None, + ) -> pd.DataFrame: + """ + Generates an optimal usage plan for energy consumption based on given parameters. + + This method calculates the most efficient energy usage schedule within a specified + time window, considering factors such as regional data, power requirements, and + optimization methods. + + You should pass in exactly 2 of 3 parameters of (usage_time_required_minutes, usage_power_kw, energy_required_kwh) + + Parameters: + ----------- + region : str + The region for which forecast data is requested. + usage_window_start : datetime + Start time of the window when power consumption is allowed. + usage_window_end : datetime + End time of the window when power consumption is allowed. + usage_time_required_minutes : Optional[Union[int, float]], default=None + Required usage time in minutes. + usage_power_kw : Optional[Union[int, float, pd.DataFrame]], default=None + Power usage in kilowatts. Can be a constant value or a DataFrame for variable power. + energy_required_kwh : Optional[Union[int, float]], default=None + Energy required in kwh + usage_time_uncertainty_minutes : Optional[Union[int, float]], default=0 + Uncertainty in usage time, in minutes. + charge_per_interval : Optional[list], default=None + Either a list of length-2 tuples representing minimium and maximum (inclusive) charging minutes per interval, + or a list of ints representing both the min and max. + use_all_intervals : Optional[bool], default=False + If true, use all intervals provided by charge_per_interval; if false, can use the first few intervals and skip the rest. + constraints : Optional[dict], default=None + A dictionary containing contraints on how much usage must be used before the given time point + optimization_method : Optional[Literal["baseline", "simple", "sophisticated", "auto"]], default="baseline" + The method used for optimization. + moer_data_override : Optional[pd.DataFrame], default=None + Pre-generated MOER (Marginal Operating Emissions Rate) DataFrame, if available. + + Returns: + -------- + pd.DataFrame + A DataFrame representing the optimal usage plan, including columns for + predicted MOER, usage, CO2 emissions, and energy usage. + + Raises: + ------- + AssertionError + If input parameters do not meet specified conditions (e.g., timezone awareness, + valid time ranges, supported optimization methods). + + Notes: + ------ + - The method uses WattTime forecast data unless overridden by moer_data_override. + - It supports various optimization methods and can handle both constant and variable power usage. + - The resulting plan aims to minimize emissions while meeting the specified energy requirements. + """ + + def is_tz_aware(dt): + return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None + + def minutes_to_units(x, floor=False): + if x: + if floor: + return int(x // self.OPT_INTERVAL) + else: + return int(math.ceil(x / self.OPT_INTERVAL)) + return x + + assert is_tz_aware(usage_window_start), "Start time is not tz-aware" + assert is_tz_aware(usage_window_end), "End time is not tz-aware" + + if constraints is None: + constraints = {} + else: + # Convert constraints to a standardized format + raw_constraints = constraints.copy() + constraints = {} + + for ( + constraint_time_clock, + constraint_usage_minutes, + ) in raw_constraints.items(): + constraint_time_minutes = ( + constraint_time_clock - usage_window_start + ).total_seconds() / 60 + constraint_time_units = minutes_to_units(constraint_time_minutes) + constraint_usage_units = minutes_to_units(constraint_usage_minutes) + + constraints.update( + {constraint_time_units: (constraint_usage_units, None)} + ) + + num_inputs = 0 + for input in (usage_time_required_minutes, usage_power_kw, energy_required_kwh): + if input is not None: + num_inputs += 1 + assert ( + num_inputs == 2 + ), "Exactly 2 of 3 inputs in (usage_time_required_minutes, usage_power_kw, energy_required_kwh) required" + if usage_power_kw is None: + usage_power_kw = energy_required_kwh / usage_time_required_minutes * 60 + print("Implied usage_power_kw =", usage_power_kw) + if usage_time_required_minutes is None: + if type(usage_power_kw) in (float, int) and type(energy_required_kwh) in ( + float, + int, + ): + usage_time_required_minutes = energy_required_kwh / usage_power_kw * 60 + print("Implied usage time required =", usage_time_required_minutes) + else: + # TODO: Implement and test + raise NotImplementedError( + "When usage_time_required_minutes is None, only float or int usage_power_kw and energy_required_kwh is supported." + ) + + # Perform these checks if we are using live data + if moer_data_override is None: + datetime_now = datetime.now(UTC) + assert ( + usage_window_end > datetime_now + ), "Error, Window end is before current datetime" + assert usage_window_end - datetime_now < timedelta( + hours=self.MAX_PREDICTION_HOURS + ), "End time is too far in the future" + assert optimization_method in ("baseline", "simple", "sophisticated", "auto"), ( + "Unsupported optimization method:" + optimization_method + ) + if moer_data_override is None: + forecast_df = self.get_forecast_pandas( + region=region, + signal_type="co2_moer", + horizon_hours=self.MAX_PREDICTION_HOURS, + ) + else: + forecast_df = moer_data_override.copy() + forecast_df = forecast_df.set_index("point_time") + forecast_df.index = pd.to_datetime(forecast_df.index) + + # relevant_forecast_df = forecast_df[usage_window_start:usage_window_end] + relevant_forecast_df = forecast_df[forecast_df.index >= usage_window_start] + relevant_forecast_df = relevant_forecast_df[ + relevant_forecast_df.index < usage_window_end + ] + relevant_forecast_df = relevant_forecast_df.rename( + columns={"value": "pred_moer"} + ) + result_df = relevant_forecast_df[["pred_moer"]] + moer_values = relevant_forecast_df["pred_moer"].values + + m = moer.Moer(mu=moer_values) + + model = optCharger.OptCharger() + + total_charge_units = minutes_to_units(usage_time_required_minutes) + if optimization_method in ("sophisticated", "auto"): + # Give a buffer time equal to the uncertainty + buffer_time = usage_time_uncertainty_minutes + buffer_periods = minutes_to_units(buffer_time) if buffer_time else 0 + buffer_enforce_time = max( + total_charge_units, len(moer_values) - buffer_periods + ) + constraints.update({buffer_enforce_time: (total_charge_units, None)}) + else: + assert ( + usage_time_uncertainty_minutes == 0 + ), "usage_time_uncertainty_minutes is only supported in optimization_method='sophisticated' or 'auto'" + + if type(usage_power_kw) in (int, float): + # Convert to the MWh used in an optimization interval + # expressed as a function to meet the parameter requirements for OptC function + emission_multiplier_fn = ( + lambda sc, ec: float(usage_power_kw) * 0.001 * self.OPT_INTERVAL / 60.0 + ) + else: + usage_power_kw = usage_power_kw.copy() + # Resample usage power dataframe to an OPT_INTERVAL frequency + usage_power_kw["time_step"] = usage_power_kw["time"] / self.OPT_INTERVAL + usage_power_kw_new_index = pd.DataFrame( + index=[float(x) for x in range(total_charge_units + 1)] + ) + usage_power_kw = pd.merge_asof( + usage_power_kw_new_index, + usage_power_kw.set_index("time_step"), + left_index=True, + right_index=True, + direction="backward", + allow_exact_matches=True, + ) + + def emission_multiplier_fn(sc: float, ec: float) -> float: + """ + Calculate the approximate mean power in the given time range, + in units of MWh used per optimizer time unit. + + sc and ec are float values representing the start and end time of + the time range, in optimizer time units. + """ + value = ( + usage_power_kw[sc : max(sc, ec - 1e-12)]["power_kw"].mean() + * 0.001 + * self.OPT_INTERVAL + / 60.0 + ) + return value + + if charge_per_interval: + # Handle the charge_per_interval input by converting it from minutes to units, rounding up + converted_charge_per_interval = [] + for c in charge_per_interval: + if isinstance(c, int): + converted_charge_per_interval.append(minutes_to_units(c)) + else: + assert ( + len(c) == 2 + ), "Length of tuples in charge_per_interval is not 2" + interval_start_units = minutes_to_units(c[0]) if c[0] else 0 + interval_end_units = ( + minutes_to_units(c[1]) if c[1] else self.MAX_INT + ) + converted_charge_per_interval.append( + (interval_start_units, interval_end_units) + ) + # print("Charge per interval:", converted_charge_per_interval) + else: + converted_charge_per_interval = None + model.fit( + total_charge=total_charge_units, + total_time=len(moer_values), + moer=m, + constraints=constraints, + charge_per_interval=converted_charge_per_interval, + use_all_intervals=use_all_intervals, + emission_multiplier_fn=emission_multiplier_fn, + optimization_method=optimization_method, + ) + + optimizer_result = model.get_schedule() + result_df = self._reconcile_constraints( + optimizer_result, + result_df, + model, + usage_time_required_minutes, + charge_per_interval, + ) + + return result_df + + def _reconcile_constraints( + self, + optimizer_result, + result_df, + model, + usage_time_required_minutes, + charge_per_interval, + ): + # Make a copy of charge_per_interval if necessary + if charge_per_interval is not None: + charge_per_interval = charge_per_interval[::] + for i in range(len(charge_per_interval)): + if type(charge_per_interval[i]) == int: + charge_per_interval[i] = ( + charge_per_interval[i], + charge_per_interval[i], + ) + assert len(charge_per_interval[i]) == 2 + processed_start = ( + charge_per_interval[i][0] + if charge_per_interval[i][0] is not None + else 0 + ) + processed_end = ( + charge_per_interval[i][1] + if charge_per_interval[i][1] is not None + else self.MAX_INT + ) + + charge_per_interval[i] = (processed_start, processed_end) + + if not charge_per_interval: + # Handle case without charge_per_interval constraints + total_usage_intervals = sum(optimizer_result) + current_usage_intervals = 0 + usage_list = [] + for to_charge_binary in optimizer_result: + current_usage_intervals += to_charge_binary + if current_usage_intervals < total_usage_intervals: + usage_list.append(to_charge_binary * float(self.OPT_INTERVAL)) + else: + # Partial interval + minutes_to_trim = ( + total_usage_intervals * self.OPT_INTERVAL + - usage_time_required_minutes + ) + usage_list.append( + to_charge_binary * float(self.OPT_INTERVAL - minutes_to_trim) + ) + result_df["usage"] = usage_list + else: + # Process charge_per_interval constraints + result_df["usage"] = [ + x * float(self.OPT_INTERVAL) for x in optimizer_result + ] + usage = result_df["usage"].values + sections = [] + interval_ids = model.get_interval_ids() + + def get_min_max_indices(lst, x): + # Find the first occurrence of x + min_index = lst.index(x) + # Find the last occurrence of x + max_index = len(lst) - 1 - lst[::-1].index(x) + return min_index, max_index + + for interval_id in range(0, max(interval_ids) + 1): + assert ( + interval_id in interval_ids + ), "interval_id not found in interval_ids" + sections.append(get_min_max_indices(interval_ids, interval_id)) + + # Adjust sections to satisfy charge_per_interval constraints + for i, (start, end) in enumerate(sections): + section_usage = usage[start : end + 1] + total_minutes = section_usage.sum() + + # Get the constraints for this section + if isinstance(charge_per_interval[i], int): + min_minutes, max_minutes = ( + charge_per_interval[i], + charge_per_interval[i], + ) + else: + min_minutes, max_minutes = charge_per_interval[i] + + # Adjust the section to fit the constraints + if total_minutes < min_minutes: + raise ValueError( + f"Cannot meet the minimum charging constraint of {min_minutes} minutes for section {i}." + ) + elif total_minutes > max_minutes: + # Reduce usage to fit within the max_minutes + excess_minutes = total_minutes - max_minutes + for j in range(len(section_usage)): + if section_usage[j] > 0: + reduction = min(section_usage[j], excess_minutes) + section_usage[j] -= reduction + excess_minutes -= reduction + if excess_minutes <= 0: + break + usage[start : end + 1] = section_usage + result_df["usage"] = usage + + # Recalculate these values approximately, based on the new "usage" column + # Note: This is approximate since it assumes that + # the charging emissions over time of the unrounded values are similar to the rounded values + result_df["emissions_co2e_lb"] = ( + model.get_charging_emissions_over_time() + * result_df["usage"] + / self.OPT_INTERVAL + ) + result_df["energy_usage_mwh"] = ( + model.get_energy_usage_over_time() * result_df["usage"] / self.OPT_INTERVAL + ) + + return result_df + + class WattTimeMaps(WattTimeBase): def get_maps_json( self, @@ -568,3 +977,273 @@ def get_maps_json( rsp = requests.get(url, headers=headers, params=params) rsp.raise_for_status() return rsp.json() + + +class RecalculatingWattTimeOptimizer: + def __init__( + self, + watttime_username: str, + watttime_password: str, + region: str, + usage_time_required_minutes: float, + usage_power_kw: Union[int, float, pd.DataFrame], + optimization_method: Optional[ + Literal["baseline", "simple", "sophisticated", "auto"] + ], + ) -> None: + # Settings that stay consistent across calls to get_optimal_usage_plan + self.region = region + self.total_time_required = usage_time_required_minutes + self.usage_power_kw = usage_power_kw + self.optimization_method = optimization_method + + # Setup for us to track schedule/usage + self.all_schedules = [] # (schedule, ctx) + + # Set up to query for fcsts + self.forecast_generator = WattTimeForecast(watttime_username, watttime_password) + self.wt_opt = WattTimeOptimizer(watttime_username, watttime_password) + + # Set up to query for actual data + self.wt_hist = WattTimeHistorical(watttime_username, watttime_password) + + def _get_curr_fcst_data(self, new_start_time: datetime): + curr_fcst_data = self.forecast_generator.get_historical_forecast_pandas( + start=new_start_time - timedelta(minutes=OPT_INTERVAL), + end=new_start_time, + region=self.region, + signal_type="co2_moer", + horizon_hours=MAX_PREDICTION_HOURS, + ) + most_recent_data_time = curr_fcst_data["generated_at"].iloc[-1] + curr_fcst_data = curr_fcst_data[ + curr_fcst_data["generated_at"] == most_recent_data_time + ] + # Get most recent forecast time using iloc with bounds checking + if len(curr_fcst_data["generated_at"]) > 0: + most_recent_data_time = curr_fcst_data["generated_at"].iloc[-1] + curr_fcst_data = curr_fcst_data[ + curr_fcst_data["generated_at"] == most_recent_data_time + ].copy() + return curr_fcst_data + + def _get_remaining_time_required(self, query_time: datetime): + if len(self.all_schedules) == 0: + return self.total_time_required + + # If there are previously produced schedules, assume we followed each schedule until getting a new one + combined_schedule = self.get_combined_schedule() + + # Calculate remaining time required + usage = int( + combined_schedule[combined_schedule.index < query_time]["usage"].sum() + ) + return self.total_time_required - usage + + def _set_last_schedule_end_time(self, new_schedule_start_time: datetime): + # If there a previously produced schedule, assume we followed that schedule until getting the new one + if len(self.all_schedules) > 0: + # Set end time of last ctx + schedule, ctx = self.all_schedules[-1] + self.all_schedules[-1] = (schedule, (ctx[0], new_schedule_start_time)) + assert ctx[0] < new_schedule_start_time + + def _query_api_for_fcst_data(self, new_start_time: datetime): + # Get new data + curr_fcst_data = self.forecast_generator.get_historical_forecast_pandas( + start=new_start_time - timedelta(minutes=OPT_INTERVAL), + end=new_start_time, + region=self.region, + signal_type="co2_moer", + horizon_hours=MAX_PREDICTION_HOURS, + ) + most_recent_data_time = curr_fcst_data["generated_at"].iloc[-1] + curr_fcst_data = curr_fcst_data[ + curr_fcst_data["generated_at"] == most_recent_data_time + ] + return curr_fcst_data + + def _get_new_schedule( + self, + new_start_time: datetime, + new_end_time: datetime, + curr_fcst_data: pd.DataFrame = None, + charge_per_interval: Optional[list] = None, + ) -> tuple[pd.DataFrame, tuple[str, str]]: + + if curr_fcst_data is None: + curr_fcst_data = self._query_api_for_fcst_data(new_start_time) + + curr_fcst_data["point_time"] = pd.to_datetime(curr_fcst_data["point_time"]) + curr_fcst_data = curr_fcst_data.loc[ + curr_fcst_data["point_time"] >= new_start_time + ] + if curr_fcst_data.shape[0] == 0: + print("error") + new_schedule_start_time = curr_fcst_data["point_time"].iloc[0] + + # Generate new schedule + new_schedule = self.wt_opt.get_optimal_usage_plan( + region=self.region, + usage_window_start=new_start_time - timedelta(minutes=OPT_INTERVAL), + usage_window_end=new_end_time, + usage_time_required_minutes=self._get_remaining_time_required( + new_schedule_start_time + ), + usage_power_kw=self.usage_power_kw, + optimization_method=self.optimization_method, + moer_data_override=curr_fcst_data, + charge_per_interval=charge_per_interval, + ) + new_schedule_ctx = (new_schedule_start_time, new_end_time) + + return new_schedule, new_schedule_ctx + + def get_new_schedule( + self, + new_start_time: datetime, + new_end_time: datetime, + curr_fcst_data: pd.DataFrame = None, + ) -> pd.DataFrame: + schedule, ctx = self._get_new_schedule( + new_start_time, new_end_time, curr_fcst_data + ) + + self._set_last_schedule_end_time(ctx[0]) + self.all_schedules.append((schedule, ctx)) + return schedule + + def get_combined_schedule(self, end_time: datetime = None) -> pd.DataFrame: + schedule_segments = [] + for s, ctx in self.all_schedules: + schedule_segments.append(s[s.index < ctx[1]]) + combined_schedule = pd.concat(schedule_segments) + + if end_time: + # Only keep segments that complete before end_time + last_segment_start_time = end_time + timedelta(minutes=OPT_INTERVAL) + combined_schedule = combined_schedule[ + combined_schedule.index <= last_segment_start_time + ] + + return combined_schedule + + +class RecalculatingWattTimeOptimizerWithContiguity(RecalculatingWattTimeOptimizer): + def __init__( + self, + watttime_username: str, + watttime_password: str, + region: str, + usage_time_required_minutes: float, + usage_power_kw: Union[int, float, pd.DataFrame], + optimization_method: Optional[ + Literal["baseline", "simple", "sophisticated", "auto"] + ], + charge_per_interval: list = [], + ): + self.all_charge_per_interval = charge_per_interval + super().__init__( + watttime_username, + watttime_password, + region, + usage_time_required_minutes, + usage_power_kw, + optimization_method, + ) + + def get_new_schedule( + self, + new_start_time: datetime, + new_end_time: datetime, + curr_fcst_data: pd.DataFrame = None, + ) -> pd.DataFrame: + if len(self.all_schedules) == 0: + # If no existing schedules, then generate as normal + new_schedule, _ = self._get_new_schedule( + new_start_time, + new_end_time, + curr_fcst_data, + self.all_charge_per_interval, + ) + self.all_schedules.append((new_schedule, (new_start_time, new_end_time))) + return new_schedule + + # Get the schedule that we should previously have followed + curr_combined_schedule = self.get_combined_schedule(new_end_time) + + # Get num charging intervals completed so far + completed_schedule = curr_combined_schedule[ + curr_combined_schedule.index < new_start_time + ] + charging_indicator = ( + completed_schedule["usage"].apply(lambda x: 1 if x > 0 else 0).sum() + ) + num_charging_segments_complete = bisect.bisect_right( + list(accumulate(self.all_charge_per_interval)), charging_indicator * 5 + ) + + # Get the current status + curr_segment = curr_combined_schedule[ + curr_combined_schedule.index <= new_start_time + ].iloc[-1] + if curr_segment["usage"] > 0: + upcoming_segments = curr_combined_schedule[ + curr_combined_schedule.index > new_start_time + ] + upcoming_no_charge_times = upcoming_segments[ + upcoming_segments["usage"] == 0 + ] + + # if we charge for the remaining time, return the existing schedule (starting at new_start_time) + if upcoming_no_charge_times.empty: + return curr_combined_schedule[ + curr_combined_schedule.index >= new_start_time + ] + + next_unplug_time = upcoming_no_charge_times.index[0] + next_unplug_time = next_unplug_time.to_pydatetime() + + # Get the section of old schedule to follow + remaining_old_schedule = curr_combined_schedule[ + curr_combined_schedule.index < next_unplug_time + ] + remaining_old_schedule = remaining_old_schedule[ + remaining_old_schedule.index >= new_start_time + ] + + # Update completed segments to reflect portion of old schedule + additional_charge_segments = ( + remaining_old_schedule["usage"].apply(lambda x: 1 if x > 0 else 0).sum() + ) + num_charging_segments_complete = bisect.bisect_right( + list(accumulate(self.all_charge_per_interval)), + (charging_indicator + additional_charge_segments) * 5, + ) + + # Get schedule for after this segment completes + new_schedule, ctx = self._get_new_schedule( + next_unplug_time, + new_end_time, + curr_fcst_data, + self.all_charge_per_interval[num_charging_segments_complete:], + ) + + # Construct the schedule from start_time + if remaining_old_schedule is not None: + new_schedule = pd.concat([remaining_old_schedule, new_schedule]) + + ctx = (new_schedule.index[0], ctx[1]) + else: + # If not in segment, generate a schedule starting at new_start_time + new_schedule, ctx = self._get_new_schedule( + new_start_time, + new_end_time, + curr_fcst_data, + self.all_charge_per_interval[num_charging_segments_complete:], + ) + + # Update last schedule, add new schedule + self._set_last_schedule_end_time(new_start_time) + self.all_schedules.append((new_schedule, ctx)) + return new_schedule diff --git a/watttime/api_convert.py b/watttime/api_convert.py new file mode 100644 index 00000000..5109d056 --- /dev/null +++ b/watttime/api_convert.py @@ -0,0 +1,194 @@ +import pandas as pd +import numpy as np + + +# This file contains utility functions for converting formats for now +def convert_soc_to_soe(soc_power_df, voltage_curve, battery_capacity_coulombs): + """ + Convert State of Charge (SoC) to State of Energy (SoE) by integrating voltage over SoC. + + Parameters: + soc_power_df (pd.DataFrame): DataFrame with 'SoC' and 'power_kw' columns. + voltage_curve (function): Voltage as a function of SoC. + battery_capacity_coulombs (float): Maximum current capacity of the battery in coulombs. + + Returns: + pd.DataFrame: DataFrame with 'SoE' and 'power_kw' columns. + """ + soc = soc_power_df["SoC"] + + # Voltage at each SoC + voltage = voltage_curve(soc) + + # Calculate differential SoC for numerical integration + delta_soc = np.diff(soc, prepend=0) + charge_per_interval = delta_soc * battery_capacity_coulombs + # Energy is voltage * charge + energy_kwh = np.cumsum(voltage * charge_per_interval * 0.001 / 3600) + + # Normalize so that State of energy goes from 0 to 1 + soe_array = energy_kwh / energy_kwh.iloc[-1] + + # Create a new DataFrame with 'SoE' and 'power_kw' + soe_power_df = pd.DataFrame( + {"SoE": soe_array, "power_kw": soc_power_df["power_kw"]} + ) + + return soe_power_df + + +def convert_soe_to_time(soe_power_df, battery_capacity): + """ + Convert Power vs SoE DataFrame to a Power vs Time DataFrame. + + Parameters: + soe_power_df (pd.DataFrame): DataFrame with 'SoE' and 'power_kw' columns. + battery_capacity (float): Maximum energy capacity of the battery in kWh. + + Returns: + pd.DataFrame: DataFrame with 'time' (in minutes) and 'power_kw' columns. + """ + time_list = [0] # Starting at t = 0 minutes + previous_time = 0 + + for i in range(len(soe_power_df) - 1): + # Calculate the delta SoE + delta_soe = soe_power_df["SoE"].iloc[i + 1] - soe_power_df["SoE"].iloc[i] + + # Energy transferred for this delta SoE + delta_energy = delta_soe * battery_capacity # in kWh + + # Power to use during this step + power_to_use = soe_power_df["power_kw"].iloc[i] + + # Time step for this segment + delta_time_minutes = delta_energy / power_to_use * 60 + + # Add the time to the previous time to get cumulative time + current_time = previous_time + delta_time_minutes + time_list.append(current_time) + previous_time = current_time + + # Convert SoE dataframe to Time dataframe + time_power_df = pd.DataFrame( + {"time": time_list, "power_kw": soe_power_df["power_kw"]} + ) + + return time_power_df + + +def get_usage_power_kw_df(soe_power_df, capacity_kWh): + """ + Output the variable charging curve in the format that optimizer accepts. + That is, dataframe with index "time" in minutes and "power_kw" which + tells us the average power consumption in a five minute interval + after an elapsed amount of time of charging. + + Assumes df is sorted by SoE + """ + + def get_kW_at_SoE(df, soe): + """Linear interpolation to get charging rate at any SoE""" + before_df = df[df["SoE"] <= soe] + # print("Before_df", before_df) + prev_row = before_df.iloc[-1] if len(before_df) > 0 else None + after_df = df[df["SoE"] >= soe] + # print("After_df", after_df) + next_row = after_df.iloc[0] if len(after_df) > 0 else None + if prev_row is None: + return next_row["power_kw"] + if next_row is None: + return prev_row["power_kw"] + + m1 = prev_row["SoE"] + p1 = prev_row["power_kw"] + m2 = next_row["SoE"] + p2 = next_row["power_kw"] + + if m1 == m2: + return 0.5 * (p1 + p2) + + return p1 + (soe - m1) / (m2 - m1) * (p2 - p1) + + # iterate over seconds + result = [] + secs_elapsed = 0 + sub_interval_seconds = 60 + # For now, we assume the starting capacity is 0.0 + charged_kWh = 0.0 + kW_by_second = [] + while charged_kWh < capacity_kWh: + secs_elapsed += sub_interval_seconds + curr_soe = charged_kWh / capacity_kWh + curr_kW = get_kW_at_SoE(soe_power_df, curr_soe) + # print("Debug:", curr_kW, curr_soe, secs_elapsed) + kW_by_second.append(curr_kW) + charged_kWh += curr_kW * sub_interval_seconds / 3600 + + if secs_elapsed % 300 == 0: + result.append((int(secs_elapsed / 60 - 5), pd.Series(kW_by_second).mean())) + kW_by_second = [] + + return pd.DataFrame(columns=["time", "power_kw"], data=result) + + +# Example usage: +soe_power_df = pd.DataFrame( + { + "SoE": np.linspace(0.0, 1.0, 11), # SoE from 0% to 100% + "power_kw": [ + 8, + 10, + 12, + 14, + 16, + 18, + 20, + 22, + 24, + 26, + 28, + ], # Example power values in kW + } +) + +battery_capacity = 100 # Max energy capacity in kWh +result_df = convert_soe_to_time(soe_power_df, battery_capacity) + +print("Old:", result_df) +print("New:", get_usage_power_kw_df(soe_power_df, battery_capacity)) + + +# Example voltage curve for testing +def voltage_curve_test(soc): + return 3.0 + 0.5 * soc + + +# Example SoC dataframe (with SoC ranging from 0.1 to 1.0) +soc_power_df = pd.DataFrame( + { + "SoC": np.linspace(0.0, 1.0, 11), + "power_kw": [ + 8, + 10, + 12, + 14, + 16, + 18, + 20, + 22, + 24, + 26, + 28, + ], # Example power values in kW + } +) + +battery_capacity_coulombs = 1_000_000 # Max energy capacity in kWh + +# Convert SoC to SoE +soe_power_df = convert_soc_to_soe( + soc_power_df, voltage_curve_test, battery_capacity_coulombs +) + +# print(soe_power_df) diff --git a/watttime/optimizer/Optimizer README.md b/watttime/optimizer/Optimizer README.md new file mode 100644 index 00000000..875a3fc2 --- /dev/null +++ b/watttime/optimizer/Optimizer README.md @@ -0,0 +1,264 @@ +# Optimizer README + +## Overview + +This code is built to implement and evaluate an algorithm to produce a charging schedule for devices that minimizes carbon emissions subject to a set of constraints. It is based on Watttime’s forecast of marginal emissions combined with inputs related to device capacity and energy needs. The project presents a few optimization algorithms that operate under different assumptions and produce different results. This optionality is part of the API and the results of different algorithms presented are evaluated using actual and forecasted data from power grids in the US. The evaluation section of the project includes a suite of functions to generate synthetic user data with a few behavioral assumptions that can serve to understand the benefits and limitations of our algorithms and evaluate the magnitude of emissions that would be saved if the algorithm were used. + +* **Running the model with constraints:**: + * Contiguous (single period, fixed length): + + + +```py +## AI model training - estimated runtime is 2 hours and it needs to complete by 12pm + +from datetime import datetime, timedelta +import pandas as pd +from pytz import UTC +from watttime import WattTimeOptimizer +import os + +username = os.getenv("WATTTIME_USER") +password = os.getenv("WATTTIME_PASSWORD") +wt_opt = WattTimeOptimizer(username, password) + +# Suppose that the time now is 12 midnight +now = datetime.now(UTC) +window_start = now +window_end = now + timedelta(minutes=720) + +usage_time_required_minutes=120 +usage_power_kw = 12.0 +region = "CAISO_NORTH" + +usage_plan = wt_opt.get_optimal_usage_plan( + region=region, + usage_window_start=window_start, + usage_window_end=window_end, + usage_time_required_minutes=usage_time_required_minutes, + usage_power_kw=usage_power_kw, + charge_per_interval=[usage_time_required_minutes], + optimization_method="auto", +) + +print(usage_plan.head()) +print(usage_plan["usage"].tolist()) +print(usage_plan.sum()) +``` + +* Contiguous (multiple periods, fixed length): + +```py +## Dishwasher - there are two cycles of length 80 min and 40 min each, and they must be completed in that order. + +from datetime import datetime, timedelta +import pandas as pd +from pytz import UTC +from watttime import WattTimeOptimizer +import os + +username = os.getenv("WATTTIME_USER") +password = os.getenv("WATTTIME_PASSWORD") +wt_opt = WattTimeOptimizer(username, password) + +# Suppose that the time now is 12 midnight +now = datetime.now(UTC) +window_start = now +window_end = now + timedelta(minutes=720) + +usage_time_required_minutes=120 +usage_power_kw = 12.0 +region = "CAISO_NORTH" + +usage_plan = wt_opt.get_optimal_usage_plan( + region=region, + usage_window_start=window_start, + usage_window_end=window_end, + usage_time_required_minutes=usage_time_required_minutes, + usage_power_kw=usage_power_kw, + charge_per_interval=[80,40], + optimization_method="auto", +) + +print(usage_plan.head()) +print(usage_plan["usage"].tolist()) +print(usage_plan.sum()) +``` + + * Contiguous (multiple periods, variable length): + + + +```py +## Compressor - needs to run 120 minutes over the next 12 hours; each cycle needs to be at least 20 minutes long, and any number of contiguous intervals (from one to six) is okay. + +from datetime import datetime, timedelta +import pandas as pd +from pytz import UTC +from watttime import WattTimeOptimizer +import os + +username = os.getenv("WATTTIME_USER") +password = os.getenv("WATTTIME_PASSWORD") +wt_opt = WattTimeOptimizer(username, password) + +# Suppose that the time now is 12 midnight +now = datetime.now(UTC) +window_start = now +window_end = now + timedelta(minutes=720) + +usage_time_required_minutes=120 +usage_power_kw = 12.0 +region = "CAISO_NORTH" + +usage_plan = wt_opt.get_optimal_usage_plan( + region=region, + usage_window_start=window_start, + usage_window_end=window_end, + usage_time_required_minutes=usage_time_required_minutes, + usage_power_kw=usage_power_kw, + # Here _None_ implies that there is no upper bound, and replacing None by 120 would have the exact same effect. + charge_per_interval=[(20,None),(20,None),(20,None),(20,None),(20,None),(20,None)], + optimization_method="auto", + use_all_intervals=False +) + +print(usage_plan.head()) +print(usage_plan["usage"].tolist()) +print(usage_plan.sum()) +``` + +* Partial charging guarantee: + +```py +## I would like to charge 75% by 8am in case of any emergencies (airport, kid bus, roadtrip) + +from datetime import datetime, timedelta +import pandas as pd +from pytz import UTC +from watttime import WattTimeOptimizer +import os + +username = os.getenv("WATTTIME_USER") +password = os.getenv("WATTTIME_PASSWORD") +wt_opt = WattTimeOptimizer(username, password) + +# Suppose that the time now is 12 midnight +now = datetime.now(UTC) +window_start = now +window_end = now + timedelta(minutes=720) +usage_time_required_minutes = 240 +constraint_time = now + timedelta(minutes=480) +constraint_usage_time_required_minutes = 180 +constraints = {constraint_time:constraint_usage_time_required_minutes} +usage_power_kw = 12.0 +region = "CAISO_NORTH" + +usage_plan = wt_opt.get_optimal_usage_plan( + region=region, + usage_window_start=window_start, + usage_window_end=window_end, + usage_time_required_minutes=240, + usage_power_kw=usage_power_kw, + constraints=constraints, + optimization_method="auto", +) + +print(usage_plan.head()) +print(usage_plan["usage"].tolist()) +print(usage_plan.sum()) + +``` +## Optimizer: basic principles and options + +The **basic intuition of the algorithm** is that when a device is plugged in for longer than the time required to fully charge it there exist ways to pick charging vs. non-charging time intervals such that the device draws power from the grid during cleaner intervals and thus minimizes emissions. **The algorithm takes as inputs** user and device parameters such as: the plug-in and plug-out times of the device, as well as the charging curve that determines the time it takes to charge it as well as the power that it needs to draw. **As an output, it produces** a charging schedule that divides the time between plug-in and plug-out time into charging and non-charging intervals such that emissions are minimized. Watttime’s forecast provides the basic building block for these algorithms as it forecasts when those relatively cleaner grid periods occur. + +There are **three different optimization algorithms** that are implemented in the API (alongside a baseline algorithm that just charges the device from the moment it’s plugged in to when it is fully charged, which is what devices do out of the box). We first start with **a simple algorithm** that, under full information about plug out time, uses the forecast to find the lowest possible emission interval that charges the device and outputs a charge schedule based on that. We then follow with a **sophisticated** version of the algorithm which takes into account variable charging curves and implements a dynamic optimization algorithm to adjust for the fact that device charging curves are non-linear. We provide additional functionality in the **fixed contiguous** and **contiguous** versions of the algorithms, which can enforce the charging schedule to be composed of several contiguous intervals; the length of each interval is either fixed or falls in a provided range. + +| optimization\_method | ASAP | Charging curve | Time constraint | Contiguous | +| :---- | :---- | :---- | :---- | :---- | +| baseline | Yes | Constant | No | No | +| simple | No | Constant | No | No | +| sophisticated | No | Variable | Yes | No | +| contiguous | No | Variable | Yes | Intervals at fixed lengths | +| Variable contiguous | No | Variable | Yes | Intervals at variable lengths | +| auto | No | Chooses the fastest algorithm that can still process all inputs | | | + +Evaluating the effectiveness of the algorithm, as well as the conditions that maximize emissions savings, we have implemented a suite of functions that generate synthetic user data that can be evaluated on data from the largest electrical grids in the US. The code here also contains these functions, which can be modified and are meant to capture behavioral assumptions of how users charge devices. + +A final note on device types (this is focused for now on EVs, but altering some of the behavioral assumptions of usage \+ the device charging curves can extend this functionality to other devices.) + +### Raw Inputs + +*What we simulate for each use case* + +- Capacity C + - Might also need init battery capacity if we don’t start from 0% + - Unit: kWh + - Type: energy +- Power usage curve from capacity Q:cp + - Marginal power usage to charge battery when it’s currently at capacity c + - Unit: kW + - Type: power +- Marginal emission rate M:tm + - Unit: lb/MWh + - Type: emission per energy + - We convert this to lb/kWh by multiplying M by 0.001 +- OPT\_INTERVAL + - Smallest interval on which we have constant charging behavior and emissions. + - Currently set to 5 minutes + - We have now discretized time into L=T/ intervals of length . The l-th interval is lt\<(l+1). +- Contiguity +- Constraints + +### API Inputs + +*When calling the API* + +- usage\_time\_required\_minutes Tr + - We compute this using C and Q. See example below. + - Unit: mins +- usage\_power\_kw P:tp + - Marginal power usage to charge battery when it has been charged for t minutes. Converted from Q. + - Unit: kW +- usage\_window\_start, usage\_window\_end + - These are timestamps to specify the charging window + +### Algorithm + +Find schedule s0,...,sL-1 that minimizes total emission 60l=0L-1slPl'=0l-1sl' Ml1000subject to + +* sl0,1 + * sl=1 if we charge on interval l + * sl=0 if we do not charge on interval l + * As an extension for future use cases, suppose we can supercharge the battery by consuming up to K times as much power. The DP algorithm will also be able to handle this optimization and output a schedule with sl0,1,...,K +* l=0L-1sl=Tr + * This just means that we charge for a total of Tr minutes according to this schedule. + +### API Output + + A data frame with \-spaced timestamp index and charging usage. For example, + +| time | usage (min) | energe\_use\_mwh | emissions\_co2e\_lb | +| :---- | :---- | :---- | :---- | +| 2024-7-26 15:00:00+00:00 | 5 | 0.0001 | 1.0 | +| 2024-7-26 15:05:00+00:00 | 5 | 0.0001 | 1.0 | +| 2024-7-26 15:10:00+00:00 | 0 | 0\. | 0.0 | +| 2024-7-26 15:15:00+00:00 | 5 | 0.00005 | 0.5 | + +This would mean that we charge from 15:00-15:10 and then from 15:15-15:20. Note that the last column reflects **forecast emissions** based on forecast emission rate rather than the actuals. To compute actual emissions, you can take the dot product of energey\_use\_mwh and actual emission rates. + +In mathematical terms, we compute these three columns as follows. For the l-th interval tl,l+1, we have + +* usage: sl +* energy\_use\_mwh: 1100060slPl'=0l-1sl', where 60 reflects interval length in hours and Pl'=0l-1sl' is the power. 11000 is a conversion factor from kWh to mWh +* emissions\_co2e\_lb: energy\_us\_mwh \* Mt. + +# FAQs + +## How much co2 can we expect to avoid by using the optimizer? + +The amount of emission you can avoid will vary significantly based on a range of factors. For example: + +* The grid where the charging is occurring. +* The amount of “slack time” available, that is, possible charging time beyond the minimum amount required for charging. \ No newline at end of file diff --git a/watttime/optimizer/alg/__init__.py b/watttime/optimizer/alg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/watttime/optimizer/alg/moer.py b/watttime/optimizer/alg/moer.py new file mode 100644 index 00000000..4ec3c8e3 --- /dev/null +++ b/watttime/optimizer/alg/moer.py @@ -0,0 +1,128 @@ +# moer.py + +import numpy as np + + +class Moer: + """ + Represents Marginal Operating Emissions Rate (MOER) for electricity grid emissions modeling. + + This class handles calculations related to emissions and utilities based on + MOER data, supporting both diagonal and non-diagonal penalty matrices. + + Attributes: + ----------- + __mu : numpy.ndarray + Mean emissions rate for each time step. + __T : int + Total number of time steps. + + Methods: + -------- + __len__() + Returns the number of time steps. + get_emission_at(i, usage) + Calculates emission at a specific time step. + get_emission_interval(start, end, usage) + Calculates sum of emissions for a time interval. + get_emissions(x) + Calculates emissions per interval for a given schedule. + get_total_emission(x) + Calculates total emission for a given schedule. + + """ + + def __init__(self, mu): + """ + Initializes the Moer object. + + Parameters: + ----------- + mu : array-like + Emissions rate for each time step. + """ + self.__mu = np.array(mu).flatten() + self.__T = self.__mu.shape[0] + + def __len__(self): + """ + Returns the length of the time series. + + Returns: + -------- + int + The number of time steps in the series. + """ + return self.__T + + def get_emission_at(self, i, usage): + """ + Calculates the emission at a specific time step. + + Parameters: + ----------- + i : int + The time step index. + usage : float, optional + The power usage. + + Returns: + -------- + float + The calculated emission value. + """ + return self.__mu[i] * usage + + def get_emission_interval(self, start, end, usage): + """ + Calculates emissions for a given time interval. + + Parameters: + ----------- + start : int + The start index of the interval. + end : int + The end index of the interval. + usage : float, optional + The emission multiplier. Default is 1. + + Returns: + -------- + numpy.ndarray + An array of emission values for the specified interval. + """ + return np.dot(self.__mu[start:end], usage) + + def get_emissions(self, usage): + """ + Calculates emissions for a given set of emission multipliers. + + Parameters: + ----------- + usage : array-like + The emission multipliers. + + Returns: + -------- + numpy.ndarray + An array of calculated emission values. + """ + usage = np.array(usage).flatten() + return self.__mu[: usage.shape[0]] * usage + + def get_total_emission(self, usage): + """ + Calculates the total emission for a given set of emission multipliers. + + Parameters: + ----------- + usage : array-like + The emission multipliers. + + Returns: + -------- + float + The total calculated emission. + """ + usage = np.array(usage).flatten() + return np.dot(self.__mu[: usage.shape[0]], usage) diff --git a/watttime/optimizer/alg/optCharger.py b/watttime/optimizer/alg/optCharger.py new file mode 100644 index 00000000..0489fddc --- /dev/null +++ b/watttime/optimizer/alg/optCharger.py @@ -0,0 +1,724 @@ +# optCharger.py +import numpy as np +from .moer import Moer + +TOL = 1e-4 # tolerance +EMISSION_FN_TOL = 1e-9 # emissions functions tolerance in kw + + +class OptCharger: + """ + Represents an Optimal Charger for managing charging schedules. + + This class handles the optimization of charging schedules based on various parameters + such as charge rates, emission overheads, and other constraints. + + Methods: + -------- + __init__() + Initializes the OptCharger object with the given parameters. + """ + + def __init__(self): + """ + Initializes the OptCharger object. + """ + self.__optimal_charging_emission = None + self.__optimal_charging_schedule = None + + def __collect_results(self, moer: Moer): + """ + Translates the optimal charging schedule into a series of emission multiplier values and calculates various emission-related metrics. + + This function processes the optimal charging schedule to generate emission multipliers, + calculates energy and emissions over time, and computes the total emissions including + overhead from starting, stopping, and maintaining the charging process. + + Parameters: + ----------- + moer : Moer + An object representing Marginal Operating Emissions Rate, used for emissions calculations. + + Returns: + -------- + None + The function updates several instance variables with the calculated results. + + Side Effects: + ------------- + Updates the following instance variables: + - __optimal_charging_energy_over_time + - __optimal_charging_emissions_over_time + - __optimal_charging_emission + + The function also populates the emission_multipliers list, which is used in the calculations. + """ + emission_multipliers = [] + current_charge_time_units = 0 + for i in range(len(self.__optimal_charging_schedule)): + if self.__optimal_charging_schedule[i] == 0: + emission_multipliers.append(0.0) + else: + old_charge_time_units = current_charge_time_units + current_charge_time_units += self.__optimal_charging_schedule[i] + power_rate = self.emission_multiplier_fn( + old_charge_time_units, current_charge_time_units + ) + emission_multipliers.append(power_rate) + + self.__optimal_charging_energy_over_time = np.array( + self.__optimal_charging_schedule + ) * np.array(emission_multipliers) + self.__optimal_charging_emissions_over_time = moer.get_emissions( + self.__optimal_charging_energy_over_time + ) + self.__optimal_charging_emission = ( + self.__optimal_charging_emissions_over_time.sum() + ) + + @staticmethod + def __sanitize_emission_multiplier(emission_multiplier_fn, total_charge): + """ + Sanitizes the emission multiplier function to handle edge cases and ensure valid outputs. + + This function wraps the original emission_multiplier_fn to handle cases where the + end charge (ec) exceeds the total charge or when the start charge (sc) is beyond + the total charge limit. + + Parameters: + ----------- + emission_multiplier_fn : callable + The original emission multiplier function to be sanitized. + total_charge : int or float + The maximum total charge value. + + Returns: + -------- + callable + A new lambda function that sanitizes the inputs before calling the original + emission_multiplier_fn. + + Behavior: + --------- + - If sc < total_charge: + - Calls the original function with ec capped at total_charge. + - If sc >= total_charge: + - Returns 1.0, assuming no additional emissions beyond total charge. + + Note: + ----- + This function is useful for preventing out-of-bounds errors and ensuring + consistent behavior when dealing with charge values near or beyond the total + charge limit. + """ + return lambda sc, ec: ( + emission_multiplier_fn(sc, min(ec, total_charge)) + if (sc < total_charge) + else 0.0 + ) + + @staticmethod + def __check_constraint(t_start, c_start, dc, constraints): + # assuming constraints[t] is the bound on total charge after t intervals + for t in range(t_start + 1, t_start + dc): + if (t in constraints) and ( + (c_start + t - t_start < constraints[t][0]) + or (c_start + t - t_start > constraints[t][1]) + ): + return False + return True + + def __greedy_fit(self, total_charge: int, total_time: int, moer: Moer): + """ + Performs a "greedy" fit for charging schedule optimization. + + It charges at the maximum possible rate until the total charge is reached or + the time limit is hit. + + Parameters: + ----------- + total_charge : int + The total amount of charge needed. + total_time : int + The total time available for charging. + moer : Moer + An object representing Marginal Operating Emissions Rate. + + Calls __collect_results to process the results. + """ + print("== Baseline fit! ==") + schedule = [1] * min(total_charge, total_time) + [0] * max( + 0, total_time - total_charge + ) + self.__optimal_charging_schedule = schedule + self.__collect_results(moer) + + def __simple_fit(self, total_charge: int, total_time: int, moer: Moer): + """ + Performs a "simple" fit for charging schedule optimization. + + This method implements a straightforward optimization strategy. It sorts + time intervals by MOER (Marginal Operating Emissions Rate) and charges + during the cleanest intervals until the total charge is reached. + + Parameters: + ----------- + total_charge : int + The total amount of charge needed. + total_time : int + The total time available for charging. + moer : Moer + An object representing Marginal Operating Emissions Rate. + + Calls __collect_results to process the results. + """ + print("== Simple fit! ==") + sorted_times = np.argsort(moer.get_emission_interval(0, total_time, 1)) + + charge_to_do = total_charge + schedule, t = [0] * total_time, 0 + while (charge_to_do > 0) and (t < total_time): + charge_to_do -= 1 + schedule[sorted_times[t]] = 1 + t += 1 + self.__optimal_charging_schedule = schedule + self.__collect_results(moer) + + def __diagonal_fit( + self, + total_charge: int, + total_time: int, + moer: Moer, + emission_multiplier_fn, + constraints: dict = {}, + ): + """ + Performs a sophisticated diagonal fit for charging schedule optimization using dynamic programming. + + This method implements a more complex optimization strategy using dynamic programming. + It considers various factors such as emission rates, charging constraints, and overhead costs + to find an optimal charging schedule. + + Parameters: + ----------- + total_charge : int + The total amount of charge needed. + total_time : int + The total time available for charging. + moer : Moer + An object representing Marginal Operating Emissions Rate. + emission_multiplier_fn : callable + A function that calculates emission multipliers. + constraints : dict, optional + A dictionary of charging constraints for specific time steps. + + Calls __collect_results to process the results. + + Raises: + ------- + Exception + If no valid solution is found. + """ + print("== Sophisticated fit! ==") + # This is a matrix with size = number of charge states x number of actions {not charging = 0, charging = 1} + max_util = np.full((total_charge + 1), np.nan) + max_util[0] = 0.0 + path_history = np.full((total_time, total_charge + 1), -1, dtype=int) + for t in range(1, total_time + 1): + if t in constraints: + min_charge, max_charge = constraints[t] + min_charge = 0 if min_charge is None else max(0, min_charge) + max_charge = ( + total_charge + if max_charge is None + else min(max_charge, total_charge) + ) + else: + min_charge, max_charge = 0, total_charge + # print("=== Time step", t, "===") + new_max_util = np.full(max_util.shape, np.nan) + # print("min_charge, max_charge =",min_charge,max_charge) + for c in range(min_charge, max_charge + 1): + ## not charging + init_val = True + if not np.isnan(max_util[c]): + new_max_util[c] = max_util[c] + path_history[t - 1, c] = c + init_val = False + ## charging + if (c > 0) and not np.isnan(max_util[c - 1]): + # moer.get_emission_at gives lbs/MWh. emission function needs to be how many MWh the interval consumes + # which would be power_in_kW * 0.001 * 5/60 + new_util = max_util[c - 1] - moer.get_emission_at( + t - 1, emission_multiplier_fn(c - 1, c) + ) + if init_val or (new_util > new_max_util[c]): + new_max_util[c] = new_util + path_history[t - 1, c] = c - 1 + init_val = False + max_util = new_max_util + + if np.isnan(max_util[total_charge]): + raise Exception( + "Solution not found! Please check that constraints are satisfiable." + ) + curr_state, t_curr = total_charge, total_time + + schedule_reversed = [] + schedule_reversed.append(curr_state) + while t_curr > 0: + curr_state = path_history[t_curr - 1, curr_state] + schedule_reversed.append(curr_state) + t_curr -= 1 + optimal_path = np.array(schedule_reversed)[::-1] + self.__optimal_charging_schedule = list(np.diff(optimal_path)) + self.__collect_results(moer) + + def __contiguous_fit( + self, + total_charge: int, + total_time: int, + moer: Moer, + emission_multiplier_fn, + charge_per_interval: list = [], + constraints: dict = {}, + ): + """ + Performs a contiguous fit for charging schedule optimization using dynamic programming. + + This method implements a sophisticated optimization strategy that considers contiguous + charging intervals. It uses dynamic programming to find an optimal charging schedule + while respecting the specified length of each charging interval. + + Parameters: + ----------- + total_charge : int + The total amount of charge needed. + total_time : int + The total time available for charging. + moer : Moer + An object representing Marginal Operating Emissions Rate. + emission_multiplier_fn : callable + A function that calculates emission multipliers. + charge_per_interval : list of int + The exact charging amount per interval. + constraints : dict, optional + A dictionary of charging constraints for specific time steps. Constraints are one-indexed: t:(a,b) means that after t minutes, we have to have charged for between a and b minutes inclusive, so that 1<=t<=total_time + + Calls __collect_results to process the results. + + Raises: + ------- + Exception + If no valid solution is found. + + Note: + ----- + This is the __diagonal_fit() algorithm with further constraint on contiguous charging intervals and their respective length + """ + print("== Fixed contiguous fit! ==") + # print("Charge per interval constraints:", charge_per_interval) + total_interval = len(charge_per_interval) + # This is a matrix with size = number of time states x number of intervals charged so far + max_util = np.full((total_time + 1, total_interval + 1), np.nan) + max_util[0, 0] = 0.0 + path_history = np.full((total_time, total_interval + 1), False, dtype=bool) + cum_charge = [0] + for c in charge_per_interval: + cum_charge.append(cum_charge[-1] + c) + + charge_array_cache = [ + emission_multiplier_fn(x, x + 1) for x in range(0, total_charge + 1) + ] + print("Cumulative charge", cum_charge) + for t in range(1, total_time + 1): + if t in constraints: + min_charge, max_charge = constraints[t] + min_charge = 0 if min_charge is None else max(0, min_charge) + max_charge = ( + total_charge + if max_charge is None + else min(max_charge, total_charge) + ) + constraints[t] = (min_charge, max_charge) + else: + min_charge, max_charge = 0, total_charge + for k in range(0, total_interval + 1): + # print(t,k) + ## not charging + init_val = True + if not np.isnan(max_util[t - 1, k]): + max_util[t, k] = max_util[t - 1, k] + init_val = False + ## charging + if (k > 0) and (charge_per_interval[k - 1] <= t): + dc = charge_per_interval[k - 1] + if not np.isnan( + max_util[t - dc, k - 1] + ) and OptCharger.__check_constraint( + t - dc, cum_charge[k - 1], dc, constraints + ): + marginal_cost = moer.get_emission_interval( + t - dc, + t, + charge_array_cache[cum_charge[k - 1] : cum_charge[k]], + ) + new_util = max_util[t - dc, k - 1] - marginal_cost + if init_val or (new_util > max_util[t, k]): + max_util[t, k] = new_util + path_history[t - 1, k] = True + init_val = False + + if np.isnan(max_util[total_time, total_interval]): + raise Exception( + "Solution not found! Please check that constraints are satisfiable." + ) + curr_state, t_curr = total_interval, total_time + + schedule_reversed = [] + interval_ids_reversed = [] + while t_curr > 0: + delta_interval = path_history[t_curr - 1, curr_state] + if not delta_interval: + ## did not charge + schedule_reversed.append(0) + interval_ids_reversed.append(-1) + t_curr -= 1 + else: + ## charge + dc = charge_per_interval[curr_state - 1] + t_curr -= dc + curr_state -= 1 + if dc > 0: + schedule_reversed.extend([1] * dc) + interval_ids_reversed.extend([curr_state] * dc) + optimal_path = np.array(schedule_reversed)[::-1] + self.__optimal_charging_schedule = list(optimal_path) + self.__interval_ids = list(interval_ids_reversed[::-1]) + self.__collect_results(moer) + + def __variable_contiguous_fit( + self, + total_charge: int, + total_time: int, + moer: Moer, + emission_multiplier_fn, + charge_per_interval: list = [], + use_all_intervals: bool = True, + constraints: dict = {}, + ): + """ + Performs a contiguous fit for charging schedule optimization using dynamic programming. + + This method implements a sophisticated optimization strategy that considers contiguous + charging intervals. It uses dynamic programming to find an optimal charging schedule + while respecting constraints on the length of each charging interval. + + Parameters: + ----------- + total_charge : int + The total amount of charge needed. + total_time : int + The total time available for charging. + moer : Moer + An object representing Marginal Operating Emissions Rate. + emission_multiplier_fn : callable + A function that calculates emission multipliers. + charge_per_interval : list of (int, int) + The minimium and maximum (inclusive) charging amount per interval. + use_all_intervals : bool + If true, use all intervals provided by charge_per_interval; if false, can use the first few intervals and skip the rest. + constraints : dict, optional + A dictionary of charging constraints for specific time steps. Constraints are one-indexed: t:(a,b) means that after t minutes, we have to have charged for between a and b minutes inclusive, so that 1<=t<=total_time + + Calls __collect_results to process the results. + + Raises: + ------- + Exception + If no valid solution is found. + + Note: + ----- + This is the __diagonal_fit() algorithm with further constraint on contiguous charging intervals and their respective length + """ + print("== Variable contiguous fit! ==") + total_interval = len(charge_per_interval) + # This is a matrix with size = number of time states x number of charge states x number of intervals charged so far + max_util = np.full( + (total_time + 1, total_charge + 1, total_interval + 1), np.nan + ) + max_util[0, 0, 0] = 0.0 + path_history = np.full( + (total_time, total_charge + 1, total_interval + 1, 2), 0, dtype=int + ) + + charge_array_cache = [ + emission_multiplier_fn(x, x + 1) for x in range(0, total_charge + 1) + ] + + for t in range(1, total_time + 1): + if t in constraints: + min_charge, max_charge = constraints[t] + min_charge = 0 if min_charge is None else max(0, min_charge) + max_charge = ( + total_charge + if max_charge is None + else min(max_charge, total_charge) + ) + constraints[t] = (min_charge, max_charge) + else: + min_charge, max_charge = 0, total_charge + for k in range(0, total_interval + 1): + for c in range(min_charge, max_charge + 1): + ## not charging + init_val = True + if not np.isnan(max_util[t - 1, c, k]): + max_util[t, c, k] = max_util[t - 1, c, k] + path_history[t - 1, c, k, :] = [0, 0] + init_val = False + ## charging + if k > 0: + for dc in range( + charge_per_interval[k - 1][0], + min(charge_per_interval[k - 1][1], t, c) + 1, + ): + if not np.isnan( + max_util[t - dc, c - dc, k - 1] + ) and OptCharger.__check_constraint( + t - dc, c - dc, dc, constraints + ): + marginal_cost = moer.get_emission_interval( + t - dc, t, charge_array_cache[c - dc : c] + ) + new_util = ( + max_util[t - dc, c - dc, k - 1] - marginal_cost + ) + if init_val or (new_util > max_util[t, c, k]): + max_util[t, c, k] = new_util + path_history[t - 1, c, k, :] = [dc, 1] + init_val = False + optimal_interval, optimal_util = ( + total_interval, + max_util[total_time, total_charge, total_interval], + ) + if not use_all_intervals: + for k in range(0, total_interval): + if np.isnan(max_util[total_time, total_charge, optimal_interval]) or ( + not np.isnan(max_util[total_time, total_charge, k]) + and max_util[total_time, total_charge, k] + > max_util[total_time, total_charge, optimal_interval] + ): + optimal_interval = k + if np.isnan(max_util[total_time, total_charge, optimal_interval]): + raise Exception( + "Solution not found! Please check that constraints are satisfiable." + ) + curr_state, t_curr = [total_charge, optimal_interval], total_time + + schedule_reversed = [] + interval_ids_reversed = [] + while t_curr > 0: + dc, delta_interval = path_history[ + t_curr - 1, curr_state[0], curr_state[1], : + ] + if delta_interval == 0: + ## did not charge + schedule_reversed.append(0) + interval_ids_reversed.append(-1) + t_curr -= 1 + else: + ## charge + t_curr -= dc + curr_state = [curr_state[0] - dc, curr_state[1] - delta_interval] + if dc > 0: + schedule_reversed.extend([1] * dc) + interval_ids_reversed.extend([curr_state[1]] * dc) + optimal_path = np.array(schedule_reversed)[::-1] + self.__optimal_charging_schedule = list(optimal_path) + self.__interval_ids = list(interval_ids_reversed[::-1]) + self.__collect_results(moer) + + def fit( + self, + total_charge: int, + total_time: int, + moer: Moer, + charge_per_interval=None, + use_all_intervals: bool = True, + constraints: dict = {}, + emission_multiplier_fn=None, + optimization_method: str = "auto", + ): + """ + Fits an optimal charging schedule based on the given parameters and constraints. + + This method serves as the main entry point for the charging optimization process. + It selects the appropriate optimization method based on the input parameters and + constraints. + + Parameters: + ----------- + total_charge : int + The total amount of charge needed. + total_time : int + The total time available for charging. + moer : Moer + An object representing Marginal Operating Emissions Rate. + charge_per_interval : list of int or (int,int), optional + The minimium and maximum (inclusive) charging amount per interval. If int instead of tuple, interpret as both min and max. + use_all_intervals : bool + If true, use all intervals provided by charge_per_interval; if false, can use the first few intervals and skip the rest. This can only be false if charge_per_interval is provided as a range. + constraints : dict, optional + A dictionary of charging constraints for specific time steps. + emission_multiplier_fn : callable, optional + A function that calculates emission multipliers. If None, assumes constant 1kW power usage. + optimization_method : str, optional + The optimization method to use. Can be 'auto', 'baseline', 'simple', or 'sophisticated'. + Default is 'auto'. + + Raises: + ------- + Exception + If the charging task is impossible given the constraints, or if an unsupported + optimization method is specified. + + Note: + ----- + This method chooses between different optimization strategies based on the input + parameters and the characteristics of the problem. + """ + assert len(moer) >= total_time + assert optimization_method in ["baseline", "simple", "sophisticated", "auto"] + + if emission_multiplier_fn is None: + print( + "Warning: No emission_multiplier_fn given. Assuming that device uses constant 1kW of power" + ) + emission_multiplier_fn = lambda sc, ec: 1.0 + constant_emission_multiplier = True + else: + constant_emission_multiplier = ( + np.std( + [ + emission_multiplier_fn(sc, sc + 1) + for sc in list(range(total_charge)) + ] + ) + < EMISSION_FN_TOL + ) + self.emission_multiplier_fn = emission_multiplier_fn + + if total_charge > total_time: + raise Exception( + f"Solution not found! Impossible to charge {total_charge} within {total_time} intervals." + ) + if optimization_method == "baseline": + self.__greedy_fit(total_charge, total_time, moer) + elif ( + not constraints + and not charge_per_interval + and constant_emission_multiplier + and optimization_method == "auto" + ) or (optimization_method == "simple"): + if not constant_emission_multiplier: + print( + "Warning: Emissions function is non-constant. Using the simple algorithm is suboptimal." + ) + self.__simple_fit(total_charge, total_time, moer) + elif not charge_per_interval: + self.__diagonal_fit( + total_charge, + total_time, + moer, + OptCharger.__sanitize_emission_multiplier( + emission_multiplier_fn, total_charge + ), + constraints, + ) + else: + # cpi stands for charge per interval + single_cpi, tuple_cpi, use_fixed_alg = [], [], True + + def convert_input(c): + ## Converts the interval format + if isinstance(c, int): + return c, (c, c), True + if c[0] == c[1]: + return c[0], c, True + return None, c, False + + for c in charge_per_interval: + if use_fixed_alg: + sc, tc, use_fixed_alg = convert_input(c) + single_cpi.append(sc) + tuple_cpi.append(tc) + else: + tuple_cpi.append(convert_input(c)[1]) + if use_fixed_alg: + assert ( + use_all_intervals + ), "Must use all intervals when interval lengths are fixed!" + self.__contiguous_fit( + total_charge, + total_time, + moer, + OptCharger.__sanitize_emission_multiplier( + emission_multiplier_fn, total_charge + ), + single_cpi, + constraints, + ) + else: + self.__variable_contiguous_fit( + total_charge, + total_time, + moer, + OptCharger.__sanitize_emission_multiplier( + emission_multiplier_fn, total_charge + ), + tuple_cpi, + use_all_intervals, + constraints, + ) + + def get_energy_usage_over_time(self) -> list: + """ + Returns list of the energy due to charging at each interval in MWh. + """ + return self.__optimal_charging_energy_over_time + + def get_charging_emissions_over_time(self) -> list: + """ + Returns list of the emissions due to charging at each interval in lbs. + """ + return self.__optimal_charging_emissions_over_time + + def get_total_emission(self) -> float: + """ + Returns the summed emissions due to charging in lbs. + """ + return self.__optimal_charging_emission + + def get_schedule(self) -> list: + """ + Returns list of the optimal charging schedule of units to charge for each interval. + """ + return self.__optimal_charging_schedule + + def get_interval_ids(self) -> list: + """ + Returns list of the interval ids for each interval. Has a value of -1 for non-charging intervals. + Intervals are labeled starting from 0 to n-1 when there are n intervals + + Only defined when charge_per_interval variable is given to some fit function + """ + return self.__interval_ids + + def summary(self): + print("-- Model Summary --") + print( + "Expected charging emissions: %.2f lbs" % self.__optimal_charging_emission + ) + print("Optimal charging schedule:", self.__optimal_charging_schedule) + print("=" * 15) diff --git a/watttime/optimizer/test.py b/watttime/optimizer/test.py new file mode 100644 index 00000000..02784b54 --- /dev/null +++ b/watttime/optimizer/test.py @@ -0,0 +1,149 @@ +from alg import moer, optCharger + +model = optCharger.OptCharger() + +m = moer.Moer( + mu=[10, 10, 10, 1, 13, 3, 2, 3], +) +print("Length of schedule:", len(m)) + +print("greedy algo") +model.fit(total_charge=3, total_time=8, moer=m, optimization_method="baseline") +model.summary() +print("simple sorting algo") +model.fit( + total_charge=3, + total_time=8, + moer=m, +) +model.summary() +print("sophisticated algo that produces same answer as simple") +model.fit(total_charge=3, total_time=8, moer=m, optimization_method="sophisticated") +model.summary() +print("incorrect pairing of simple sorting algo + variable charge rate") +model.fit( + total_charge=3, + total_time=8, + moer=m, + emission_multiplier_fn=lambda x, y: [1.0, 2.0, 1.0][x], + optimization_method="simple", +) +model.summary() +print("sophisticated algo + variable charge rate") +model.fit( + total_charge=3, + total_time=8, + moer=m, + emission_multiplier_fn=lambda x, y: [1.0, 2.0, 1.0][x], +) +model.summary() +print("sophisticated algo + constraints") +model.fit(total_charge=3, total_time=8, moer=m, constraints={2: (2, None)}) +model.summary() + +m = moer.Moer( + mu=[2, 1, 10, 10, 10, 1, 13, 3], +) + +# Fixed Contiguous +print("One contiguous interval") +model.fit(total_charge=3, total_time=8, moer=m, charge_per_interval=[3]) +model.summary() +print("Two contiguous intervals") +model.fit(total_charge=3, total_time=8, moer=m, charge_per_interval=[2, 1]) +model.summary() +print("Two contiguous intervals, one of which given as intervals + variable power rate") +model.fit( + total_charge=3, + total_time=8, + moer=m, + charge_per_interval=[(2, 2), 1], + emission_multiplier_fn=lambda x, y: [1.0, 0.1, 1.0][x], +) +model.summary() +print("Two contiguous intervals, one of which given as intervals + variable power rate") +model.fit( + total_charge=3, + total_time=8, + moer=m, + charge_per_interval=[2, (1, 1)], + emission_multiplier_fn=lambda x, y: [1.0, 0.1, 1.0][x], +) +model.summary() +print("Two contiguous intervals, one of which given as intervals + variable power rate") +model.fit( + total_charge=3, + total_time=8, + moer=m, + charge_per_interval=[(2, 2), (1, 1)], + emission_multiplier_fn=lambda x, y: [1.0, 0.1, 1.0][x], +) +model.summary() +print("Two contiguous intervals + variable power rate + constraints") +model.fit( + total_charge=4, + total_time=8, + moer=m, + charge_per_interval=[3, 1], + constraints={2: (None, 1), 5: (3, None)}, +) +model.summary() + +# Variable Contiguous +print("One contiguous interval") +model.fit(total_charge=3, total_time=8, moer=m, charge_per_interval=[(0, 3)]) +model.summary() +print("Two contiguous intervals") +model.fit(total_charge=3, total_time=8, moer=m, charge_per_interval=[(1, 2), (0, 3)]) +model.summary() +print("Two contiguous intervals + variable power rate") +model.fit( + total_charge=3, + total_time=8, + moer=m, + charge_per_interval=[(1, 2), (1, 2)], + emission_multiplier_fn=lambda x, y: [1.0, 0.1, 1.0][x], +) +model.summary() +print("Two contiguous intervals + variable power rate") +model.fit(total_charge=4, total_time=8, moer=m, charge_per_interval=[(1, 3), (0, 3)]) +model.summary() +print("Two contiguous intervals + variable power rate + constraints") +model.fit( + total_charge=4, + total_time=8, + moer=m, + charge_per_interval=[(1, 3), (0, 3)], + constraints={2: (None, 1)}, +) +model.summary() +print("Two contiguous intervals + variable power rate + constraints") +model.fit( + total_charge=4, + total_time=8, + moer=m, + charge_per_interval=[(1, 3), (0, 3)], + constraints={2: (None, 1), 5: (3, None)}, +) +model.summary() + +m = moer.Moer( + mu=[10, 1, 1, 1, 10, 1, 1, 1], +) +print("Three contiguous intervals of fixed lengths") +model.fit(total_charge=6, total_time=8, moer=m, charge_per_interval=[2] * 3) +model.summary() +print("Three contiguous intervals of variable lengths") +model.fit(total_charge=6, total_time=8, moer=m, charge_per_interval=[(2, 6)] * 3) +model.summary() +print( + "Three contiguous intervals of variable lengths, but doesnt need to charge all intervals" +) +model.fit( + total_charge=6, + total_time=8, + moer=m, + charge_per_interval=[(2, 6)] * 3, + use_all_intervals=False, +) +model.summary()