Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,16 @@ with EmissionsTracker() as tracker:
# GPU Intensive code goes here
```

### Flush data when running

By default, Code Carbon only writes the CVS output file when it stops.
But for a long run you may want to save intermediate data.
It is possible to call the flush() method to do so.

Have a look to [./examples/mnist_callback.py](./examples/mnist_callback.py) for an example.

Note that if you use the API it will also call it when you call flush().
You could set *api_call_interval* to -1, so that it will not be called automatically and then force a call at the end of each epoch to get the emission of each epoch.

### Using comet.ml

Expand Down
21 changes: 13 additions & 8 deletions codecarbon/core/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,19 @@ def add_emission(self, carbon_emission: dict):
self._previous_call = time.time()
if self.run_id is None:
# TODO : raise an Exception ?
logger.error(
"add_emissionadd_emission need a run_id : the initial call may "
logger.debug(
"ApiClient.add_emission need a run_id : the initial call may "
+ "have failed. Retrying..."
)
self._create_run(self.experiment_id)
if self.run_id is None:
logger.error(
"ApiClient.add_emission still no run_id, aborting for this time !"
)
return False
if carbon_emission["duration"] < 1:
logger.warning(
"Warning : emission not send because of a duration smaller than 1."
"ApiClient : emissions not sent because of a duration smaller than 1."
)
return False
emission = EmissionCreate(
Expand All @@ -80,10 +85,10 @@ def add_emission(self, carbon_emission: dict):
payload = dataclasses.asdict(emission)
url = self.url + "/emission"
r = requests.post(url=url, json=payload, timeout=2)
logger.debug(f"Successful upload emission {payload} to {url}")
if r.status_code != 201:
self._log_error(url, payload, r)
return False
logger.debug(f"ApiClient - Successful upload emission {payload} to {url}")
except Exception as e:
logger.error(e, exc_info=True)
return False
Expand All @@ -96,7 +101,7 @@ def _create_run(self, experiment_id):
"""
if self.experiment_id is None:
# TODO : raise an Exception ?
logger.error("FATAL The API _create_run need an experiment_id !")
logger.error("ApiClient FATAL The API _create_run needs an experiment_id !")
return None
try:
run = RunCreate(
Expand All @@ -110,7 +115,7 @@ def _create_run(self, experiment_id):
return None
self.run_id = r.json()["id"]
logger.info(
"Successfully registered your run on the API.\n\n"
"ApiClient Successfully registered your run on the API.\n\n"
+ f"Run ID: {self.run_id}\n"
+ f"Experiment ID: {self.experiment_id}\n"
)
Expand All @@ -134,10 +139,10 @@ def add_experiment(self, experiment: ExperimentCreate):

def _log_error(self, url, payload, response):
logger.error(
f" Error when calling the API on {url} with : {json.dumps(payload)}"
f"ApiClient Error when calling the API on {url} with : {json.dumps(payload)}"
)
logger.error(
f" API return http code {response.status_code} and answer : {response.text}"
f"ApiClient API return http code {response.status_code} and answer : {response.text}"
)

def close_experiment(self):
Expand Down
45 changes: 45 additions & 0 deletions codecarbon/core/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging
from contextlib import contextmanager
from os.path import expandvars
from pathlib import Path
from typing import Optional, Union

from codecarbon.external.logger import logger

Expand Down Expand Up @@ -36,3 +39,45 @@ def set_log_level(level: str):
logger.setLevel(getattr(logging, level))
return
logger.error(f"Unknown log level: {level}")


def resolve_path(path: Union[str, Path]) -> None:

"""
Fully resolve a path:
resolve env vars ($HOME etc.) -> expand user (~) -> make absolute

Args:
path (Union[str, Path]): Path to a file or repository to resolve as
string or pathlib.Path

Returns:
pathlib.Path: resolved absolute path
"""
return Path(expandvars(str(path))).expanduser().resolve()


def backup(file_path: Union[str, Path], ext: Optional[str] = ".bak") -> None:
"""
Resolves the path to a path then backs it up, adding the extension provided.

Args:
file_path (Union[str, Path]): Path to a file to backup.
ext (Optional[str], optional): extension to append to the filename when
backing it up. Defaults to ".bak".
"""
file_path = resolve_path(file_path)
if not file_path.exists():
return
assert file_path.is_file()
idx = 0
parent = file_path.parent
file_name = f"{file_path.name}{ext}"
backup = parent / file_name

while backup.exists():
file_name = f"{file_path.name}_{idx}{ext}"
backup = parent / file_name
idx += 1

file_path.rename(backup)
49 changes: 45 additions & 4 deletions codecarbon/emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dataclasses
import os
import time
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from functools import wraps
Expand Down Expand Up @@ -146,13 +147,15 @@ def __init__(
co2_signal_api_token: Optional[str] = _sentinel,
tracking_mode: Optional[str] = _sentinel,
log_level: Optional[Union[int, str]] = _sentinel,
on_csv_write: Optional[str] = _sentinel,
):
"""
:param project_name: Project name for current experiment run, default name
as "codecarbon"
:param measure_power_secs: Interval (in seconds) to measure hardware power
usage, defaults to 15
:param api_call_interval: Occurrence to wait before calling API :
-1 : only call api on flush() and at the end.
1 : at every measure
2 : every 2 measure, etc...
:param api_endpoint: Optional URL of Code Carbon API endpoint for sending
Expand All @@ -178,6 +181,10 @@ def __init__(
:param log_level: Global codecarbon log level. Accepts one of:
{"debug", "info", "warning", "error", "critical"}.
Defaults to "info".
:param on_csv_write: "append" or "update". Whether to always append a new line
to the csv when writing or to update the existing `run_id`
row (useful when calling`tracker.flush()` manually).
Accepts one of "append" or "update".
"""
self._external_conf = get_hierarchical_config()

Expand All @@ -194,6 +201,7 @@ def __init__(
self._set_from_conf(save_to_api, "save_to_api", False, bool)
self._set_from_conf(save_to_file, "save_to_file", True, bool)
self._set_from_conf(tracking_mode, "tracking_mode", "machine")
self._set_from_conf(on_csv_write, "on_csv_write", "append")

assert self._tracking_mode in ["machine", "process"]
set_log_level(self._log_level)
Expand Down Expand Up @@ -270,7 +278,10 @@ def __init__(

if self._save_to_file:
self.persistence_objs.append(
FileOutput(os.path.join(self._output_dir, self._output_file))
FileOutput(
os.path.join(self._output_dir, self._output_file),
self._on_csv_write,
)
)

if self._emissions_endpoint:
Expand All @@ -285,7 +296,10 @@ def __init__(
experiment_id=experiment_id,
api_key=api_key,
)
self.run_id = self._cc_api__out.run_id
self.persistence_objs.append(self._cc_api__out)
else:
self.run_id = uuid.uuid4()

@suppress(Exception)
def start(self) -> None:
Expand All @@ -301,6 +315,29 @@ def start(self) -> None:
self._last_measured_time = self._start_time = time.time()
self._scheduler.start()

@suppress(Exception)
def flush(self) -> Optional[float]:
"""
Write emission to disk or call the API depending on the configuration
but keep running the experiment.
:return: CO2 emissions in kgs
"""
if self._start_time is None:
logger.error("Need to first start the tracker")
return None

# Run to calculate the power used from last
# scheduled measurement to shutdown
self._measure_power()

emissions_data = self._prepare_emissions_data()
for persistence in self.persistence_objs:
if isinstance(persistence, CodeCarbonAPIOutput):
emissions_data = self._prepare_emissions_data(delta=True)
persistence.out(emissions_data)

return emissions_data.emissions

@suppress(Exception)
def stop(self) -> Optional[float]:
"""
Expand All @@ -325,6 +362,7 @@ def stop(self) -> Optional[float]:

persistence.out(emissions_data)

self.final_emissions_data = emissions_data
self.final_emissions = emissions_data.emissions
return emissions_data.emissions

Expand Down Expand Up @@ -357,6 +395,7 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData:
total_emissions = EmissionsData(
timestamp=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
project_name=self._project_name,
run_id=self.run_id,
duration=duration.seconds,
emissions=emissions,
emissions_rate=emissions * 1000 / duration.seconds,
Expand Down Expand Up @@ -452,7 +491,7 @@ def _measure_power(self) -> None:
)
self._last_measured_time = time.time()
self._measure_occurrence += 1
if self._cc_api__out is not None:
if self._cc_api__out is not None and self._api_call_interval != -1:
if self._measure_occurrence >= self._api_call_interval:
emissions = self._prepare_emissions_data(delta=True)
logger.info(
Expand Down Expand Up @@ -650,6 +689,7 @@ def track_emissions(
def _decorate(fn: Callable):
@wraps(fn)
def wrapped_fn(*args, **kwargs):
fn_result = None
if offline and offline is not _sentinel:
if (country_iso_code is None or country_iso_code is _sentinel) and (
cloud_provider is None or cloud_provider is _sentinel
Expand All @@ -670,7 +710,7 @@ def wrapped_fn(*args, **kwargs):
co2_signal_api_token=co2_signal_api_token,
)
tracker.start()
fn(*args, **kwargs)
fn_result = fn(*args, **kwargs)
tracker.stop()
else:
tracker = EmissionsTracker(
Expand All @@ -691,14 +731,15 @@ def wrapped_fn(*args, **kwargs):
)
tracker.start()
try:
fn(*args, **kwargs)
fn_result = fn(*args, **kwargs)
finally:
logger.info(
"\nGraceful stopping: collecting and writing information.\n"
+ "Please Allow for a few seconds..."
)
tracker.stop()
logger.info("Done!\n")
return fn_result

return wrapped_fn

Expand Down
45 changes: 38 additions & 7 deletions codecarbon/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from collections import OrderedDict
from dataclasses import dataclass

import pandas as pd
import requests

# from core.schema import EmissionCreate, Emission
from codecarbon.core.api_client import ApiClient
from codecarbon.core.util import backup
from codecarbon.external.logger import logger


Expand All @@ -25,6 +27,7 @@ class EmissionsData:

timestamp: str
project_name: str
run_id: str
duration: float
emissions: float
emissions_rate: float
Expand Down Expand Up @@ -79,7 +82,13 @@ class FileOutput(BaseOutput):
Saves experiment artifacts to a file
"""

def __init__(self, save_file_path: str):
def __init__(self, save_file_path: str, on_csv_write: str = "append"):
if on_csv_write not in {"append", "update"}:
raise ValueError(
f"Unknown `on_csv_write` value: {on_csv_write}"
+ " (should be one of 'append' or 'update'"
)
self.on_csv_write: str = on_csv_write
self.save_file_path: str = save_file_path

def has_valid_headers(self, data: EmissionsData):
Expand All @@ -93,14 +102,33 @@ def out(self, data: EmissionsData):
file_exists: bool = os.path.isfile(self.save_file_path)
if file_exists and not self.has_valid_headers(data):
logger.info("Backing up old emission file")
os.rename(self.save_file_path, self.save_file_path + ".bak")
backup(self.save_file_path)
file_exists = False

with open(self.save_file_path, "a+") as f:
writer = csv.DictWriter(f, fieldnames=data.values.keys())
if not file_exists:
writer.writeheader()
writer.writerow(data.values)
if not file_exists:
df = pd.DataFrame(columns=data.values.keys())
df = df.append(dict(data.values), ignore_index=True)
elif self.on_csv_write == "append":
df = pd.read_csv(self.save_file_path)
df = df.append(dict(data.values), ignore_index=True)
else:
df = pd.read_csv(self.save_file_path)
df_run = df.loc[df.run_id == data.run_id]
if len(df_run) < 1:
df = df.append(dict(data.values), ignore_index=True)
elif len(df_run) > 1:
logger.warning(
f"CSV contains more than 1 ({len(len(df_run))})"
+ f" rows with current run ID ({data.run_id})."
+ "Appending instead of updating."
)
df = df.append(dict(data.values), ignore_index=True)
else:
df.at[
df.run_id == data.run_id, data.values.keys()
] = data.values.values()

df.to_csv(self.save_file_path, index=False)


class HTTPOutput(BaseOutput):
Expand Down Expand Up @@ -132,13 +160,16 @@ class CodeCarbonAPIOutput(BaseOutput):
Send emissions data to HTTP endpoint
"""

run_id = None

def __init__(self, endpoint_url: str, experiment_id: str, api_key: str):
self.endpoint_url: str = endpoint_url
self.api = ApiClient(
experiment_id=experiment_id,
endpoint_url=endpoint_url,
api_key=api_key,
)
self.run_id = self.api.run_id

def out(self, data: EmissionsData):
try:
Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pip install -r requirements-examples.txt
## Examples
* [mnist.py](mnist.py): Usage using explicit `CO2Tracker` objects.
* [mnist_decorator.py](mnist_decorator.py): Using the `@track_co2` decorator.
* [mnist_callback.py](mnist_callback.py): Using Keras callbacks to save emissions after each epoch.
* [comet-mnist.py](comet-mnist.py): Using `CO2Tracker` with [`Comet`](https://www.comet.ml/site) for automatic experiment and emissions tracking.
* [api_call_demo.py](api_call_demo.py) : Simplest demo to send computer emissions to CodeCarbon API.
* [api_call_debug.py](api_call_debug.py) : Script to send computer emissions to CodeCarbon API. Made for debugging : debug log and send data every 20 seconds.
Loading