diff --git a/README.md b/README.md index a0989fac6..67fdda91b 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,16 @@ with EmissionsTracker() as tracker: # GPU Intensive code goes here ``` +### Flush data when running +By default, Code Carbon only writes the CVS output file when it stops. +But for a long run you may want to save intermediate data. +It is possible to call the flush() method to do so. + +Have a look to [./examples/mnist_callback.py](./examples/mnist_callback.py) for an example. + +Note that if you use the API it will also call it when you call flush(). +You could set *api_call_interval* to -1, so that it will not be called automatically and then force a call at the end of each epoch to get the emission of each epoch. ### Using comet.ml diff --git a/codecarbon/core/api_client.py b/codecarbon/core/api_client.py index 54b0af76a..d95e62ee5 100644 --- a/codecarbon/core/api_client.py +++ b/codecarbon/core/api_client.py @@ -52,14 +52,19 @@ def add_emission(self, carbon_emission: dict): self._previous_call = time.time() if self.run_id is None: # TODO : raise an Exception ? - logger.error( - "add_emissionadd_emission need a run_id : the initial call may " + logger.debug( + "ApiClient.add_emission need a run_id : the initial call may " + "have failed. Retrying..." ) self._create_run(self.experiment_id) + if self.run_id is None: + logger.error( + "ApiClient.add_emission still no run_id, aborting for this time !" + ) + return False if carbon_emission["duration"] < 1: logger.warning( - "Warning : emission not send because of a duration smaller than 1." + "ApiClient : emissions not sent because of a duration smaller than 1." ) return False emission = EmissionCreate( @@ -80,10 +85,10 @@ def add_emission(self, carbon_emission: dict): payload = dataclasses.asdict(emission) url = self.url + "/emission" r = requests.post(url=url, json=payload, timeout=2) - logger.debug(f"Successful upload emission {payload} to {url}") if r.status_code != 201: self._log_error(url, payload, r) return False + logger.debug(f"ApiClient - Successful upload emission {payload} to {url}") except Exception as e: logger.error(e, exc_info=True) return False @@ -96,7 +101,7 @@ def _create_run(self, experiment_id): """ if self.experiment_id is None: # TODO : raise an Exception ? - logger.error("FATAL The API _create_run need an experiment_id !") + logger.error("ApiClient FATAL The API _create_run needs an experiment_id !") return None try: run = RunCreate( @@ -110,7 +115,7 @@ def _create_run(self, experiment_id): return None self.run_id = r.json()["id"] logger.info( - "Successfully registered your run on the API.\n\n" + "ApiClient Successfully registered your run on the API.\n\n" + f"Run ID: {self.run_id}\n" + f"Experiment ID: {self.experiment_id}\n" ) @@ -134,10 +139,10 @@ def add_experiment(self, experiment: ExperimentCreate): def _log_error(self, url, payload, response): logger.error( - f" Error when calling the API on {url} with : {json.dumps(payload)}" + f"ApiClient Error when calling the API on {url} with : {json.dumps(payload)}" ) logger.error( - f" API return http code {response.status_code} and answer : {response.text}" + f"ApiClient API return http code {response.status_code} and answer : {response.text}" ) def close_experiment(self): diff --git a/codecarbon/core/util.py b/codecarbon/core/util.py index 5ce0c79ea..368d59e47 100644 --- a/codecarbon/core/util.py +++ b/codecarbon/core/util.py @@ -1,5 +1,8 @@ import logging from contextlib import contextmanager +from os.path import expandvars +from pathlib import Path +from typing import Optional, Union from codecarbon.external.logger import logger @@ -36,3 +39,45 @@ def set_log_level(level: str): logger.setLevel(getattr(logging, level)) return logger.error(f"Unknown log level: {level}") + + +def resolve_path(path: Union[str, Path]) -> None: + + """ + Fully resolve a path: + resolve env vars ($HOME etc.) -> expand user (~) -> make absolute + + Args: + path (Union[str, Path]): Path to a file or repository to resolve as + string or pathlib.Path + + Returns: + pathlib.Path: resolved absolute path + """ + return Path(expandvars(str(path))).expanduser().resolve() + + +def backup(file_path: Union[str, Path], ext: Optional[str] = ".bak") -> None: + """ + Resolves the path to a path then backs it up, adding the extension provided. + + Args: + file_path (Union[str, Path]): Path to a file to backup. + ext (Optional[str], optional): extension to append to the filename when + backing it up. Defaults to ".bak". + """ + file_path = resolve_path(file_path) + if not file_path.exists(): + return + assert file_path.is_file() + idx = 0 + parent = file_path.parent + file_name = f"{file_path.name}{ext}" + backup = parent / file_name + + while backup.exists(): + file_name = f"{file_path.name}_{idx}{ext}" + backup = parent / file_name + idx += 1 + + file_path.rename(backup) diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index cc26120ec..15efdf95e 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -5,6 +5,7 @@ import dataclasses import os import time +import uuid from abc import ABC, abstractmethod from datetime import datetime from functools import wraps @@ -146,6 +147,7 @@ def __init__( co2_signal_api_token: Optional[str] = _sentinel, tracking_mode: Optional[str] = _sentinel, log_level: Optional[Union[int, str]] = _sentinel, + on_csv_write: Optional[str] = _sentinel, ): """ :param project_name: Project name for current experiment run, default name @@ -153,6 +155,7 @@ def __init__( :param measure_power_secs: Interval (in seconds) to measure hardware power usage, defaults to 15 :param api_call_interval: Occurrence to wait before calling API : + -1 : only call api on flush() and at the end. 1 : at every measure 2 : every 2 measure, etc... :param api_endpoint: Optional URL of Code Carbon API endpoint for sending @@ -178,6 +181,10 @@ def __init__( :param log_level: Global codecarbon log level. Accepts one of: {"debug", "info", "warning", "error", "critical"}. Defaults to "info". + :param on_csv_write: "append" or "update". Whether to always append a new line + to the csv when writing or to update the existing `run_id` + row (useful when calling`tracker.flush()` manually). + Accepts one of "append" or "update". """ self._external_conf = get_hierarchical_config() @@ -194,6 +201,7 @@ def __init__( self._set_from_conf(save_to_api, "save_to_api", False, bool) self._set_from_conf(save_to_file, "save_to_file", True, bool) self._set_from_conf(tracking_mode, "tracking_mode", "machine") + self._set_from_conf(on_csv_write, "on_csv_write", "append") assert self._tracking_mode in ["machine", "process"] set_log_level(self._log_level) @@ -270,7 +278,10 @@ def __init__( if self._save_to_file: self.persistence_objs.append( - FileOutput(os.path.join(self._output_dir, self._output_file)) + FileOutput( + os.path.join(self._output_dir, self._output_file), + self._on_csv_write, + ) ) if self._emissions_endpoint: @@ -285,7 +296,10 @@ def __init__( experiment_id=experiment_id, api_key=api_key, ) + self.run_id = self._cc_api__out.run_id self.persistence_objs.append(self._cc_api__out) + else: + self.run_id = uuid.uuid4() @suppress(Exception) def start(self) -> None: @@ -301,6 +315,29 @@ def start(self) -> None: self._last_measured_time = self._start_time = time.time() self._scheduler.start() + @suppress(Exception) + def flush(self) -> Optional[float]: + """ + Write emission to disk or call the API depending on the configuration + but keep running the experiment. + :return: CO2 emissions in kgs + """ + if self._start_time is None: + logger.error("Need to first start the tracker") + return None + + # Run to calculate the power used from last + # scheduled measurement to shutdown + self._measure_power() + + emissions_data = self._prepare_emissions_data() + for persistence in self.persistence_objs: + if isinstance(persistence, CodeCarbonAPIOutput): + emissions_data = self._prepare_emissions_data(delta=True) + persistence.out(emissions_data) + + return emissions_data.emissions + @suppress(Exception) def stop(self) -> Optional[float]: """ @@ -325,6 +362,7 @@ def stop(self) -> Optional[float]: persistence.out(emissions_data) + self.final_emissions_data = emissions_data self.final_emissions = emissions_data.emissions return emissions_data.emissions @@ -357,6 +395,7 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData: total_emissions = EmissionsData( timestamp=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), project_name=self._project_name, + run_id=self.run_id, duration=duration.seconds, emissions=emissions, emissions_rate=emissions * 1000 / duration.seconds, @@ -452,7 +491,7 @@ def _measure_power(self) -> None: ) self._last_measured_time = time.time() self._measure_occurrence += 1 - if self._cc_api__out is not None: + if self._cc_api__out is not None and self._api_call_interval != -1: if self._measure_occurrence >= self._api_call_interval: emissions = self._prepare_emissions_data(delta=True) logger.info( @@ -650,6 +689,7 @@ def track_emissions( def _decorate(fn: Callable): @wraps(fn) def wrapped_fn(*args, **kwargs): + fn_result = None if offline and offline is not _sentinel: if (country_iso_code is None or country_iso_code is _sentinel) and ( cloud_provider is None or cloud_provider is _sentinel @@ -670,7 +710,7 @@ def wrapped_fn(*args, **kwargs): co2_signal_api_token=co2_signal_api_token, ) tracker.start() - fn(*args, **kwargs) + fn_result = fn(*args, **kwargs) tracker.stop() else: tracker = EmissionsTracker( @@ -691,7 +731,7 @@ def wrapped_fn(*args, **kwargs): ) tracker.start() try: - fn(*args, **kwargs) + fn_result = fn(*args, **kwargs) finally: logger.info( "\nGraceful stopping: collecting and writing information.\n" @@ -699,6 +739,7 @@ def wrapped_fn(*args, **kwargs): ) tracker.stop() logger.info("Done!\n") + return fn_result return wrapped_fn diff --git a/codecarbon/output.py b/codecarbon/output.py index 0da9e0d0b..6b3bce78b 100644 --- a/codecarbon/output.py +++ b/codecarbon/output.py @@ -10,10 +10,12 @@ from collections import OrderedDict from dataclasses import dataclass +import pandas as pd import requests # from core.schema import EmissionCreate, Emission from codecarbon.core.api_client import ApiClient +from codecarbon.core.util import backup from codecarbon.external.logger import logger @@ -25,6 +27,7 @@ class EmissionsData: timestamp: str project_name: str + run_id: str duration: float emissions: float emissions_rate: float @@ -79,7 +82,13 @@ class FileOutput(BaseOutput): Saves experiment artifacts to a file """ - def __init__(self, save_file_path: str): + def __init__(self, save_file_path: str, on_csv_write: str = "append"): + if on_csv_write not in {"append", "update"}: + raise ValueError( + f"Unknown `on_csv_write` value: {on_csv_write}" + + " (should be one of 'append' or 'update'" + ) + self.on_csv_write: str = on_csv_write self.save_file_path: str = save_file_path def has_valid_headers(self, data: EmissionsData): @@ -93,14 +102,33 @@ def out(self, data: EmissionsData): file_exists: bool = os.path.isfile(self.save_file_path) if file_exists and not self.has_valid_headers(data): logger.info("Backing up old emission file") - os.rename(self.save_file_path, self.save_file_path + ".bak") + backup(self.save_file_path) file_exists = False - with open(self.save_file_path, "a+") as f: - writer = csv.DictWriter(f, fieldnames=data.values.keys()) - if not file_exists: - writer.writeheader() - writer.writerow(data.values) + if not file_exists: + df = pd.DataFrame(columns=data.values.keys()) + df = df.append(dict(data.values), ignore_index=True) + elif self.on_csv_write == "append": + df = pd.read_csv(self.save_file_path) + df = df.append(dict(data.values), ignore_index=True) + else: + df = pd.read_csv(self.save_file_path) + df_run = df.loc[df.run_id == data.run_id] + if len(df_run) < 1: + df = df.append(dict(data.values), ignore_index=True) + elif len(df_run) > 1: + logger.warning( + f"CSV contains more than 1 ({len(len(df_run))})" + + f" rows with current run ID ({data.run_id})." + + "Appending instead of updating." + ) + df = df.append(dict(data.values), ignore_index=True) + else: + df.at[ + df.run_id == data.run_id, data.values.keys() + ] = data.values.values() + + df.to_csv(self.save_file_path, index=False) class HTTPOutput(BaseOutput): @@ -132,6 +160,8 @@ class CodeCarbonAPIOutput(BaseOutput): Send emissions data to HTTP endpoint """ + run_id = None + def __init__(self, endpoint_url: str, experiment_id: str, api_key: str): self.endpoint_url: str = endpoint_url self.api = ApiClient( @@ -139,6 +169,7 @@ def __init__(self, endpoint_url: str, experiment_id: str, api_key: str): endpoint_url=endpoint_url, api_key=api_key, ) + self.run_id = self.api.run_id def out(self, data: EmissionsData): try: diff --git a/examples/README.md b/examples/README.md index e1029edea..6f71b5530 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,6 +9,7 @@ pip install -r requirements-examples.txt ## Examples * [mnist.py](mnist.py): Usage using explicit `CO2Tracker` objects. * [mnist_decorator.py](mnist_decorator.py): Using the `@track_co2` decorator. +* [mnist_callback.py](mnist_callback.py): Using Keras callbacks to save emissions after each epoch. * [comet-mnist.py](comet-mnist.py): Using `CO2Tracker` with [`Comet`](https://www.comet.ml/site) for automatic experiment and emissions tracking. * [api_call_demo.py](api_call_demo.py) : Simplest demo to send computer emissions to CodeCarbon API. * [api_call_debug.py](api_call_debug.py) : Script to send computer emissions to CodeCarbon API. Made for debugging : debug log and send data every 20 seconds. diff --git a/examples/mnist_callback.py b/examples/mnist_callback.py new file mode 100644 index 000000000..adb5242d2 --- /dev/null +++ b/examples/mnist_callback.py @@ -0,0 +1,45 @@ +import tensorflow as tf +from tensorflow.keras.callbacks import Callback + +from codecarbon import EmissionsTracker + +""" +This sample code shows how to use CodeCarbon as a Keras Callback +to save emissions after each epoch. +""" + + +class CodeCarbonCallBack(Callback): + def __init__(self, codecarbon_tracker): + self.codecarbon_tracker = codecarbon_tracker + pass + + def on_epoch_end(self, epoch, logs=None): + self.codecarbon_tracker.flush() + + +mnist = tf.keras.datasets.mnist + +(x_train, y_train), (x_test, y_test) = mnist.load_data() +x_train, x_test = x_train / 255.0, x_test / 255.0 + + +model = tf.keras.models.Sequential( + [ + tf.keras.layers.Flatten(input_shape=(28, 28)), + tf.keras.layers.Dense(128, activation="relu"), + tf.keras.layers.Dropout(0.2), + tf.keras.layers.Dense(10), + ] +) + +loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) + +model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"]) + +tracker = EmissionsTracker() +tracker.start() +codecarbon_cb = CodeCarbonCallBack(tracker) +model.fit(x_train, y_train, epochs=4, callbacks=[codecarbon_cb]) +emissions: float = tracker.stop() +print(f"Emissions: {emissions} kg") diff --git a/tests/test_api_call.py b/tests/test_api_call.py index 41d3ab9db..d945773e3 100644 --- a/tests/test_api_call.py +++ b/tests/test_api_call.py @@ -1,4 +1,5 @@ import dataclasses +from uuid import uuid4 import requests_mock @@ -25,6 +26,7 @@ def test_call_api(): carbon_emission = EmissionsData( timestamp="222", project_name="", + run_id=uuid4(), duration=1.5, emissions=2.0, emissions_rate=2.0, diff --git a/tests/test_data/emissions_valid_headers.csv b/tests/test_data/emissions_valid_headers.csv index 50dcc40f2..01a198319 100644 --- a/tests/test_data/emissions_valid_headers.csv +++ b/tests/test_data/emissions_valid_headers.csv @@ -1,2 +1,2 @@ -timestamp,project_name,duration,emissions,emissions_rate,cpu_power,gpu_power,ram_power,cpu_energy,gpu_energy,ram_energy,energy_consumed,country_name,country_iso_code,region,on_cloud,cloud_provider,cloud_region -2021-07-13T17:36:33,codecarbon,175.7496521,0.001327206,0.000663603,0.27,0,0.154618823,0.001079437,0,0.000618153,0.00169759,Morocco,MAR,casablanca-settat,N,, +timestamp,project_name,run_id,duration,emissions,emissions_rate,cpu_power,gpu_power,ram_power,cpu_energy,gpu_energy,ram_energy,energy_consumed,country_name,country_iso_code,region,on_cloud,cloud_provider,cloud_region +2021-07-13T17:36:33,codecarbon,3b3639f6-776c-4216-97de-bc52fc895bca,175.7496521,0.001327206,0.000663603,0.27,0,0.154618823,0.001079437,0,0.000618153,0.00169759,Morocco,MAR,casablanca-settat,N,, diff --git a/tests/test_emissions_tracker_constant.py b/tests/test_emissions_tracker_constant.py index f657132b6..4a46b85ea 100644 --- a/tests/test_emissions_tracker_constant.py +++ b/tests/test_emissions_tracker_constant.py @@ -1,4 +1,5 @@ import os +import tempfile import time import unittest @@ -17,15 +18,23 @@ def heavy_computation(run_time_secs: int = 3): class TestCarbonTrackerConstant(unittest.TestCase): def setUp(self) -> None: - self.project_name = "project_foo" - self.emissions_file_path = os.path.join(os.getcwd(), "emissions.csv") + self.project_name = "project_TestCarbonTrackerConstant" + self.emissions_file = "emissions-test-TestCarbonTrackerConstant.csv" + self.emissions_path = tempfile.gettempdir() + self.emissions_file_path = os.path.join( + self.emissions_path, self.emissions_file + ) + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) def tearDown(self) -> None: if os.path.isfile(self.emissions_file_path): os.remove(self.emissions_file_path) def test_carbon_tracker_online_constant(self): - tracker = EmissionsTracker() + tracker = EmissionsTracker( + output_dir=self.emissions_path, output_file=self.emissions_file + ) tracker.start() heavy_computation(run_time_secs=1) emissions = tracker.stop() @@ -35,7 +44,11 @@ def test_carbon_tracker_online_constant(self): self.verify_output_file(self.emissions_file_path) def test_carbon_tracker_offline_constant(self): - tracker = OfflineEmissionsTracker(country_iso_code="USA") + tracker = OfflineEmissionsTracker( + country_iso_code="USA", + output_dir=self.emissions_path, + output_file=self.emissions_file, + ) tracker.start() heavy_computation(run_time_secs=1) emissions = tracker.stop() @@ -45,7 +58,11 @@ def test_carbon_tracker_offline_constant(self): self.verify_output_file(self.emissions_file_path) def test_decorator_constant(self): - @track_emissions(project_name=self.project_name) + @track_emissions( + project_name=self.project_name, + output_dir=self.emissions_path, + output_file=self.emissions_file, + ) def dummy_train_model(): return 42 diff --git a/tests/test_emissions_tracker_flush.py b/tests/test_emissions_tracker_flush.py new file mode 100644 index 000000000..227d8683e --- /dev/null +++ b/tests/test_emissions_tracker_flush.py @@ -0,0 +1,82 @@ +import os +import tempfile +import time +import unittest + +from codecarbon.emissions_tracker import ( + EmissionsTracker, + OfflineEmissionsTracker, + track_emissions, +) + + +def heavy_computation(run_time_secs: int = 3): + end_time: float = time.time() + run_time_secs # Run for `run_time_secs` seconds + while time.time() < end_time: + pass + + +class TestCarbonTrackerFlush(unittest.TestCase): + def setUp(self) -> None: + self.project_name = "project_TestCarbonTrackerFlush" + self.emissions_file = "emissions-test-TestCarbonTrackerFlush.csv" + self.emissions_path = tempfile.gettempdir() + self.emissions_file_path = os.path.join( + self.emissions_path, self.emissions_file + ) + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + def tearDown(self) -> None: + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + def test_carbon_tracker_online_flush(self): + tracker = EmissionsTracker( + output_dir=self.emissions_path, output_file=self.emissions_file + ) + tracker.start() + heavy_computation(run_time_secs=1) + tracker.flush() + heavy_computation(run_time_secs=1) + emissions = tracker.stop() + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + self.assertAlmostEqual(emissions, 6.262572537957655e-05, places=2) + self.verify_output_file(self.emissions_file_path) + + def test_carbon_tracker_offline_flush(self): + tracker = OfflineEmissionsTracker( + country_iso_code="USA", + output_dir=self.emissions_path, + output_file=self.emissions_file, + ) + tracker.start() + heavy_computation(run_time_secs=1) + tracker.flush() + heavy_computation(run_time_secs=1) + emissions = tracker.stop() + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + self.assertAlmostEqual(emissions, 6.262572537957655e-05, places=2) + self.verify_output_file(self.emissions_file_path) + + def test_decorator_flush(self): + @track_emissions( + project_name=self.project_name, + output_dir=self.emissions_path, + output_file=self.emissions_file, + ) + def dummy_train_model(): + # I don't know how to call flush() in decorator mode + return 42 + + res = dummy_train_model() + self.assertEqual(res, 42) + + self.verify_output_file(self.emissions_file_path, 2) + + def verify_output_file(self, file_path: str, expected_lines=3) -> None: + with open(file_path, "r") as f: + lines = [line.rstrip() for line in f] + assert len(lines) == expected_lines