Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 0 additions & 90 deletions src/muse/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def production(
"""

__all__ = [
"demand_matched_production",
"factory",
"maximum_production",
"register_production",
Expand Down Expand Up @@ -123,92 +122,3 @@ def supply(
from muse.quantities import supply

return supply(capacity, market.consumption, technologies)


@register_production(name="match")
def demand_matched_production(
market: xr.Dataset,
capacity: xr.DataArray,
technologies: xr.Dataset,
costs: str = "prices",
) -> xr.DataArray:
"""Production from matching demand via annual lcoe."""
from muse.costs import annual_levelized_cost_of_energy as lcoe
from muse.quantities import demand_matched_production, gross_margin
from muse.utilities import broadcast_techs

if costs == "prices":
prices = market.prices
elif costs == "gross_margin":
prices = gross_margin(technologies, capacity, market.prices)
elif costs == "lcoe":
prices = lcoe(
market.prices, cast(xr.Dataset, broadcast_techs(technologies, capacity))
)
else:
raise ValueError(f"Unknown costs option {costs}")

return demand_matched_production(
demand=market.consumption,
prices=prices,
capacity=capacity,
technologies=technologies,
)


@register_production(name="costed")
def costed_production(
market: xr.Dataset,
capacity: xr.DataArray,
technologies: xr.Dataset,
costs: Union[xr.DataArray, Callable, str] = "alcoe",
with_minimum_service: bool = True,
with_emission: bool = True,
) -> xr.DataArray:
"""Computes production from ranked assets.

The assets are ranked according to their cost. The cost can be provided as an
xarray, a callable creating an xarray, or as "alcoe". The asset with least cost are
allowed to service the demand first, up to the maximum production. By default, the
minimum service is applied first.
"""
from muse.commodities import CommodityUsage, check_usage, is_pollutant
from muse.costs import annual_levelized_cost_of_energy
from muse.quantities import (
costed_production,
emission,
)
from muse.utilities import broadcast_techs

if isinstance(costs, str) and costs.lower() == "alcoe":
costs = annual_levelized_cost_of_energy
elif isinstance(costs, str):
raise ValueError(f"Unknown cost {costs}")
if callable(costs):
technodata = cast(xr.Dataset, broadcast_techs(technologies, capacity))
costs = costs(
prices=market.prices.sel(region=technodata.region), technologies=technodata
)
else:
costs = costs
assert isinstance(costs, xr.DataArray)

production = costed_production(
market.consumption,
costs,
capacity,
technologies,
with_minimum_service=with_minimum_service,
)
# add production of environmental pollutants
if with_emission:
env = is_pollutant(technologies.comm_usage)
production[dict(commodity=env)] = emission(
production, technologies.fixed_outputs
).transpose(*production.dims)
production[
dict(
commodity=~check_usage(technologies.comm_usage, CommodityUsage.PRODUCT)
)
] = 0
return production
106 changes: 0 additions & 106 deletions src/muse/quantities.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,36 +386,6 @@ def maximum_production(
return result.where(is_enduse(result.comm_usage), 0)


def demand_matched_production(
demand: xr.DataArray,
prices: xr.DataArray,
capacity: xr.DataArray,
technologies: xr.Dataset,
**filters,
) -> xr.DataArray:
"""Production matching the input demand.

Arguments:
demand: demand to match.
prices: price from which to compute the annual levelized cost of energy.
capacity: capacity from which to obtain the maximum production constraints.
technologies: technologies we are looking at
**filters: keyword arguments with which to filter the input datasets and
data arrays., e.g. region, or year.
"""
from muse.costs import annual_levelized_cost_of_energy as ALCOE
from muse.demand_matching import demand_matching
from muse.utilities import broadcast_techs

technodata = cast(xr.Dataset, broadcast_techs(technologies, capacity))
cost = ALCOE(prices=prices, technologies=technodata, **filters)
max_production = maximum_production(
technodata, capacity, timeslices=demand, **filters
)
assert ("timeslice" in demand.dims) == ("timeslice" in cost.dims)
return demand_matching(demand, cost, max_production)


def capacity_in_use(
production: xr.DataArray,
technologies: xr.Dataset,
Expand Down Expand Up @@ -467,82 +437,6 @@ def capacity_in_use(
return capa_in_use


def costed_production(
demand: xr.Dataset,
costs: xr.DataArray,
capacity: xr.DataArray,
technologies: xr.Dataset,
with_minimum_service: bool = True,
) -> xr.DataArray:
"""Computes production from ranked assets.

The assets are ranked according to their cost. The asset with least cost are allowed
to service the demand first, up to the maximum production. By default, the minimum
service is applied first.
"""
from muse.quantities import maximum_production
from muse.utilities import broadcast_techs

technodata = cast(xr.Dataset, broadcast_techs(technologies, capacity))

if len(capacity.region.dims) == 0:

def group_assets(x: xr.DataArray) -> xr.DataArray:
return x.sum("asset")

else:

def group_assets(x: xr.DataArray) -> xr.DataArray:
return xr.Dataset(dict(x=x)).groupby("region").sum("asset").x

ranking = costs.rank("asset")
maxprod = maximum_production(technodata, capacity, timeslices=demand.timeslice)
commodity = (maxprod > 0).any([i for i in maxprod.dims if i != "commodity"])
commodity = commodity.drop_vars(
[u for u in commodity.coords if u not in commodity.dims]
)
demand = demand.sel(commodity=commodity).copy()

constraints = (
xr.Dataset(dict(maxprod=maxprod, ranking=ranking, has_output=maxprod > 0))
.set_coords("ranking")
.set_coords("has_output")
.sel(commodity=commodity)
)

if not with_minimum_service:
production = xr.zeros_like(constraints.maxprod)
else:
production = (
getattr(technodata, "minimum_service_factor", 0) * constraints.maxprod
)
demand = np.maximum(demand - group_assets(production), 0)

for rank in sorted(set(constraints.ranking.values.flatten())):
condition = (constraints.ranking == rank) & constraints.has_output
current_maxprod = constraints.maxprod.where(condition, 0)
fullprod = group_assets(current_maxprod)
if (fullprod <= demand + 1e-10).all():
current_demand = fullprod
current_prod = current_maxprod
else:
if "region" in demand.dims:
demand_prod = demand.sel(region=production.region)
else:
demand_prod = demand
demand_prod = (
current_maxprod / current_maxprod.sum("asset") * demand_prod
).where(condition, 0)
current_prod = np.minimum(demand_prod, current_maxprod)
current_demand = group_assets(current_prod)
demand -= np.minimum(current_demand, demand)
production = production + current_prod

result = xr.zeros_like(maxprod)
result[dict(commodity=commodity)] = result[dict(commodity=commodity)] + production
return result


def minimum_production(
technologies: xr.Dataset,
capacity: xr.DataArray,
Expand Down
Loading