Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/muse/demand_share.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def demand_share(
RetrofitAgentInStandardDemandShare,
)
from muse.registration import registrator
from muse.utilities import check_dimensions

DEMAND_SHARE_SIGNATURE = Callable[
[Sequence[AbstractAgent], xr.Dataset, xr.Dataset, KwArg(Any)], xr.DataArray
Expand Down Expand Up @@ -102,7 +103,27 @@ def demand_share(

keyword_args = copy(keywords)
keyword_args.update(**kwargs)
return function(agents, market, technologies, **keyword_args)

# Check inputs
check_dimensions(
market,
["commodity", "year", "timeslice", "region"],
optional=["dst_region"],
)
check_dimensions(
technologies,
["technology", "year", "region"],
optional=["timeslice", "commodity", "dst_region"],
)

# Calculate demand share
result = function(agents, market, technologies, **keyword_args)

# Check result
check_dimensions(
result, ["timeslice", "commodity"], optional=["asset", "region"]
) # TODO: asset should be required, but trade model is failing
return result

return cast(DEMAND_SHARE_SIGNATURE, demand_share)

Expand Down
48 changes: 30 additions & 18 deletions src/muse/objectives.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ def comfort(
these parameters.

Returns:
A DataArray with at least one dimension corresponding to ``replacement``.
Other dimensions can be present, as long as the subsequent decision function knows
how to reduce them.
A DataArray with at least two dimension corresponding to `replacement` and `asset`.
A `timeslice` dimension may also be present.
"""

__all__ = [
Expand Down Expand Up @@ -72,7 +71,7 @@ def comfort(
from muse.outputs.cache import cache_quantity
from muse.registration import registrator
from muse.timeslices import broadcast_timeslice, distribute_timeslice, drop_timeslice
from muse.utilities import filter_input
from muse.utilities import check_dimensions, filter_input

OBJECTIVE_SIGNATURE = Callable[
[xr.Dataset, xr.DataArray, xr.DataArray, KwArg(Any)], xr.DataArray
Expand Down Expand Up @@ -168,25 +167,30 @@ def register_objective(function: OBJECTIVE_SIGNATURE):
from functools import wraps

@wraps(function)
def decorated_objective(technologies: xr.Dataset, *args, **kwargs) -> xr.DataArray:
def decorated_objective(
technologies: xr.Dataset, demand: xr.DataArray, *args, **kwargs
) -> xr.DataArray:
from logging import getLogger

result = function(technologies, *args, **kwargs)
# Check inputs
check_dimensions(
demand, ["asset", "timeslice", "commodity"], optional=["region"]
)
check_dimensions(
technologies, ["replacement", "commodity"], optional=["timeslice"]
)

# Calculate objective
result = function(technologies, demand, *args, **kwargs)
result.name = function.__name__

# Check result
dtype = result.values.dtype
if not (np.issubdtype(dtype, np.number) or np.issubdtype(dtype, np.bool_)):
msg = f"dtype of objective {function.__name__} is not a number ({dtype})"
getLogger(function.__module__).warning(msg)
check_dimensions(result, ["replacement", "asset"], optional=["timeslice"])

if "replacement" not in result.dims:
raise RuntimeError("Objective should return a dimension 'replacement'")
if "technology" in result.dims:
raise RuntimeError("Objective should not return a dimension 'technology'")
if "technology" in result.coords:
raise RuntimeError("Objective should not return a coordinate 'technology'")
if "year" in result.dims:
raise RuntimeError("Objective should not return a dimension 'year'")
result.name = function.__name__
cache_quantity(**{result.name: result})
return result

Expand All @@ -196,21 +200,25 @@ def decorated_objective(technologies: xr.Dataset, *args, **kwargs) -> xr.DataArr
@register_objective
def comfort(
technologies: xr.Dataset,
demand: xr.DataArray,
*args,
**kwargs,
) -> xr.DataArray:
"""Comfort value provided by technologies."""
return technologies.comfort
result = xr.broadcast(technologies.comfort, demand.asset)[0]
return result


@register_objective
def efficiency(
technologies: xr.Dataset,
demand: xr.DataArray,
*args,
**kwargs,
) -> xr.DataArray:
"""Efficiency of the technologies."""
return technologies.efficiency
result = xr.broadcast(technologies.efficiency, demand.asset)[0]
return result


@register_objective(name="capacity")
Expand Down Expand Up @@ -292,6 +300,7 @@ def fixed_costs(
@register_objective
def capital_costs(
technologies: xr.Dataset,
demand: xr.Dataset,
*args,
**kwargs,
) -> xr.DataArray:
Expand All @@ -303,6 +312,7 @@ def capital_costs(
simulation for each technology.
"""
result = technologies.cap_par * (technologies.scaling_size**technologies.cap_exp)
result = xr.broadcast(result, demand.asset)[0]
return result


Expand Down Expand Up @@ -373,10 +383,12 @@ def annual_levelized_cost_of_energy(
"""
from muse.costs import annual_levelized_cost_of_energy as aLCOE

return filter_input(
result = filter_input(
aLCOE(technologies=technologies, prices=prices).max("timeslice"),
year=demand.year.item(),
)
result = xr.broadcast(result, demand.asset)[0]
return result


@register_objective(name=["LCOE", "LLCOE"])
Expand Down
76 changes: 49 additions & 27 deletions src/muse/utilities.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
"""Collection of functions and stand-alone algorithms."""

from __future__ import annotations

from collections.abc import Hashable, Iterable, Iterator, Mapping, Sequence
from typing import (
Any,
Callable,
NamedTuple,
Optional,
Union,
cast,
)

import numpy as np
import xarray as xr


def multiindex_to_coords(
data: Union[xr.Dataset, xr.DataArray], dimension: str = "asset"
):
def multiindex_to_coords(data: xr.Dataset | xr.DataArray, dimension: str = "asset"):
"""Flattens multi-index dimension into multi-coord dimension."""
from pandas import MultiIndex

Expand All @@ -33,8 +31,8 @@ def multiindex_to_coords(


def coords_to_multiindex(
data: Union[xr.Dataset, xr.DataArray], dimension: str = "asset"
) -> Union[xr.Dataset, xr.DataArray]:
data: xr.Dataset | xr.DataArray, dimension: str = "asset"
) -> xr.Dataset | xr.DataArray:
"""Creates a multi-index from flattened multiple coords."""
from pandas import MultiIndex

Expand All @@ -47,11 +45,11 @@ def coords_to_multiindex(


def reduce_assets(
assets: Union[xr.DataArray, xr.Dataset, Sequence[Union[xr.Dataset, xr.DataArray]]],
coords: Optional[Union[str, Sequence[str], Iterable[str]]] = None,
assets: xr.DataArray | xr.Dataset | Sequence[xr.Dataset | xr.DataArray],
coords: str | Sequence[str] | Iterable[str] | None = None,
dim: str = "asset",
operation: Optional[Callable] = None,
) -> Union[xr.DataArray, xr.Dataset]:
operation: Callable | None = None,
) -> xr.DataArray | xr.Dataset:
r"""Combine assets along given asset dimension.

This method simplifies combining assets across multiple agents, or combining assets
Expand Down Expand Up @@ -182,13 +180,13 @@ def operation(x):


def broadcast_techs(
technologies: Union[xr.Dataset, xr.DataArray],
template: Union[xr.DataArray, xr.Dataset],
technologies: xr.Dataset | xr.DataArray,
template: xr.DataArray | xr.Dataset,
dimension: str = "asset",
interpolation: str = "linear",
installed_as_year: bool = True,
**kwargs,
) -> Union[xr.Dataset, xr.DataArray]:
) -> xr.Dataset | xr.DataArray:
"""Broadcasts technologies to the shape of template in given dimension.

The dimensions of the technologies are fully explicit, in that each concept
Expand Down Expand Up @@ -246,7 +244,7 @@ def broadcast_techs(
return techs.sel(second_sel)


def clean_assets(assets: xr.Dataset, years: Union[int, Sequence[int]]):
def clean_assets(assets: xr.Dataset, years: int | Sequence[int]):
"""Cleans up and prepares asset for current iteration.

- adds current and forecast year by backfilling missing entries
Expand All @@ -265,11 +263,11 @@ def clean_assets(assets: xr.Dataset, years: Union[int, Sequence[int]]):


def filter_input(
dataset: Union[xr.Dataset, xr.DataArray],
year: Optional[Union[int, Iterable[int]]] = None,
dataset: xr.Dataset | xr.DataArray,
year: int | Iterable[int] | None = None,
interpolation: str = "linear",
**kwargs,
) -> Union[xr.Dataset, xr.DataArray]:
) -> xr.Dataset | xr.DataArray:
"""Filter inputs, taking care to interpolate years."""
if year is None:
setyear: set[int] = set()
Expand Down Expand Up @@ -300,8 +298,8 @@ def filter_input(


def filter_with_template(
data: Union[xr.Dataset, xr.DataArray],
template: Union[xr.DataArray, xr.Dataset],
data: xr.Dataset | xr.DataArray,
template: xr.DataArray | xr.Dataset,
asset_dimension: str = "asset",
**kwargs,
):
Expand Down Expand Up @@ -350,7 +348,7 @@ def tupled_dimension(array: np.ndarray, axis: int):
def lexical_comparison(
objectives: xr.Dataset,
binsize: xr.Dataset,
order: Optional[Sequence[Hashable]] = None,
order: Sequence[Hashable] | None = None,
bin_last: bool = True,
) -> xr.DataArray:
"""Lexical comparison over the objectives.
Expand Down Expand Up @@ -438,7 +436,7 @@ def avoid_repetitions(data: xr.DataArray, dim: str = "year") -> xr.DataArray:
return data.year[years]


def nametuple_to_dict(nametup: Union[Mapping, NamedTuple]) -> Mapping:
def nametuple_to_dict(nametup: Mapping | NamedTuple) -> Mapping:
"""Transforms a nametuple of type GenericDict into an OrderDict."""
from collections import OrderedDict
from dataclasses import asdict, is_dataclass
Expand Down Expand Up @@ -537,11 +535,11 @@ def future_propagation(


def agent_concatenation(
data: Mapping[Hashable, Union[xr.DataArray, xr.Dataset]],
data: Mapping[Hashable, xr.DataArray | xr.Dataset],
dim: str = "asset",
name: str = "agent",
fill_value: Any = 0,
) -> Union[xr.DataArray, xr.Dataset]:
) -> xr.DataArray | xr.Dataset:
"""Concatenates input map along given dimension.

Example:
Expand Down Expand Up @@ -613,10 +611,10 @@ def agent_concatenation(


def aggregate_technology_model(
data: Union[xr.DataArray, xr.Dataset],
data: xr.DataArray | xr.Dataset,
dim: str = "asset",
drop: Union[str, Sequence[str]] = "installed",
) -> Union[xr.DataArray, xr.Dataset]:
drop: str | Sequence[str] = "installed",
) -> xr.DataArray | xr.Dataset:
"""Aggregate together assets with the same installation year.

The assets of a given agent, region, and technology but different installation year
Expand Down Expand Up @@ -659,3 +657,27 @@ def aggregate_technology_model(
data,
[cast(str, u) for u in data.coords if u not in drop and data[u].dims == (dim,)],
)


def check_dimensions(
data: xr.DataArray | xr.Dataset,
required: Iterable[str] = (),
optional: Iterable[str] = (),
):
"""Ensure that an array has the required dimensions.

This will check that all required dimensions are present, and that no other
dimensions are present, apart from those listed as optional.

Args:
data: DataArray or Dataset to check dimensions of
required: List of dimension names that must be present
optional: List of dimension names that may be present
"""
present = set(data.dims)
missing = set(required) - present
if missing:
raise ValueError(f"Missing required dimensions: {missing}")
extra = present - set(required) - set(optional)
if extra:
raise ValueError(f"Extra dimensions: {extra}")
Loading