diff --git a/codecarbon/core/cpu.py b/codecarbon/core/cpu.py index 8b50c356e..cd3772e14 100644 --- a/codecarbon/core/cpu.py +++ b/codecarbon/core/cpu.py @@ -144,11 +144,10 @@ def get_cpu_details(self) -> Dict: else: cpu_details[col_name] = cpu_data[col_name].mean() except Exception as e: - logger.info( - f"Unable to read Intel Power Gadget logged file at {self._log_file_path}\n \ - Exception occurred {e}", - exc_info=True, + logger.warning( + f"Unable to read Intel Power Gadget logged file. Exception:\n{e}" ) + logger.debug("Full error:\n", exc_info=True) return cpu_details @@ -210,10 +209,10 @@ def get_cpu_details(self) -> Dict: cpu_details[rapl_file.name] = rapl_file.power_measurement.kW except Exception as e: logger.info( - f"Unable to read Intel RAPL files at {self._rapl_files}\n \ - Exception occurred {e}", - exc_info=True, + f"Unable to read Intel RAPL files at {self._rapl_files}." + + f" Exception:\n {e}" ) + logger.debug("Full error:\n", exc_info=True) return cpu_details diff --git a/codecarbon/core/util.py b/codecarbon/core/util.py index 5ce0c79ea..ad6c2ac3a 100644 --- a/codecarbon/core/util.py +++ b/codecarbon/core/util.py @@ -13,7 +13,6 @@ def suppress(*exceptions): logger.warning( exceptions if len(exceptions) != 1 else exceptions[0], exc_info=True ) - logger.warning("stopping.") pass diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index cc26120ec..c923e67f7 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -45,7 +45,7 @@ # see: https://stackoverflow.com/questions/67202314/ # python-distinguish-default-argument-and-argument-provided-with-default-value -_sentinel = object() +_sentinel = "93dc1470-9879-43ed-b11e-158bcc932a99" class BaseEmissionsTracker(ABC): @@ -56,79 +56,6 @@ class BaseEmissionsTracker(ABC): and `CarbonTracker.` """ - def _set_from_conf( - self, var, name, default=None, return_type=None, prevent_setter=False - ): - """ - Method to standardize private argument setting. Generic flow is: - - * If a value for the variable `var` with name `name` is provided in the - __init__ constructor: set the the private attribute `self._{name}` to - that value - - * If no value is provided for `var`, i.e. `var is _sentinel` is True then - we try to assign a value to it: - - * If there is a value for `name` in the external configuration (config - files or env variables), then we use it - * Otherwise `self._{name}` is set to the `default` value - - Additionally, if `return_type` is provided and one of `float` `int` or `bool`, - the value for `self._{name}` will be parsed to this type. - - Use `prevent_setter=True` for debugging purposes only. - - Args: - var (Any): The variable's value to set as private attribute - name (str): The variable's name such that `self._{name}` will be set - to `var` - default (Any, optional): The value to use for self._name if no value - is provided in the constructor and no value is found in the external - configuration. - Defaults to None. - return_type (Any, optional): A type to parse the value to. Defaults to None. - prevent_setter (bool, optional): Whether to set the private attribute or - simply return the value. For debugging. Defaults to False. - - Returns: - [Any]: The value used for `self._{name}` - """ - # Check the hierarchical configuration has been read parsed and set. - assert hasattr(self, "_external_conf") - assert isinstance(self._external_conf, dict) - - # Store final values in _conf - if not hasattr(self, "_conf"): - self._conf = {} - - value = _sentinel - - # a value for the keyword argument `name` is provided in the constructor: - # use it - if var is not _sentinel: - value = var - else: - - # no value provided in the constructor for `name`: check in the conf - # (using the provided default value) - value = self._external_conf.get(name, default) - - # parse to `return_type` if needed - if return_type is not None: - if return_type is bool: - value = str(value).lower() == "true" - else: - assert callable(return_type) - value = return_type(value) - - # store final value - self._conf[name] = value - # set `self._{name}` to `value` - if not prevent_setter: - setattr(self, f"_{name}", value) - # return final value (why not?) - return value - def __init__( self, project_name: Optional[str] = _sentinel, @@ -199,6 +126,7 @@ def __init__( set_log_level(self._log_level) self._start_time: Optional[float] = None + self._end_time: Optional[float] = None self._last_measured_time: float = time.time() self._total_energy: Energy = Energy.from_energy(kWh=0) self._total_cpu_energy: Energy = Energy.from_energy(kWh=0) @@ -214,6 +142,8 @@ def __init__( self._measure_occurrence: int = 0 self._cloud = None self._previous_emissions = None + self._previous_duration: float = 0 + self.final_emissions_data = None if isinstance(self._gpu_ids, str): self._gpu_ids = parse_gpu_ids(self._gpu_ids) @@ -287,6 +217,79 @@ def __init__( ) self.persistence_objs.append(self._cc_api__out) + def _set_from_conf( + self, var, name, default=None, return_type=None, prevent_setter=False + ): + """ + Method to standardize private argument setting. Generic flow is: + + * If a value for the variable `var` with name `name` is provided in the + __init__ constructor: set the the private attribute `self._{name}` to + that value + + * If no value is provided for `var`, i.e. `var is _sentinel` is True then + we try to assign a value to it: + + * If there is a value for `name` in the external configuration (config + files or env variables), then we use it + * Otherwise `self._{name}` is set to the `default` value + + Additionally, if `return_type` is provided and one of `float` `int` or `bool`, + the value for `self._{name}` will be parsed to this type. + + Use `prevent_setter=True` for debugging purposes only. + + Args: + var (Any): The variable's value to set as private attribute + name (str): The variable's name such that `self._{name}` will be set + to `var` + default (Any, optional): The value to use for self._name if no value + is provided in the constructor and no value is found in the external + configuration. + Defaults to None. + return_type (Any, optional): A type to parse the value to. Defaults to None. + prevent_setter (bool, optional): Whether to set the private attribute or + simply return the value. For debugging. Defaults to False. + + Returns: + [Any]: The value used for `self._{name}` + """ + # Check the hierarchical configuration has been read parsed and set. + assert hasattr(self, "_external_conf") + assert isinstance(self._external_conf, dict) + + # Store final values in _conf + if not hasattr(self, "_conf"): + self._conf = {} + + value = _sentinel + + # a value for the keyword argument `name` is provided in the constructor: + # use it + if var is not _sentinel: + value = var + else: + + # no value provided in the constructor for `name`: check in the conf + # (using the provided default value) + value = self._external_conf.get(name, default) + + # parse to `return_type` if needed + if return_type is not None: + if return_type is bool: + value = str(value).lower() == "true" + else: + assert callable(return_type) + value = return_type(value) + + # store final value + self._conf[name] = value + # set `self._{name}` to `value` + if not prevent_setter: + setattr(self, f"_{name}", value) + # return final value (why not?) + return value + @suppress(Exception) def start(self) -> None: """ @@ -294,19 +297,64 @@ def start(self) -> None: Currently, Nvidia GPUs are supported. :return: None """ + resuming = False if self._start_time is not None: - logger.warning("Already started tracking") - return + if self._end_time is None: + logger.warning( + "You can only resume a tracker which has been started then stopped." + + " The current tracker has been started but not stopped." + + " Ignoring start() call." + ) + return + logger.info("Resuming tracker.") + resuming = True + self._previous_duration = self._previous_duration + ( + self._end_time - self._start_time + ) + self._scheduler = BackgroundScheduler() + self._scheduler.add_job( + self._measure_power, + "interval", + seconds=self._measure_power_secs, + max_instances=1, + ) + + if not resuming: + logger.info("Starting tracker.") self._last_measured_time = self._start_time = time.time() self._scheduler.start() @suppress(Exception) - def stop(self) -> Optional[float]: + def flush(self) -> Optional[float]: + """ + Write emission to disk or call the API depending of the configuration + but keep running the experiment. + :return: CO2 emissions in kgs + """ + if self._start_time is None: + logger.error("Need to first start the tracker") + return None + + # Run to calculate the power used from last + # scheduled measurement to shutdown + self._measure_power() + + emissions_data = self._prepare_emissions_data() + for persistence in self.persistence_objs: + if isinstance(persistence, CodeCarbonAPIOutput): + emissions_data = self._prepare_emissions_data(delta=True) + persistence.out(emissions_data) + + return emissions_data.emissions + + @suppress(Exception) + def stop(self) -> Optional[EmissionsData]: """ Stops tracking the experiment :return: CO2 emissions in kgs """ + logger.info("Stopping tracker") if self._start_time is None: logger.error("Need to first start the tracker") return None @@ -317,16 +365,18 @@ def stop(self) -> Optional[float]: # scheduled measurement to shutdown self._measure_power() + self._end_time = time.time() + emissions_data = self._prepare_emissions_data() for persistence in self.persistence_objs: if isinstance(persistence, CodeCarbonAPIOutput): emissions_data = self._prepare_emissions_data(delta=True) + persistence.out(emissions_data, self.final_emissions_data) - persistence.out(emissions_data) - - self.final_emissions = emissions_data.emissions - return emissions_data.emissions + self.final_emissions_data = emissions_data + # emissions are in emissions_data.emissions + return emissions_data def _prepare_emissions_data(self, delta=False) -> EmissionsData: """ @@ -375,6 +425,8 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData: cloud_region=cloud_region, ) if delta: + # sending to the API: we need to upload the relative change + # in measurements not the current absolute value if self._previous_emissions is None: self._previous_emissions = total_emissions else: @@ -386,6 +438,10 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData: # TODO : the API call succeeded self._previous_emissions = total_emissions total_emissions = delta_emissions + else: + # not sending to the API: duration has to be incremented by potential + # previous duration (if tracker was re-started) + total_emissions.duration += self._previous_duration logger.debug(total_emissions) return total_emissions diff --git a/codecarbon/output.py b/codecarbon/output.py index 0da9e0d0b..3ae792a62 100644 --- a/codecarbon/output.py +++ b/codecarbon/output.py @@ -9,7 +9,9 @@ from abc import ABC, abstractmethod from collections import OrderedDict from dataclasses import dataclass +from typing import Optional +import pandas as pd import requests # from core.schema import EmissionCreate, Emission @@ -70,7 +72,7 @@ class BaseOutput(ABC): """ @abstractmethod - def out(self, data: EmissionsData): + def out(self, data: EmissionsData, previous_data: Optional[EmissionsData] = None): pass @@ -89,18 +91,67 @@ def has_valid_headers(self, data: EmissionsData): list_of_column_names = list(dict_from_csv.keys()) return list(data.values.keys()) == list_of_column_names - def out(self, data: EmissionsData): + def out(self, data: EmissionsData, previous_data: Optional[EmissionsData] = None): file_exists: bool = os.path.isfile(self.save_file_path) if file_exists and not self.has_valid_headers(data): logger.info("Backing up old emission file") - os.rename(self.save_file_path, self.save_file_path + ".bak") + new_name = self.save_file_path + ".bak" + idx = 1 + while os.path.isfile(new_name): + new_name = self.save_file_path + f"_{idx}.bak" + idx += 1 + os.rename(self.save_file_path, new_name) file_exists = False - with open(self.save_file_path, "a+") as f: - writer = csv.DictWriter(f, fieldnames=data.values.keys()) - if not file_exists: - writer.writeheader() - writer.writerow(data.values) + if not file_exists: + df = pd.DataFrame(columns=data.values.keys()) + else: + df = pd.read_csv(self.save_file_path) + + if previous_data is None: + df = df.append(dict(data.values), ignore_index=True) + logger.info("Appending emissions data to {}".format(self.save_file_path)) + else: + loc = (df.timestamp == previous_data.timestamp) & ( + df.project_name == previous_data.project_name + ) + if len(df.loc[loc]) < 1: + message = ( + "Looking for ({}, {}) in previous emissions data (tracker was" + + " re-started) but CodeCarbon could not find a matching emissions " + + "line in {}. Appending." + ) + logger.warning( + message.format( + previous_data.timestamp, + previous_data.project_name, + self.save_file_path, + ) + ) + df = df.append(dict(data.values), ignore_index=True) + elif len(df.loc[loc]) > 1: + message = ( + "Looking for ({}, {}) in previous emissions data (tracker was" + + " re-started) but CodeCarbon found more than 1 matching emissions" + + " line in {}. Appending." + ) + logger.warning( + message.format( + previous_data.timestamp, + previous_data.project_name, + self.save_file_path, + ) + ) + df = df.append(dict(data.values), ignore_index=True) + else: + logger.info( + "Updating line ({}, {})".format( + previous_data.timestamp, previous_data.project_name + ) + ) + df.at[loc, data.values.keys()] = data.values.values() + + df.to_csv(self.save_file_path, index=False) class HTTPOutput(BaseOutput): @@ -113,7 +164,7 @@ class HTTPOutput(BaseOutput): def __init__(self, endpoint_url: str): self.endpoint_url: str = endpoint_url - def out(self, data: EmissionsData): + def out(self, data: EmissionsData, previous_data: Optional[EmissionsData] = None): try: payload = dataclasses.asdict(data) payload["user"] = getpass.getuser() @@ -140,7 +191,7 @@ def __init__(self, endpoint_url: str, experiment_id: str, api_key: str): api_key=api_key, ) - def out(self, data: EmissionsData): + def out(self, data: EmissionsData, previous_data: Optional[EmissionsData] = None): try: self.api.add_emission(dataclasses.asdict(data)) except Exception as e: diff --git a/examples/mnist.py b/examples/mnist.py index b19b9e6fb..b11e18d3e 100644 --- a/examples/mnist.py +++ b/examples/mnist.py @@ -24,5 +24,6 @@ tracker = EmissionsTracker() tracker.start() model.fit(x_train, y_train, epochs=100) -emissions: float = tracker.stop() +emissions_data = tracker.stop() +emissions: float = emissions_data.emissions print(f"Emissions: {emissions} kg") diff --git a/examples/mnist_callback.py b/examples/mnist_callback.py new file mode 100644 index 000000000..994075d5e --- /dev/null +++ b/examples/mnist_callback.py @@ -0,0 +1,45 @@ +import tensorflow as tf +from tensorflow.keras.callbacks import Callback + +from codecarbon import EmissionsTracker + +""" +This sample code show how to use CodeCarbon as a Keras Callback +to save emissions after each epoch. +""" + + +class CodeCarbonCallBack(Callback): + def __init__(self, codecarbon_tracker): + self.codecarbon_tracker = codecarbon_tracker + pass + + def on_epoch_end(self, epoch, logs=None): + self.codecarbon_tracker.flush() + + +mnist = tf.keras.datasets.mnist + +(x_train, y_train), (x_test, y_test) = mnist.load_data() +x_train, x_test = x_train / 255.0, x_test / 255.0 + + +model = tf.keras.models.Sequential( + [ + tf.keras.layers.Flatten(input_shape=(28, 28)), + tf.keras.layers.Dense(128, activation="relu"), + tf.keras.layers.Dropout(0.2), + tf.keras.layers.Dense(10), + ] +) + +loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) + +model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"]) + +tracker = EmissionsTracker() +tracker.start() +codecarbon_cb = CodeCarbonCallBack(tracker) +model.fit(x_train, y_train, epochs=4, callbacks=[codecarbon_cb]) +emissions: float = tracker.stop() +print(f"Emissions: {emissions} kg") diff --git a/examples/mnist_context_manager.py b/examples/mnist_context_manager.py index de67a2819..8fe6aa1fd 100644 --- a/examples/mnist_context_manager.py +++ b/examples/mnist_context_manager.py @@ -26,4 +26,4 @@ def train_model(): if __name__ == "__main__": with EmissionsTracker(project_name="mnist") as tracker: model = train_model() - print(tracker.final_emissions) + print(tracker.final_emissions_data.emissions) diff --git a/examples/mnist_grid_search.py b/examples/mnist_grid_search.py index 3afbdbd61..8210a3b55 100644 --- a/examples/mnist_grid_search.py +++ b/examples/mnist_grid_search.py @@ -31,7 +31,8 @@ def main(): tracker = EmissionsTracker(project_name="mnist_grid_search") tracker.start() grid_result = grid.fit(x_train, y_train) - emissions = tracker.stop() + emissions_data = tracker.stop() + emissions = emissions_data.emissions print(f"Best Accuracy : {grid_result.best_score_} using {grid_result.best_params_}") print(f"Emissions : {emissions} kg CO₂") diff --git a/examples/mnist_random_search.py b/examples/mnist_random_search.py index e28303fa8..cfc84efe0 100644 --- a/examples/mnist_random_search.py +++ b/examples/mnist_random_search.py @@ -45,7 +45,8 @@ def main(): tracker = EmissionsTracker(project_name="mnist_random_search") tracker.start() tuner.search(x_train, y_train, epochs=10, validation_data=(x_test, y_test)) - emissions = tracker.stop() + emissions_data = tracker.stop() + emissions = emissions_data.emissions print(f"Emissions : {emissions} kg CO₂") diff --git a/tests/test_emissions_tracker.py b/tests/test_emissions_tracker.py index 2f9e682c8..03f760d7a 100644 --- a/tests/test_emissions_tracker.py +++ b/tests/test_emissions_tracker.py @@ -6,6 +6,7 @@ from pathlib import Path from unittest import mock +import numpy as np import pandas as pd import requests import responses @@ -80,7 +81,7 @@ def test_carbon_tracker_TWO_GPU_PRIVATE_INFRA_CANADA( # WHEN tracker.start() heavy_computation() - emissions = tracker.stop() + emissions = tracker.stop().emissions # THEN self.assertGreaterEqual( @@ -116,7 +117,7 @@ def raise_timeout_exception(*args, **kwargs): # WHEN tracker.start() heavy_computation(run_time_secs=2) - emissions = tracker.stop() + emissions = tracker.stop().emissions self.assertEqual(1, mocked_requests_get.call_count) self.assertIsInstance(emissions, float) self.assertAlmostEqual(1.1037980397280433e-05, emissions, places=2) @@ -386,8 +387,10 @@ def test_carbon_tracker_online_context_manager_TWO_GPU_PRIVATE_INFRA_CANADA( self.assertEqual( "https://get.geojs.io/v1/ip/geo.json", responses.calls[0].request.url ) - self.assertIsInstance(tracker.final_emissions, float) - self.assertAlmostEqual(tracker.final_emissions, 6.262572537957655e-05, places=2) + self.assertIsInstance(tracker.final_emissions_data.emissions, float) + self.assertAlmostEqual( + tracker.final_emissions_data.emissions, 6.262572537957655e-05, places=2 + ) @responses.activate def test_carbon_tracker_offline_context_manager( @@ -407,4 +410,168 @@ def test_carbon_tracker_offline_context_manager( self.assertEqual("United States", emissions_df["country_name"].values[0]) self.assertEqual("USA", emissions_df["country_iso_code"].values[0]) - self.assertIsInstance(tracker.final_emissions, float) + self.assertIsInstance(tracker.final_emissions_data.emissions, float) + + def test_offline_tracker_start_stop_start_stop( + self, + mocked_get_cloud_metadata, + mocked_get_gpu_details, + mocked_is_gpu_details_available, + mock_setup_intel_cli, + mock_log_values, + ): + tracker = OfflineEmissionsTracker( + measure_power_secs=2, country_iso_code="USA", output_dir=self.temp_path + ) + + tracker.start() + heavy_computation(run_time_secs=5) + tracker.stop() + + tracker.start() + heavy_computation(run_time_secs=5) + tracker.stop() + + def test_online_tracker_start_stop_start_stop( + self, + mocked_get_cloud_metadata, + mocked_get_gpu_details, + mocked_is_gpu_details_available, + mock_setup_intel_cli, + mock_log_values, + ): + + tracker = EmissionsTracker(measure_power_secs=1, save_to_file=False) + + tracker.start() + heavy_computation(run_time_secs=1.2) + tracker.stop() + + tracker.start() + heavy_computation(run_time_secs=1.2) + tracker.stop() + + def test_tracker_resume( + self, + mocked_get_cloud_metadata, + mocked_get_gpu_details, + mocked_is_gpu_details_available, + mock_setup_intel_cli, + mock_log_values, + ): + + run_time = 3 + pause_time = 2 + keys = [ + "timestamp", + "project_name", + "duration", + "emissions", + "energy_consumed", + ] + for tracker, name in [ + ( + OfflineEmissionsTracker( + measure_power_secs=1, + country_iso_code="USA", + output_dir=self.temp_path, + output_file="resume_emissions_offline.csv", + ), + "resume_emissions_offline.csv", + ), + ( + EmissionsTracker( + measure_power_secs=1, + output_dir=self.temp_path, + output_file="resume_emissions.csv", + ), + "resume_emissions.csv", + ), + ]: + + with self.subTest(tracker=tracker): + + path = self.temp_path / name + + tracker.start() + heavy_computation(run_time_secs=run_time) + first_data = tracker.stop() + df1 = pd.read_csv(path) + + heavy_computation(run_time_secs=pause_time) + + tracker.start() + heavy_computation(run_time_secs=run_time) + second_data = tracker.stop() + df2 = pd.read_csv(path) + + dict_df = dict(df2.iloc[-1].fillna("")) + dict_data = dict(second_data.values) + + self.assertAlmostEqual(second_data.duration, 2 * run_time, delta=0.1) + self.assertGreater(second_data.emissions, first_data.emissions) + self.assertGreater( + second_data.energy_consumed, first_data.energy_consumed + ) + self.assertEqual(len(df1), len(df2)) + for k in keys: + if isinstance(dict_df[k], (float, int)): + self.assertAlmostEqual(dict_df[k], dict_data[k]) + else: + self.assertEqual(dict_df[k], dict_data[k]) + + def test_tracker_resume_update_row( + self, + mocked_get_cloud_metadata, + mocked_get_gpu_details, + mocked_is_gpu_details_available, + mock_setup_intel_cli, + mock_log_values, + ): + + n_trackers = 3 + n_loops = 3 + + trackers = [ + EmissionsTracker( + measure_power_secs=1, + output_dir=self.temp_path, + output_file="resume_emissions_update_row.csv", + log_level="error", + ) + for _ in range(n_trackers) + ] + + for i in range(n_loops): + + print("\nLoop", i) + + for t, tracker in enumerate(trackers): + print("Tracker", t) + tracker.start() + heavy_computation(2) + tracker.stop() + + id_tracker = np.random.randint(0, n_trackers) + tracker = trackers[id_tracker] + keys = [ + "timestamp", + "project_name", + "duration", + "emissions", + "energy_consumed", + ] + path = self.temp_path / "resume_emissions_update_row.csv" + + df = pd.read_csv(path) + + self.assertEqual(len(df), len(trackers)) + + dict_df = dict(df.iloc[id_tracker]) + dict_data = dict(trackers[id_tracker].final_emissions_data.values) + + for k in keys: + if isinstance(dict_df[k], (float, int)): + self.assertAlmostEqual(dict_df[k], dict_data[k]) + else: + self.assertEqual(dict_df[k], dict_data[k]) diff --git a/tests/test_emissions_tracker_constant.py b/tests/test_emissions_tracker_constant.py index f657132b6..e586862cf 100644 --- a/tests/test_emissions_tracker_constant.py +++ b/tests/test_emissions_tracker_constant.py @@ -28,7 +28,7 @@ def test_carbon_tracker_online_constant(self): tracker = EmissionsTracker() tracker.start() heavy_computation(run_time_secs=1) - emissions = tracker.stop() + emissions = tracker.stop().emissions assert isinstance(emissions, float) self.assertNotEqual(emissions, 0.0) self.assertAlmostEqual(emissions, 6.262572537957655e-05, places=2) @@ -38,7 +38,7 @@ def test_carbon_tracker_offline_constant(self): tracker = OfflineEmissionsTracker(country_iso_code="USA") tracker.start() heavy_computation(run_time_secs=1) - emissions = tracker.stop() + emissions = tracker.stop().emissions assert isinstance(emissions, float) self.assertNotEqual(emissions, 0.0) self.assertAlmostEqual(emissions, 6.262572537957655e-05, places=2)