Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions codecarbon/core/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,10 @@ def get_cpu_details(self) -> Dict:
else:
cpu_details[col_name] = cpu_data[col_name].mean()
except Exception as e:
logger.info(
f"Unable to read Intel Power Gadget logged file at {self._log_file_path}\n \
Exception occurred {e}",
exc_info=True,
logger.warning(
f"Unable to read Intel Power Gadget logged file. Exception:\n{e}"
)
logger.debug("Full error:\n", exc_info=True)
return cpu_details


Expand Down Expand Up @@ -210,10 +209,10 @@ def get_cpu_details(self) -> Dict:
cpu_details[rapl_file.name] = rapl_file.power_measurement.kW
except Exception as e:
logger.info(
f"Unable to read Intel RAPL files at {self._rapl_files}\n \
Exception occurred {e}",
exc_info=True,
f"Unable to read Intel RAPL files at {self._rapl_files}."
+ f" Exception:\n {e}"
)
logger.debug("Full error:\n", exc_info=True)
return cpu_details


Expand Down
1 change: 0 additions & 1 deletion codecarbon/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ def suppress(*exceptions):
logger.warning(
exceptions if len(exceptions) != 1 else exceptions[0], exc_info=True
)
logger.warning("stopping.")
pass


Expand Down
218 changes: 137 additions & 81 deletions codecarbon/emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
# see: https://stackoverflow.com/questions/67202314/
# python-distinguish-default-argument-and-argument-provided-with-default-value

_sentinel = object()
_sentinel = "93dc1470-9879-43ed-b11e-158bcc932a99"


class BaseEmissionsTracker(ABC):
Expand All @@ -56,79 +56,6 @@ class BaseEmissionsTracker(ABC):
and `CarbonTracker.`
"""

def _set_from_conf(
self, var, name, default=None, return_type=None, prevent_setter=False
):
"""
Method to standardize private argument setting. Generic flow is:

* If a value for the variable `var` with name `name` is provided in the
__init__ constructor: set the the private attribute `self._{name}` to
that value

* If no value is provided for `var`, i.e. `var is _sentinel` is True then
we try to assign a value to it:

* If there is a value for `name` in the external configuration (config
files or env variables), then we use it
* Otherwise `self._{name}` is set to the `default` value

Additionally, if `return_type` is provided and one of `float` `int` or `bool`,
the value for `self._{name}` will be parsed to this type.

Use `prevent_setter=True` for debugging purposes only.

Args:
var (Any): The variable's value to set as private attribute
name (str): The variable's name such that `self._{name}` will be set
to `var`
default (Any, optional): The value to use for self._name if no value
is provided in the constructor and no value is found in the external
configuration.
Defaults to None.
return_type (Any, optional): A type to parse the value to. Defaults to None.
prevent_setter (bool, optional): Whether to set the private attribute or
simply return the value. For debugging. Defaults to False.

Returns:
[Any]: The value used for `self._{name}`
"""
# Check the hierarchical configuration has been read parsed and set.
assert hasattr(self, "_external_conf")
assert isinstance(self._external_conf, dict)

# Store final values in _conf
if not hasattr(self, "_conf"):
self._conf = {}

value = _sentinel

# a value for the keyword argument `name` is provided in the constructor:
# use it
if var is not _sentinel:
value = var
else:

# no value provided in the constructor for `name`: check in the conf
# (using the provided default value)
value = self._external_conf.get(name, default)

# parse to `return_type` if needed
if return_type is not None:
if return_type is bool:
value = str(value).lower() == "true"
else:
assert callable(return_type)
value = return_type(value)

# store final value
self._conf[name] = value
# set `self._{name}` to `value`
if not prevent_setter:
setattr(self, f"_{name}", value)
# return final value (why not?)
return value

def __init__(
self,
project_name: Optional[str] = _sentinel,
Expand Down Expand Up @@ -199,6 +126,7 @@ def __init__(
set_log_level(self._log_level)

self._start_time: Optional[float] = None
self._end_time: Optional[float] = None
self._last_measured_time: float = time.time()
self._total_energy: Energy = Energy.from_energy(kWh=0)
self._total_cpu_energy: Energy = Energy.from_energy(kWh=0)
Expand All @@ -214,6 +142,8 @@ def __init__(
self._measure_occurrence: int = 0
self._cloud = None
self._previous_emissions = None
self._previous_duration: float = 0
self.final_emissions_data = None

if isinstance(self._gpu_ids, str):
self._gpu_ids = parse_gpu_ids(self._gpu_ids)
Expand Down Expand Up @@ -287,26 +217,144 @@ def __init__(
)
self.persistence_objs.append(self._cc_api__out)

def _set_from_conf(
self, var, name, default=None, return_type=None, prevent_setter=False
):
"""
Method to standardize private argument setting. Generic flow is:

* If a value for the variable `var` with name `name` is provided in the
__init__ constructor: set the the private attribute `self._{name}` to
that value

* If no value is provided for `var`, i.e. `var is _sentinel` is True then
we try to assign a value to it:

* If there is a value for `name` in the external configuration (config
files or env variables), then we use it
* Otherwise `self._{name}` is set to the `default` value

Additionally, if `return_type` is provided and one of `float` `int` or `bool`,
the value for `self._{name}` will be parsed to this type.

Use `prevent_setter=True` for debugging purposes only.

Args:
var (Any): The variable's value to set as private attribute
name (str): The variable's name such that `self._{name}` will be set
to `var`
default (Any, optional): The value to use for self._name if no value
is provided in the constructor and no value is found in the external
configuration.
Defaults to None.
return_type (Any, optional): A type to parse the value to. Defaults to None.
prevent_setter (bool, optional): Whether to set the private attribute or
simply return the value. For debugging. Defaults to False.

Returns:
[Any]: The value used for `self._{name}`
"""
# Check the hierarchical configuration has been read parsed and set.
assert hasattr(self, "_external_conf")
assert isinstance(self._external_conf, dict)

# Store final values in _conf
if not hasattr(self, "_conf"):
self._conf = {}

value = _sentinel

# a value for the keyword argument `name` is provided in the constructor:
# use it
if var is not _sentinel:
value = var
else:

# no value provided in the constructor for `name`: check in the conf
# (using the provided default value)
value = self._external_conf.get(name, default)

# parse to `return_type` if needed
if return_type is not None:
if return_type is bool:
value = str(value).lower() == "true"
else:
assert callable(return_type)
value = return_type(value)

# store final value
self._conf[name] = value
# set `self._{name}` to `value`
if not prevent_setter:
setattr(self, f"_{name}", value)
# return final value (why not?)
return value

@suppress(Exception)
def start(self) -> None:
"""
Starts tracking the experiment.
Currently, Nvidia GPUs are supported.
:return: None
"""
resuming = False
if self._start_time is not None:
logger.warning("Already started tracking")
return
if self._end_time is None:
logger.warning(
"You can only resume a tracker which has been started then stopped."
+ " The current tracker has been started but not stopped."
+ " Ignoring start() call."
)
return
logger.info("Resuming tracker.")
resuming = True
self._previous_duration = self._previous_duration + (
self._end_time - self._start_time
)

self._scheduler = BackgroundScheduler()
self._scheduler.add_job(
self._measure_power,
"interval",
seconds=self._measure_power_secs,
max_instances=1,
)

if not resuming:
logger.info("Starting tracker.")
self._last_measured_time = self._start_time = time.time()
self._scheduler.start()

@suppress(Exception)
def stop(self) -> Optional[float]:
def flush(self) -> Optional[float]:
"""
Write emission to disk or call the API depending of the configuration
but keep running the experiment.
:return: CO2 emissions in kgs
"""
if self._start_time is None:
logger.error("Need to first start the tracker")
return None

# Run to calculate the power used from last
# scheduled measurement to shutdown
self._measure_power()

emissions_data = self._prepare_emissions_data()
for persistence in self.persistence_objs:
if isinstance(persistence, CodeCarbonAPIOutput):
emissions_data = self._prepare_emissions_data(delta=True)
persistence.out(emissions_data)

return emissions_data.emissions

@suppress(Exception)
def stop(self) -> Optional[EmissionsData]:
"""
Stops tracking the experiment
:return: CO2 emissions in kgs
"""
logger.info("Stopping tracker")
if self._start_time is None:
logger.error("Need to first start the tracker")
return None
Expand All @@ -317,16 +365,18 @@ def stop(self) -> Optional[float]:
# scheduled measurement to shutdown
self._measure_power()

self._end_time = time.time()

emissions_data = self._prepare_emissions_data()

for persistence in self.persistence_objs:
if isinstance(persistence, CodeCarbonAPIOutput):
emissions_data = self._prepare_emissions_data(delta=True)
persistence.out(emissions_data, self.final_emissions_data)

persistence.out(emissions_data)

self.final_emissions = emissions_data.emissions
return emissions_data.emissions
self.final_emissions_data = emissions_data
# emissions are in emissions_data.emissions
return emissions_data

def _prepare_emissions_data(self, delta=False) -> EmissionsData:
"""
Expand Down Expand Up @@ -375,6 +425,8 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData:
cloud_region=cloud_region,
)
if delta:
# sending to the API: we need to upload the relative change
# in measurements not the current absolute value
if self._previous_emissions is None:
self._previous_emissions = total_emissions
else:
Expand All @@ -386,6 +438,10 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData:
# TODO : the API call succeeded
self._previous_emissions = total_emissions
total_emissions = delta_emissions
else:
# not sending to the API: duration has to be incremented by potential
# previous duration (if tracker was re-started)
total_emissions.duration += self._previous_duration
logger.debug(total_emissions)
return total_emissions

Expand Down
Loading