diff --git a/docs/inputs/toml.rst b/docs/inputs/toml.rst index 6cadb7b8f..058b64e20 100644 --- a/docs/inputs/toml.rst +++ b/docs/inputs/toml.rst @@ -319,6 +319,32 @@ A sector accepts these attributes: Additional methods can be registered with :py:func:`muse.production.register_production` +*technodata* + Path to a csv file containing the characterization of the technologies involved in + the sector, e.g. lifetime, capital costs, etc... See :ref:`inputs-technodata`. + +*technodata_timeslices* + Optional. Path to a csv file describing the utilization factor and minimum service + factor of each technology in each timeslice. + See :ref:`user_guide/inputs/technodata_timeslices`. + +*commodities_in* + Path to a csv file describing the inputs of each technology involved in the sector. + See :ref:`inputs-iocomms`. + +*commodities_out* + Path to a csv file describing the outputs of each technology involved in the sector. + See :ref:`inputs-iocomms`. + +*timeslice_level* + Optional. This represents the level of timeslice granularity over which commodity + flows out of the sector are balanced (e.g. if "day", the sector will aim to meet + commodity demands on a daily basis, rather than an hourly basis). + If not given, defaults to the finest level defined in the global `timeslices` section. + Note: If *technodata_timeslices* is used, the data in this file must match the timeslice + level of the sector (e.g. with global timeslice levels "month", "day" and "hour", if a sector has "day" as + the timeslice level, then *technodata_timeslices* must have columns "month" and "day", but not "hour") + Sectors contain a number of subsections: *interactions* Defines interactions between agents. These interactions take place right before new @@ -372,32 +398,6 @@ Sectors contain a number of subsections: "new_to_retro" type of network has been defined but no retro agents are included in the sector. -*technodata* - - Defines technologies and their features, in terms of costs, efficiencies, and emissions. - - *technodata* are specified as an inline TOML table, e.g. with single - brackets. A technodata section would look like: - - .. code-block:: TOML - - [sectors.residential.technodata] - technodata = '{path}/technodata/residential/Technodata.csv' - commodities_in = '{path}/technodata/residential/CommIn.csv' - commodities_out = '{path}/technodata/residential/CommOut.csv' - - Where: - *technodata* - Path to a csv file containing the characterization of the technologies involved in - the sector, e.g. lifetime, capital costs, etc... See :ref:`inputs-technodata`. - - *commodities_in* - Path to a csv file describing the inputs of each technology involved in the sector. - See :ref:`inputs-iocomms`. - - *commodities_out* - Path to a csv file describing the outputs of each technology involved in the sector. - See :ref:`inputs-iocomms`. *subsectors* diff --git a/src/muse/agents/agent.py b/src/muse/agents/agent.py index 8d7b503ae..db0e8a539 100644 --- a/src/muse/agents/agent.py +++ b/src/muse/agents/agent.py @@ -23,6 +23,7 @@ def __init__( interpolation: str = "linear", category: Optional[str] = None, quantity: Optional[float] = 1, + timeslice_level: Optional[str] = None, ): """Creates a standard MUSE agent. @@ -39,6 +40,9 @@ def __init__( together. quantity: optional value to classify different agents' share of the population. + timeslice_level: the timeslice level over which investments/production + will be optimized (e.g "hour", "day"). If None, the agent will use the + finest timeslice level. """ from uuid import uuid4 @@ -57,6 +61,8 @@ def __init__( """Attribute to classify different sets of agents.""" self.quantity = quantity """Attribute to classify different agents' share of the population.""" + self.timeslice_level = timeslice_level + """Timeslice level for the agent.""" def filter_input( self, @@ -114,6 +120,7 @@ def __init__( asset_threshold: float = 1e-4, quantity: Optional[float] = 1, spend_limit: int = 0, + timeslice_level: Optional[str] = None, **kwargs, ): """Creates a standard agent. @@ -141,6 +148,9 @@ def __init__( asset_threshold: Threshold below which assets are not added. quantity: different agents' share of the population spend_limit: The cost above which agents will not invest + timeslice_level: the timeslice level over which the agent invesments will + be optimized (e.g "hour", "day"). If None, the agent will use the finest + timeslice level. **kwargs: Extra arguments """ from muse.decisions import factory as decision_factory @@ -155,6 +165,7 @@ def __init__( interpolation=interpolation, category=category, quantity=quantity, + timeslice_level=timeslice_level, ) self.year = year @@ -331,6 +342,7 @@ def next( market, technologies, year=current_year, + timeslice_level=self.timeslice_level, ) # Calculate investments @@ -339,6 +351,7 @@ def next( technologies, constraints, year=current_year, + timeslice_level=self.timeslice_level, ) # Add investments @@ -379,7 +392,10 @@ def compute_decision( # Compute the objectives objectives = self.objectives( - technologies=techs, demand=reduced_demand, prices=prices + technologies=techs, + demand=reduced_demand, + prices=prices, + timeslice_level=self.timeslice_level, ) # Compute the decision metric diff --git a/src/muse/constraints.py b/src/muse/constraints.py index 51640942a..559de8eb9 100644 --- a/src/muse/constraints.py +++ b/src/muse/constraints.py @@ -72,7 +72,7 @@ def constraints( search_space: xr.DataArray, market: xr.Dataset, technologies: xr.Dataset, - year: Optional[int] = None, + year: int | None = None, **kwargs, ) -> Constraint: pass @@ -115,7 +115,7 @@ def constraints( from mypy_extensions import KwArg from muse.registration import registrator -from muse.timeslices import drop_timeslice +from muse.timeslices import broadcast_timeslice, distribute_timeslice, drop_timeslice CAPACITY_DIMS = "asset", "replacement", "region" """Default dimensions for capacity decision variables.""" @@ -248,11 +248,20 @@ def constraints( market: xr.Dataset, technologies: xr.Dataset, year: int | None = None, + timeslice_level: str | None = None, ) -> list[Constraint]: if year is None: year = int(market.year.min()) constraints = [ - function(demand, assets, search_space, market, technologies, year=year) + function( + demand, + assets, + search_space, + market, + technologies, + year=year, + timeslice_level=timeslice_level, + ) for function in constraint_closures ] return [constraint for constraint in constraints if constraint is not None] @@ -270,6 +279,7 @@ def max_capacity_expansion( year: int | None = None, forecast: int | None = None, interpolation: str = "linear", + **kwargs, ) -> Constraint: r"""Max-capacity addition, max-capacity growth, and capacity limits constraints. @@ -400,6 +410,7 @@ def demand( year: int | None = None, forecast: int = 5, interpolation: str = "linear", + **kwargs, ) -> Constraint: """Constraints production to meet demand.""" from muse.commodities import is_enduse @@ -423,6 +434,7 @@ def search_space( technologies: xr.Dataset, year: int | None = None, forecast: int = 5, + **kwargs, ) -> Constraint | None: """Removes disabled technologies.""" if search_space.all(): @@ -442,6 +454,8 @@ def max_production( market: xr.Dataset, technologies: xr.Dataset, year: int | None = None, + timeslice_level: str | None = None, + **kwargs, ) -> Constraint: """Constructs constraint between capacity and maximum production. @@ -451,7 +465,6 @@ def max_production( from xarray import ones_like, zeros_like from muse.commodities import is_enduse - from muse.timeslices import broadcast_timeslice, distribute_timeslice if year is None: year = int(market.year.min()) @@ -470,9 +483,9 @@ def max_production( .sel(**kwargs) .drop_vars("technology") ) - capacity = distribute_timeslice(techs.fixed_outputs) * broadcast_timeslice( - techs.utilization_factor - ) + capacity = distribute_timeslice( + techs.fixed_outputs, level=timeslice_level + ) * broadcast_timeslice(techs.utilization_factor, level=timeslice_level) if "asset" not in capacity.dims and "asset" in search_space.dims: capacity = capacity.expand_dims(asset=search_space.asset) production = ones_like(capacity) @@ -489,8 +502,8 @@ def max_production( maxadd = maxadd.rename(technology="replacement") maxadd = maxadd.where(maxadd == 0, 0.0) maxadd = maxadd.where(maxadd > 0, -1.0) - capacity = capacity * broadcast_timeslice(maxadd) - production = production * broadcast_timeslice(maxadd) + capacity = capacity * broadcast_timeslice(maxadd, level=timeslice_level) + production = production * broadcast_timeslice(maxadd, level=timeslice_level) b = b.rename(region="src_region") return xr.Dataset( dict(capacity=-cast(np.ndarray, capacity), production=production, b=b), @@ -506,6 +519,8 @@ def demand_limiting_capacity( market: xr.Dataset, technologies: xr.Dataset, year: int | None = None, + timeslice_level: str | None = None, + **kwargs, ) -> Constraint: """Limits the maximum combined capacity to match the demand. @@ -521,7 +536,13 @@ def demand_limiting_capacity( """ # We start with the maximum production constraint and the demand constraint capacity_constraint = max_production( - demand_, assets, search_space, market, technologies, year=year + demand_, + assets, + search_space, + market, + technologies, + year=year, + timeslice_level=timeslice_level, ) demand_constraint = demand( demand_, assets, search_space, market, technologies, year=year @@ -714,12 +735,13 @@ def minimum_service( market: xr.Dataset, technologies: xr.Dataset, year: int | None = None, + timeslice_level: str | None = None, + **kwargs, ) -> Constraint | None: """Constructs constraint between capacity and minimum service.""" from xarray import ones_like, zeros_like from muse.commodities import is_enduse - from muse.timeslices import broadcast_timeslice, distribute_timeslice if "minimum_service_factor" not in technologies.data_vars: return None @@ -742,9 +764,9 @@ def minimum_service( .sel(**kwargs) .drop_vars("technology") ) - capacity = distribute_timeslice(techs.fixed_outputs) * broadcast_timeslice( - techs.minimum_service_factor - ) + capacity = distribute_timeslice( + techs.fixed_outputs, level=timeslice_level + ) * broadcast_timeslice(techs.minimum_service_factor, level=timeslice_level) if "asset" not in capacity.dims: capacity = capacity.expand_dims(asset=search_space.asset) production = ones_like(capacity) @@ -755,7 +777,9 @@ def minimum_service( ) -def lp_costs(technologies: xr.Dataset, costs: xr.DataArray) -> xr.Dataset: +def lp_costs( + technologies: xr.Dataset, costs: xr.DataArray, timeslice_level: str | None = None +) -> xr.Dataset: """Creates costs for solving with scipy's LP solver. Example: @@ -805,7 +829,6 @@ def lp_costs(technologies: xr.Dataset, costs: xr.DataArray) -> xr.Dataset: from xarray import zeros_like from muse.commodities import is_enduse - from muse.timeslices import broadcast_timeslice, distribute_timeslice assert "year" not in technologies.dims @@ -818,7 +841,10 @@ def lp_costs(technologies: xr.Dataset, costs: xr.DataArray) -> xr.Dataset: selection["region"] = costs.region fouts = technologies.fixed_outputs.sel(selection).rename(technology="replacement") - production = zeros_like(broadcast_timeslice(costs) * distribute_timeslice(fouts)) + production = zeros_like( + broadcast_timeslice(costs, level=timeslice_level) + * distribute_timeslice(fouts, level=timeslice_level) + ) for dim in production.dims: if isinstance(production.get_index(dim), pd.MultiIndex): production = drop_timeslice(production) @@ -1133,8 +1159,9 @@ def factory( technologies: xr.Dataset, costs: xr.DataArray, *constraints: Constraint, + timeslice_level: str | None = None, ) -> ScipyAdapter: - lpcosts = lp_costs(technologies, costs) + lpcosts = lp_costs(technologies, costs, timeslice_level=timeslice_level) data = cls._unified_dataset(technologies, lpcosts, *constraints) diff --git a/src/muse/costs.py b/src/muse/costs.py index adfba0f44..4461113eb 100644 --- a/src/muse/costs.py +++ b/src/muse/costs.py @@ -23,6 +23,7 @@ def net_present_value( capacity: xr.DataArray, production: xr.DataArray, year: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Net present value (NPV) of the relevant technologies. @@ -51,6 +52,7 @@ def net_present_value( capacity: xr.DataArray with the capacity of the relevant technologies production: xr.DataArray with the production of the relevant technologies year: int, the year of the forecast + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: xr.DataArray with the NPV calculated for the relevant technologies @@ -84,7 +86,8 @@ def net_present_value( years - year + 1, interest_rate=techs.interest_rate, mask=years <= year + life, - ) + ), + level=timeslice_level, ) # Filters @@ -102,7 +105,7 @@ def net_present_value( # Cost of installed capacity installed_capacity_costs = distribute_timeslice( - techs.cap_par * (capacity**techs.cap_exp), + techs.cap_par * (capacity**techs.cap_exp), level=timeslice_level ) # Cost related to environmental products @@ -123,17 +126,21 @@ def net_present_value( # Fixed costs fixed_costs = ( - distribute_timeslice(techs.fix_par * (capacity**techs.fix_exp)) * rates + distribute_timeslice( + techs.fix_par * (capacity**techs.fix_exp), level=timeslice_level + ) + * rates ).sum("year") # Variable costs tech_activity = ( - production.sel(commodity=products) / broadcast_timeslice(techs.fixed_outputs) + production.sel(commodity=products) + / broadcast_timeslice(techs.fixed_outputs, level=timeslice_level) ).max("commodity") variable_costs = ( ( - broadcast_timeslice(techs.var_par) - * tech_activity ** broadcast_timeslice(techs.var_exp) + broadcast_timeslice(techs.var_par, level=timeslice_level) + * tech_activity ** broadcast_timeslice(techs.var_exp, level=timeslice_level) ) * rates ).sum("year") @@ -186,6 +193,7 @@ def equivalent_annual_cost( capacity: xr.DataArray, production: xr.DataArray, year: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Equivalent annual costs (or annualized cost) of a technology. @@ -203,13 +211,14 @@ def equivalent_annual_cost( capacity: xr.DataArray with the capacity of the relevant technologies production: xr.DataArray with the production of the relevant technologies year: int, the year of the forecast + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: xr.DataArray with the EAC calculated for the relevant technologies """ npc = net_present_cost(technologies, prices, capacity, production, year) crf = capital_recovery_factor(technologies) - return npc * broadcast_timeslice(crf) + return npc * broadcast_timeslice(crf, level=timeslice_level) def lifetime_levelized_cost_of_energy( @@ -218,6 +227,7 @@ def lifetime_levelized_cost_of_energy( capacity: xr.DataArray, production: xr.DataArray, year: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Levelized cost of energy (LCOE) of technologies over their lifetime. @@ -229,12 +239,11 @@ def lifetime_levelized_cost_of_energy( capacity: xr.DataArray with the capacity of the relevant technologies production: xr.DataArray with the production of the relevant technologies year: int, the year of the forecast + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: xr.DataArray with the LCOE calculated for the relevant technologies """ - from muse.timeslices import broadcast_timeslice, distribute_timeslice - techs = technologies[ [ "technical_life", @@ -263,7 +272,8 @@ def lifetime_levelized_cost_of_energy( years=years - year + 1, interest_rate=techs.interest_rate, mask=years <= year + life, - ) + ), + level=timeslice_level, ) # Filters @@ -273,11 +283,16 @@ def lifetime_levelized_cost_of_energy( fuels = is_fuel(technologies.comm_usage) # Calculate consumption - cons = consumption(technologies=techs, production=production, prices=prices) + cons = consumption( + technologies=techs, + production=production, + prices=prices, + timeslice_level=timeslice_level, + ) # Cost of installed capacity installed_capacity_costs = distribute_timeslice( - techs.cap_par * (capacity**techs.cap_exp), + techs.cap_par * (capacity**techs.cap_exp), level=timeslice_level ) # Cost related to environmental products @@ -298,17 +313,21 @@ def lifetime_levelized_cost_of_energy( # Fixed costs fixed_costs = ( - distribute_timeslice(techs.fix_par * (capacity**techs.fix_exp)) * rates + distribute_timeslice( + techs.fix_par * (capacity**techs.fix_exp), level=timeslice_level + ) + * rates ).sum("year") # Variable costs tech_activity = ( - production.sel(commodity=products) / broadcast_timeslice(techs.fixed_outputs) + production.sel(commodity=products) + / broadcast_timeslice(techs.fixed_outputs, level=timeslice_level) ).max("commodity") variable_costs = ( ( - broadcast_timeslice(techs.var_par) - * tech_activity ** broadcast_timeslice(techs.var_exp) + broadcast_timeslice(techs.var_par, level=timeslice_level) + * tech_activity ** broadcast_timeslice(techs.var_exp, level=timeslice_level) ) * rates ).sum("year") @@ -339,6 +358,7 @@ def annual_levelized_cost_of_energy( prices: xr.DataArray, interpolation: str = "linear", fill_value: Union[int, str] = "extrapolate", + timeslice_level: Optional[str] = None, **filters, ) -> xr.DataArray: """Undiscounted levelized cost of energy (LCOE) of technologies on each given year. @@ -366,6 +386,7 @@ def annual_levelized_cost_of_energy( This dataarray contains at least timeslice and commodity dimensions. interpolation: interpolation method. fill_value: Fill value for values outside the extrapolation range. + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") **filters: Anything by which prices can be filtered. Return: @@ -402,31 +423,33 @@ def annual_levelized_cost_of_energy( # Capital costs annualized_capital_costs = distribute_timeslice( - techs.cap_par * rates - ) / broadcast_timeslice(techs.utilization_factor) + techs.cap_par * rates, level=timeslice_level + ) / broadcast_timeslice(techs.utilization_factor, level=timeslice_level) # Fixed and variable running costs o_and_e_costs = distribute_timeslice( - techs.fix_par + techs.var_par - ) / broadcast_timeslice(techs.utilization_factor) + techs.fix_par + techs.var_par, level=timeslice_level + ) / broadcast_timeslice(techs.utilization_factor, level=timeslice_level) # Fuel costs from fixed and flexible inputs - fuel_costs = (distribute_timeslice(techs.fixed_inputs) * prices).sum("commodity") - fuel_costs += (distribute_timeslice(techs.flexible_inputs) * prices).sum( - "commodity" - ) + fuel_costs = ( + distribute_timeslice(techs.fixed_inputs, level=timeslice_level) * prices + ).sum("commodity") + fuel_costs += ( + distribute_timeslice(techs.flexible_inputs, level=timeslice_level) * prices + ).sum("commodity") # Environmental costs if "region" in techs.dims: env_costs = ( - (distribute_timeslice(techs.fixed_outputs) * prices) + (distribute_timeslice(techs.fixed_outputs, level=timeslice_level) * prices) .sel(region=techs.region) .sel(commodity=is_pollutant(techs.comm_usage)) .sum("commodity") ) else: env_costs = ( - (distribute_timeslice(techs.fixed_outputs) * prices) + (distribute_timeslice(techs.fixed_outputs, level=timeslice_level) * prices) .sel(commodity=is_pollutant(techs.comm_usage)) .sum("commodity") ) diff --git a/src/muse/data/default_settings.toml b/src/muse/data/default_settings.toml index 9081520fb..35bc1113c 100644 --- a/src/muse/data/default_settings.toml +++ b/src/muse/data/default_settings.toml @@ -70,8 +70,3 @@ summer.weekend.night = 150 summer.weekend.morning = 150 summer.weekend.afternoon = 150 summer.weekend.evening = 150 - -[timeslices.aggregates] -all-day = ["night", "morning", "afternoon", "early-peak", "late-peak", "evening"] -all-week = ["weekday", "weekend"] -all-year = ["winter", "summer", "spring-autumn"] diff --git a/src/muse/demand_share.py b/src/muse/demand_share.py index f32aab808..e8cf0d458 100644 --- a/src/muse/demand_share.py +++ b/src/muse/demand_share.py @@ -114,6 +114,7 @@ def new_and_retro( technologies: xr.Dataset, current_year: int, forecast: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: r"""Splits demand across new and retro agents. @@ -132,6 +133,7 @@ def new_and_retro( technologies: quantities describing the technologies. current_year: Current year of simulation forecast: How many years to forecast ahead + timeslice_level: the timeslice level of the sector (e.g. "hour", "day") Pseudo-code: @@ -234,6 +236,7 @@ def decommissioning(capacity): technologies, capacity, year=[current_year, current_year + forecast], + timeslice_level=timeslice_level, ).squeeze("year") capacity = reduce_assets([u.assets.capacity for u in agents]) @@ -244,6 +247,7 @@ def decommissioning(capacity): technologies, current_year=current_year, forecast=forecast, + timeslice_level=timeslice_level, ) demands = demands.where( @@ -308,6 +312,7 @@ def decommissioning(capacity): maximum_production, technologies=regional_techs, year=current_year, + timeslice_level=timeslice_level, ), id_to_nquantity, ) @@ -325,6 +330,7 @@ def standard_demand( technologies: xr.Dataset, current_year: int, forecast: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: r"""Splits demand across new agents. @@ -343,6 +349,7 @@ def standard_demand( technologies: quantities describing the technologies. current_year: Current year of simulation forecast: How many years to forecast ahead + timeslice_level: the timeslice level of the sector (e.g. "hour", "day") """ from functools import partial @@ -358,6 +365,7 @@ def decommissioning(capacity): technologies, capacity, year=[current_year, current_year + forecast], + timeslice_level=timeslice_level, ).squeeze("year") # Make sure there are no retrofit agents @@ -375,6 +383,7 @@ def decommissioning(capacity): technologies, current_year=current_year, forecast=forecast, + timeslice_level=timeslice_level, ) # Only consider end-use commodities @@ -410,6 +419,7 @@ def decommissioning(capacity): maximum_production, technologies=technologies.sel(region=region), year=current_year, + timeslice_level=timeslice_level, ), id_to_quantity, ) @@ -431,6 +441,7 @@ def unmet_forecasted_demand( technologies: xr.Dataset, current_year: int, forecast: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Forecast demand that cannot be serviced by non-decommissioned current assets.""" from muse.commodities import is_enduse @@ -441,7 +452,9 @@ def unmet_forecasted_demand( smarket: xr.Dataset = market.where(is_enduse(comm_usage), 0).interp(year=year) capacity = reduce_assets([u.assets.capacity.interp(year=year) for u in agents]) capacity = cast(xr.DataArray, capacity) - result = unmet_demand(smarket, capacity, technologies) + result = unmet_demand( + smarket, capacity, technologies, timeslice_level=timeslice_level + ) if "year" in result.dims: result = result.squeeze("year") return result @@ -503,6 +516,7 @@ def unmet_demand( market: xr.Dataset, capacity: xr.DataArray, technologies: xr.Dataset, + timeslice_level: Optional[str] = None, ): r"""Share of the demand that cannot be serviced by the existing assets. @@ -519,7 +533,9 @@ def unmet_demand( from muse.quantities import maximum_production # Calculate maximum production by existing assets - produced = maximum_production(capacity=capacity, technologies=technologies) + produced = maximum_production( + capacity=capacity, technologies=technologies, timeslice_level=timeslice_level + ) # Total commodity production by summing over assets if "dst_region" in produced.dims: @@ -540,6 +556,7 @@ def new_consumption( technologies: xr.Dataset, current_year: int, forecast: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: r"""Computes share of the demand attributed to new agents. @@ -569,7 +586,7 @@ def new_consumption( # Calculate the increase in consumption over the forecast period delta = (forecasted.consumption - current.consumption).clip(min=0) - missing = unmet_demand(current, capa, technologies) + missing = unmet_demand(current, capa, technologies, timeslice_level=timeslice_level) consumption = minimum(delta, missing) return consumption @@ -580,6 +597,7 @@ def new_and_retro_demands( technologies: xr.Dataset, current_year: int, forecast: int, + timeslice_level: Optional[str] = None, ) -> xr.Dataset: """Splits demand into *new* and *retrofit* demand. @@ -607,7 +625,12 @@ def new_and_retro_demands( # Calculate demand to allocate to "new" agents new_demand = new_consumption( - capa, smarket, technologies, current_year=current_year, forecast=forecast + capa, + smarket, + technologies, + current_year=current_year, + forecast=forecast, + timeslice_level=timeslice_level, ) if "year" in new_demand.dims: new_demand = new_demand.squeeze("year") @@ -617,6 +640,7 @@ def new_and_retro_demands( maximum_production( technologies, capa.sel(year=current_year + forecast), + timeslice_level=timeslice_level, ) .groupby("region") .sum("asset") diff --git a/src/muse/investments.py b/src/muse/investments.py index 50b3cb23d..8af7fcfa1 100644 --- a/src/muse/investments.py +++ b/src/muse/investments.py @@ -64,6 +64,7 @@ def investment( from muse.errors import GrowthOfCapacityTooConstrained from muse.outputs.cache import cache_quantity from muse.registration import registrator +from muse.timeslices import timeslice_max INVESTMENT_SIGNATURE = Callable[ [xr.DataArray, xr.DataArray, xr.Dataset, list[Constraint], KwArg(Any)], @@ -224,6 +225,7 @@ def adhoc_match_demand( technologies: xr.Dataset, constraints: list[Constraint], year: int, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: from muse.demand_matching import demand_matching from muse.quantities import capacity_in_use, maximum_production @@ -237,6 +239,7 @@ def adhoc_match_demand( year=year, technology=costs.replacement, commodity=demand.commodity, + timeslice_level=timeslice_level, ).drop_vars("technology") # Push disabled techs to last rank. @@ -253,7 +256,11 @@ def adhoc_match_demand( ).where(search_space, 0) capacity = capacity_in_use( - production, technologies, year=year, technology=production.replacement + production, + technologies, + year=year, + technology=production.replacement, + timeslice_level=timeslice_level, ).drop_vars("technology") if "timeslice" in capacity.dims: capacity = timeslice_max(capacity) @@ -269,6 +276,7 @@ def scipy_match_demand( technologies: xr.Dataset, constraints: list[Constraint], year: Optional[int] = None, + timeslice_level: Optional[str] = None, **options, ) -> xr.DataArray: from logging import getLogger @@ -289,7 +297,9 @@ def scipy_match_demand( techs = technologies # Run scipy optimization with highs solver - adapter = ScipyAdapter.factory(techs, cast(np.ndarray, costs), *constraints) + adapter = ScipyAdapter.factory( + techs, cast(np.ndarray, costs), *constraints, timeslice_level=timeslice_level + ) res = linprog(**adapter.kwargs, method="highs") # Backup: try with highs-ipm @@ -381,14 +391,3 @@ def default_to_scipy(): solution = cast(Callable[[np.ndarray], xr.Dataset], adapter.to_muse)(list(res["x"])) return solution - - -def timeslice_max(x: xr.DataArray) -> xr.DataArray: - """Find the max value over the timeslice dimension, normlaized for timeslice length. - - This first annualizes the value in each timeslice by dividing by the fraction of the - year that the timeslice occupies, then takes the maximum value - """ - from muse.timeslices import TIMESLICE, broadcast_timeslice - - return (x / (TIMESLICE / broadcast_timeslice(TIMESLICE.sum()))).max("timeslice") diff --git a/src/muse/mca.py b/src/muse/mca.py index ed455e123..3f2fcea2c 100644 --- a/src/muse/mca.py +++ b/src/muse/mca.py @@ -14,7 +14,7 @@ from muse.outputs.cache import OutputCache from muse.readers import read_initial_market from muse.sectors import SECTORS_REGISTERED, AbstractSector, Sector -from muse.timeslices import drop_timeslice +from muse.timeslices import broadcast_timeslice, drop_timeslice from muse.utilities import future_propagation @@ -41,7 +41,6 @@ def factory(cls, settings: str | Path | Mapping | Any) -> MCA: from muse.outputs.mca import factory as ofactory from muse.readers import read_settings from muse.readers.toml import convert - from muse.timeslices import drop_timeslice if isinstance(settings, (str, Path)): settings = read_settings(settings) # type: ignore @@ -272,8 +271,6 @@ def run(self) -> None: from xarray import DataArray - from muse.timeslices import broadcast_timeslice - nyear = len(self.time_framework) - 1 check_carbon_budget = len(self.carbon_budget) and len(self.carbon_commodities) shoots = self.control_undershoot or self.control_overshoot @@ -358,7 +355,6 @@ def single_year_iteration( from copy import deepcopy from muse.commodities import is_enduse - from muse.timeslices import drop_timeslice sectors = deepcopy(sectors) market = market.copy(deep=True) diff --git a/src/muse/objectives.py b/src/muse/objectives.py index 5e51d7ef0..ed865d8e7 100644 --- a/src/muse/objectives.py +++ b/src/muse/objectives.py @@ -63,7 +63,7 @@ def comfort( ] from collections.abc import Mapping, MutableMapping, Sequence -from typing import Any, Callable, Union +from typing import Any, Callable, Optional, Union import numpy as np import xarray as xr @@ -71,7 +71,7 @@ def comfort( from muse.outputs.cache import cache_quantity from muse.registration import registrator -from muse.timeslices import drop_timeslice +from muse.timeslices import broadcast_timeslice, distribute_timeslice, drop_timeslice from muse.utilities import filter_input OBJECTIVE_SIGNATURE = Callable[ @@ -130,18 +130,22 @@ def objectives( technologies: xr.Dataset, demand: xr.DataArray, prices: xr.DataArray, + timeslice_level: Optional[str] = None, *args, **kwargs, ) -> xr.Dataset: - from muse.timeslices import broadcast_timeslice - result = xr.Dataset() for name, objective in functions: obj = objective( - technologies=technologies, demand=demand, prices=prices, *args, **kwargs + technologies=technologies, + demand=demand, + prices=prices, + timeslice_level=timeslice_level, + *args, + **kwargs, ) if "timeslice" not in obj.dims: - obj = broadcast_timeslice(obj) + obj = broadcast_timeslice(obj, level=timeslice_level) if "timeslice" in result.dims: obj = drop_timeslice(obj) result[name] = obj @@ -323,7 +327,6 @@ def emission_cost( with :math:`s` the timeslices and :math:`c` the commodity. """ from muse.commodities import is_enduse, is_pollutant - from muse.timeslices import distribute_timeslice enduses = is_enduse(technologies.comm_usage.sel(commodity=demand.commodity)) total = demand.sel(commodity=enduses).sum("commodity") @@ -381,6 +384,7 @@ def lifetime_levelized_cost_of_energy( technologies: xr.Dataset, demand: xr.DataArray, prices: xr.DataArray, + timeslice_level: Optional[str] = None, *args, **kwargs, ): @@ -393,13 +397,14 @@ def lifetime_levelized_cost_of_energy( """ from muse.costs import lifetime_levelized_cost_of_energy as LCOE from muse.quantities import capacity_to_service_demand - from muse.timeslices import broadcast_timeslice, distribute_timeslice - capacity = capacity_to_service_demand(technologies=technologies, demand=demand) + capacity = capacity_to_service_demand( + technologies=technologies, demand=demand, timeslice_level=timeslice_level + ) production = ( - broadcast_timeslice(capacity) - * distribute_timeslice(technologies.fixed_outputs) - * broadcast_timeslice(technologies.utilization_factor) + broadcast_timeslice(capacity, level=timeslice_level) + * distribute_timeslice(technologies.fixed_outputs, level=timeslice_level) + * broadcast_timeslice(technologies.utilization_factor, level=timeslice_level) ) results = LCOE( @@ -408,6 +413,7 @@ def lifetime_levelized_cost_of_energy( capacity=capacity, production=production, year=demand.year.item(), + timeslice_level=timeslice_level, ) return results.where(np.isfinite(results)).fillna(0.0) @@ -418,6 +424,7 @@ def net_present_value( technologies: xr.Dataset, demand: xr.DataArray, prices: xr.DataArray, + timeslice_level: Optional[str] = None, *args, **kwargs, ): @@ -427,13 +434,12 @@ def net_present_value( """ from muse.costs import net_present_value as NPV from muse.quantities import capacity_to_service_demand - from muse.timeslices import broadcast_timeslice, distribute_timeslice capacity = capacity_to_service_demand(technologies=technologies, demand=demand) production = ( - broadcast_timeslice(capacity) - * distribute_timeslice(technologies.fixed_outputs) - * broadcast_timeslice(technologies.utilization_factor) + broadcast_timeslice(capacity, level=timeslice_level) + * distribute_timeslice(technologies.fixed_outputs, level=timeslice_level) + * broadcast_timeslice(technologies.utilization_factor, level=timeslice_level) ) results = NPV( @@ -442,6 +448,7 @@ def net_present_value( capacity=capacity, production=production, year=demand.year.item(), + timeslice_level=timeslice_level, ) return results @@ -451,6 +458,7 @@ def net_present_cost( technologies: xr.Dataset, demand: xr.DataArray, prices: xr.DataArray, + timeslice_level: Optional[str] = None, *args, **kwargs, ): @@ -460,13 +468,12 @@ def net_present_cost( """ from muse.costs import net_present_cost as NPC from muse.quantities import capacity_to_service_demand - from muse.timeslices import broadcast_timeslice, distribute_timeslice capacity = capacity_to_service_demand(technologies=technologies, demand=demand) production = ( - broadcast_timeslice(capacity) - * distribute_timeslice(technologies.fixed_outputs) - * broadcast_timeslice(technologies.utilization_factor) + broadcast_timeslice(capacity, level=timeslice_level) + * distribute_timeslice(technologies.fixed_outputs, level=timeslice_level) + * broadcast_timeslice(technologies.utilization_factor, level=timeslice_level) ) results = NPC( @@ -484,6 +491,7 @@ def equivalent_annual_cost( technologies: xr.Dataset, demand: xr.DataArray, prices: xr.DataArray, + timeslice_level: Optional[str] = None, *args, **kwargs, ): @@ -493,13 +501,12 @@ def equivalent_annual_cost( """ from muse.costs import equivalent_annual_cost as EAC from muse.quantities import capacity_to_service_demand - from muse.timeslices import broadcast_timeslice, distribute_timeslice capacity = capacity_to_service_demand(technologies=technologies, demand=demand) production = ( - broadcast_timeslice(capacity) - * distribute_timeslice(technologies.fixed_outputs) - * broadcast_timeslice(technologies.utilization_factor) + broadcast_timeslice(capacity, level=timeslice_level) + * distribute_timeslice(technologies.fixed_outputs, level=timeslice_level) + * broadcast_timeslice(technologies.utilization_factor, level=timeslice_level) ) results = EAC( @@ -508,5 +515,6 @@ def equivalent_annual_cost( capacity=capacity, production=production, year=demand.year.item(), + timeslice_level=timeslice_level, ) return results diff --git a/src/muse/production.py b/src/muse/production.py index 39f8dbb84..076c818c4 100644 --- a/src/muse/production.py +++ b/src/muse/production.py @@ -38,7 +38,7 @@ def production( "PRODUCTION_SIGNATURE", ] from collections.abc import Mapping, MutableMapping -from typing import Any, Callable, Union, cast +from typing import Any, Callable, Optional, Union, cast import xarray as xr @@ -98,7 +98,10 @@ def factory( @register_production(name=("max", "maximum")) def maximum_production( - market: xr.Dataset, capacity: xr.DataArray, technologies: xr.Dataset + market: xr.Dataset, + capacity: xr.DataArray, + technologies: xr.Dataset, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Production when running at full capacity. @@ -107,12 +110,15 @@ def maximum_production( """ from muse.quantities import maximum_production - return maximum_production(technologies, capacity) + return maximum_production(technologies, capacity, timeslice_level) @register_production(name=("share", "shares")) def supply( - market: xr.Dataset, capacity: xr.DataArray, technologies: xr.Dataset + market: xr.Dataset, + capacity: xr.DataArray, + technologies: xr.Dataset, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Service current demand equally from all assets. @@ -121,4 +127,6 @@ def supply( """ from muse.quantities import supply - return supply(capacity, market.consumption, technologies) + return supply( + capacity, market.consumption, technologies, timeslice_level=timeslice_level + ) diff --git a/src/muse/quantities.py b/src/muse/quantities.py index fa4cec2cc..27ccfba3c 100644 --- a/src/muse/quantities.py +++ b/src/muse/quantities.py @@ -13,11 +13,14 @@ import numpy as np import xarray as xr +from muse.timeslices import broadcast_timeslice, distribute_timeslice + def supply( capacity: xr.DataArray, demand: xr.DataArray, technologies: Union[xr.Dataset, xr.DataArray], + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Production and emission for a given capacity servicing a given demand. @@ -34,16 +37,20 @@ def supply( exceed its share of the demand. technologies: factors bindings the capacity of an asset with its production of commodities and environmental pollutants. + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: A data array where the commodity dimension only contains actual outputs (i.e. no input commodities). """ from muse.commodities import CommodityUsage, check_usage, is_pollutant - from muse.timeslices import broadcast_timeslice - maxprod = maximum_production(technologies, capacity) - minprod = minimum_production(technologies, capacity) + maxprod = maximum_production( + technologies, capacity, timeslice_level=timeslice_level + ) + minprod = minimum_production( + technologies, capacity, timeslice_level=timeslice_level + ) size = np.array(maxprod.region).size # in presence of trade demand needs to map maxprod dst_region if ( @@ -83,10 +90,14 @@ def supply( expanded_demand = (demand * maxprod / maxprod.sum(demsum)).fillna(0) expanded_maxprod = ( - maxprod * demand / broadcast_timeslice(demand.sum(prodsum)) + maxprod + * demand + / broadcast_timeslice(demand.sum(prodsum), level=timeslice_level) ).fillna(0) expanded_minprod = ( - minprod * demand / broadcast_timeslice(demand.sum(prodsum)) + minprod + * demand + / broadcast_timeslice(demand.sum(prodsum), level=timeslice_level) ).fillna(0) expanded_demand = expanded_demand.reindex_like(maxprod) expanded_minprod = expanded_minprod.reindex_like(maxprod) @@ -98,9 +109,9 @@ def supply( # add production of environmental pollutants env = is_pollutant(technologies.comm_usage) - result[{"commodity": env}] = emission(result, technologies.fixed_outputs).transpose( - *result.dims - ) + result[{"commodity": env}] = emission( + result, technologies.fixed_outputs, timeslice_level=timeslice_level + ).transpose(*result.dims) result[ {"commodity": ~check_usage(technologies.comm_usage, CommodityUsage.PRODUCT)} ] = 0 @@ -108,7 +119,11 @@ def supply( return result -def emission(production: xr.DataArray, fixed_outputs: xr.DataArray): +def emission( + production: xr.DataArray, + fixed_outputs: xr.DataArray, + timeslice_level: Optional[str] = None, +): """Computes emission from current products. Emissions are computed as `sum(product) * fixed_outputs`. @@ -118,12 +133,12 @@ def emission(production: xr.DataArray, fixed_outputs: xr.DataArray): when computing emissions. fixed_outputs: factor relating total production to emissions. For convenience, this can also be a `technologies` dataset containing `fixed_output`. + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: A data array containing emissions (and only emissions). """ from muse.commodities import is_enduse, is_pollutant - from muse.timeslices import broadcast_timeslice from muse.utilities import broadcast_techs # just in case we are passed a technologies dataset, like in other functions @@ -133,12 +148,15 @@ def emission(production: xr.DataArray, fixed_outputs: xr.DataArray): envs = is_pollutant(fouts.comm_usage) enduses = is_enduse(fouts.comm_usage) return production.sel(commodity=enduses).sum("commodity") * broadcast_timeslice( - fouts.sel(commodity=envs) + fouts.sel(commodity=envs), level=timeslice_level ) def gross_margin( - technologies: xr.Dataset, capacity: xr.DataArray, prices: xr.Dataset + technologies: xr.Dataset, + capacity: xr.DataArray, + prices: xr.Dataset, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """The percentage of revenue after direct expenses have been subtracted. @@ -152,7 +170,6 @@ def gross_margin( - non-environmental commodities OUTPUTS are related to revenues. """ from muse.commodities import is_enduse, is_pollutant - from muse.timeslices import distribute_timeslice from muse.utilities import broadcast_techs tech = broadcast_techs( # type: ignore @@ -191,13 +208,18 @@ def gross_margin( # Variable costs depend on factors such as labour variable_costs = distribute_timeslice( var_par * ((fixed_outputs.sel(commodity=enduses)).sum("commodity")) ** var_exp, + level=timeslice_level, ) # The individual prices are selected # costs due to consumables, direct inputs - consumption_costs = (prices * distribute_timeslice(fixed_inputs)).sum("commodity") + consumption_costs = ( + prices * distribute_timeslice(fixed_inputs, level=timeslice_level) + ).sum("commodity") # costs due to pollutants - production_costs = prices * distribute_timeslice(fixed_outputs) + production_costs = prices * distribute_timeslice( + fixed_outputs, level=timeslice_level + ) environmental_costs = (production_costs.sel(commodity=environmentals)).sum( "commodity" ) @@ -216,6 +238,7 @@ def decommissioning_demand( technologies: xr.Dataset, capacity: xr.DataArray, year: Optional[Sequence[int]] = None, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: r"""Computes demand from process decommissioning. @@ -257,6 +280,7 @@ def decommissioning_demand( return maximum_production( technologies, capacity_decrease, + timeslice_level=timeslice_level, ).clip(min=0) @@ -264,6 +288,7 @@ def consumption( technologies: xr.Dataset, production: xr.DataArray, prices: Optional[xr.DataArray] = None, + timeslice_level: Optional[str] = None, **kwargs, ) -> xr.DataArray: """Commodity consumption when fulfilling the whole production. @@ -272,7 +297,6 @@ def consumption( are not given, then flexible consumption is *not* considered. """ from muse.commodities import is_enduse, is_fuel - from muse.timeslices import broadcast_timeslice from muse.utilities import filter_with_template params = filter_with_template( @@ -286,7 +310,7 @@ def consumption( production = production.sel(commodity=is_enduse(comm_usage)).sum("commodity") params_fuels = is_fuel(params.comm_usage) consumption = production * broadcast_timeslice( - params.fixed_inputs.where(params_fuels, 0) + params.fixed_inputs.where(params_fuels, 0), level=timeslice_level ) if prices is None: @@ -308,7 +332,9 @@ def consumption( ] # add consumption from cheapest fuel assert all(flexs.commodity.values == consumption.commodity.values) - flex = flexs.where(minprices == broadcast_timeslice(flexs.commodity), 0) + flex = flexs.where( + minprices == broadcast_timeslice(flexs.commodity, level=timeslice_level), 0 + ) flex = flex / (flex > 0).sum("commodity").clip(min=1) return consumption + flex * production @@ -316,6 +342,7 @@ def consumption( def maximum_production( technologies: xr.Dataset, capacity: xr.DataArray, + timeslice_level: Optional[str] = None, **filters, ): r"""Production for a given capacity. @@ -345,13 +372,13 @@ def maximum_production( technologies. Filters not relevant to the quantities of interest, i.e. filters that are not a dimension of `capacity` or `technologies`, are silently ignored. + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: `capacity * fixed_outputs * utilization_factor`, whittled down according to the filters and the set of technologies in `capacity`. """ from muse.commodities import is_enduse - from muse.timeslices import broadcast_timeslice, distribute_timeslice from muse.utilities import broadcast_techs, filter_input capa = filter_input( @@ -364,9 +391,9 @@ def maximum_production( btechs, **{k: v for k, v in filters.items() if k in btechs.dims} ) result = ( - broadcast_timeslice(capa) - * distribute_timeslice(ftechs.fixed_outputs) - * broadcast_timeslice(ftechs.utilization_factor) + broadcast_timeslice(capa, level=timeslice_level) + * distribute_timeslice(ftechs.fixed_outputs, level=timeslice_level) + * broadcast_timeslice(ftechs.utilization_factor, level=timeslice_level) ) return result.where(is_enduse(result.comm_usage), 0) @@ -375,6 +402,7 @@ def capacity_in_use( production: xr.DataArray, technologies: xr.Dataset, max_dim: Optional[Union[str, tuple[str]]] = "commodity", + timeslice_level: Optional[str] = None, **filters, ): """Capacity-in-use for each asset, given production. @@ -392,12 +420,12 @@ def capacity_in_use( technologies. Filters not relevant to the quantities of interest, i.e. filters that are not a dimension of `capacity` or `technologies`, are silently ignored. + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: Capacity-in-use for each technology, whittled down by the filters. """ from muse.commodities import is_enduse - from muse.timeslices import broadcast_timeslice from muse.utilities import broadcast_techs, filter_input prod = filter_input( @@ -412,7 +440,9 @@ def capacity_in_use( ) factor = 1 / (ftechs.fixed_outputs * ftechs.utilization_factor) - capa_in_use = (prod * broadcast_timeslice(factor)).where(~np.isinf(factor), 0) + capa_in_use = (prod * broadcast_timeslice(factor, level=timeslice_level)).where( + ~np.isinf(factor), 0 + ) capa_in_use = capa_in_use.where( is_enduse(technologies.comm_usage.sel(commodity=capa_in_use.commodity)), 0 @@ -426,6 +456,7 @@ def capacity_in_use( def minimum_production( technologies: xr.Dataset, capacity: xr.DataArray, + timeslice_level: Optional[str] = None, **filters, ): r"""Minimum production for a given capacity. @@ -455,13 +486,13 @@ def minimum_production( technologies. Filters not relevant to the quantities of interest, i.e. filters that are not a dimension of `capacity` or `technologies`, are silently ignored. + timeslice_level: the desired timeslice level of the result (e.g. "hour", "day") Return: `capacity * fixed_outputs * minimum_service_factor`, whittled down according to the filters and the set of technologies in `capacity`. """ from muse.commodities import is_enduse - from muse.timeslices import broadcast_timeslice, distribute_timeslice from muse.utilities import broadcast_techs, filter_input capa = filter_input( @@ -469,7 +500,7 @@ def minimum_production( ) if "minimum_service_factor" not in technologies: - return broadcast_timeslice(xr.zeros_like(capa)) + return broadcast_timeslice(xr.zeros_like(capa), level=timeslice_level) btechs = broadcast_techs( # type: ignore cast( @@ -482,9 +513,9 @@ def minimum_production( btechs, **{k: v for k, v in filters.items() if k in btechs.dims} ) result = ( - broadcast_timeslice(capa) - * distribute_timeslice(ftechs.fixed_outputs) - * broadcast_timeslice(ftechs.minimum_service_factor) + broadcast_timeslice(capa, level=timeslice_level) + * distribute_timeslice(ftechs.fixed_outputs, level=timeslice_level) + * broadcast_timeslice(ftechs.minimum_service_factor, level=timeslice_level) ) return result.where(is_enduse(result.comm_usage), 0) @@ -492,12 +523,12 @@ def minimum_production( def capacity_to_service_demand( demand: xr.DataArray, technologies: xr.Dataset, + timeslice_level: Optional[str] = None, ) -> xr.DataArray: """Minimum capacity required to fulfill the demand.""" - from muse.timeslices import broadcast_timeslice, distribute_timeslice - timeslice_outputs = distribute_timeslice( - technologies.fixed_outputs.sel(commodity=demand.commodity) - ) * broadcast_timeslice(technologies.utilization_factor) + technologies.fixed_outputs.sel(commodity=demand.commodity), + level=timeslice_level, + ) * broadcast_timeslice(technologies.utilization_factor, level=timeslice_level) capa_to_service_demand = demand / timeslice_outputs return capa_to_service_demand.max(("commodity", "timeslice")) diff --git a/src/muse/readers/csv.py b/src/muse/readers/csv.py index 600715b7b..e62f126ec 100644 --- a/src/muse/readers/csv.py +++ b/src/muse/readers/csv.py @@ -136,7 +136,7 @@ def to_agent_share(name): def read_technodata_timeslices(filename: Union[str, Path]) -> xr.Dataset: from muse.readers import camel_to_snake - from muse.timeslices import TIMESLICE + from muse.timeslices import sort_timeslices csv = pd.read_csv(filename, float_precision="high", low_memory=False) csv = csv.rename(columns=camel_to_snake) @@ -170,9 +170,7 @@ def read_technodata_timeslices(filename: Union[str, Path]) -> xr.Dataset: if item not in ["technology", "region", "year"] ] result = result.stack(timeslice=timeslice_levels) - result = result.sel(timeslice=TIMESLICE.timeslice) - # sorts timeslices into the correct order - return result + return sort_timeslices(result) def read_io_technodata(filename: Union[str, Path]) -> xr.Dataset: @@ -617,8 +615,8 @@ def read_initial_market( getLogger(__name__).info("Base year import not provided. Set to zero.") base_year_import = xr.zeros_like(projections) - base_year_export = distribute_timeslice(base_year_export) - base_year_import = distribute_timeslice(base_year_import) + base_year_export = distribute_timeslice(base_year_export, level=None) + base_year_import = distribute_timeslice(base_year_import, level=None) base_year_export.name = "exports" base_year_import.name = "imports" diff --git a/src/muse/sectors/abstract.py b/src/muse/sectors/abstract.py index 231b444c4..4fe560531 100644 --- a/src/muse/sectors/abstract.py +++ b/src/muse/sectors/abstract.py @@ -29,7 +29,6 @@ def factory(cls, name: str, settings: Any) -> AbstractSector: @abstractmethod def next(self, mca_market: Dataset) -> Dataset: """Advance sector by one time period.""" - pass def __repr__(self): return f"<{self.name.title()} sector - object at {hex(id(self))}>" diff --git a/src/muse/sectors/sector.py b/src/muse/sectors/sector.py index 7037eeca4..26e721606 100644 --- a/src/muse/sectors/sector.py +++ b/src/muse/sectors/sector.py @@ -14,6 +14,7 @@ from muse.sectors.abstract import AbstractSector from muse.sectors.register import register_sector from muse.sectors.subsector import Subsector +from muse.timeslices import compress_timeslice, expand_timeslice, get_level @register_sector(name="default") @@ -48,6 +49,7 @@ def factory(cls, name: str, settings: Any) -> Sector: regions=settings.regions, current_year=int(min(settings.time_framework)), name=subsec_name, + timeslice_level=sector_settings.get("timeslice_level", None), ) for subsec_name, subsec_settings in sector_settings.pop("subsectors") ._asdict() @@ -103,25 +105,38 @@ def __init__( interpolation: str = "linear", outputs: Callable | None = None, supply_prod: PRODUCTION_SIGNATURE | None = None, + timeslice_level: str | None = None, ): from muse.interactions import factory as interaction_factory from muse.outputs.sector import factory as ofactory from muse.production import maximum_production + from muse.timeslices import TIMESLICE - self.name: str = name """Name of the sector.""" - self.subsectors: Sequence[Subsector] = list(subsectors) + self.name: str = name + + """Timeslice level for the sector (e.g. "month").""" + self.timeslice_level = timeslice_level or get_level(TIMESLICE) + """Subsectors controlled by this object.""" - self.technologies: xr.Dataset = technologies + self.subsectors: Sequence[Subsector] = list(subsectors) + """Parameters describing the sector's technologies.""" + self.technologies: xr.Dataset = technologies + if "timeslice" in self.technologies.dims: + if not get_level(self.technologies) == self.timeslice_level: + raise ValueError( + f"Technodata for {self.name} sector does not match " + "the specified timeslice level for that sector " + f"({self.timeslice_level})" + ) + + """Interpolation method and arguments when computing years.""" self.interpolation: Mapping[str, Any] = { "method": interpolation, "kwargs": {"fill_value": "extrapolate"}, } - """Interpolation method and arguments when computing years.""" - if interactions is None: - interactions = interaction_factory() - self.interactions = interactions + """Interactions between agents. Called right before computing new investments, this function should manage any @@ -138,20 +153,22 @@ def __init__( :py:mod:`muse.interactions` contains MUSE's base interactions """ + self.interactions = interactions or interaction_factory() + + """A function for outputting data for post-mortem analysis.""" self.outputs: Callable = ( cast(Callable, ofactory()) if outputs is None else outputs ) - """A function for outputting data for post-mortem analysis.""" - self.supply_prod = ( - supply_prod if supply_prod is not None else maximum_production - ) - """ Computes production as used to return the supply to the MCA. + + """Computes production as used to return the supply to the MCA. It can be anything registered with :py:func:`@register_production`. """ - self.output_data: xr.Dataset + self.supply_prod = supply_prod or maximum_production + """Full supply, consumption and costs data for the most recent year.""" + self.output_data: xr.Dataset @property def forecast(self): @@ -188,6 +205,9 @@ def group_assets(x: xr.DataArray) -> xr.DataArray: # Agent interactions self.interactions(list(self.agents)) + # Convert market to sector timeslicing + mca_market = self.convert_to_sector_timeslicing(mca_market) + # Select appropriate data from the market market = mca_market.sel( commodity=self.technologies.commodity, region=self.technologies.region @@ -258,7 +278,9 @@ def group_assets(x: xr.DataArray) -> xr.DataArray: commodity=result.commodity ) result.set_coords("comm_usage") - return result + + # Convert result to global timeslicing scheme + return self.convert_to_global_timeslicing(result) def save_outputs(self) -> None: """Calls the outputs function with the current output data.""" @@ -276,24 +298,51 @@ def market_variables(self, market: xr.Dataset, technologies: xr.Dataset) -> Any: # Calculate supply supply = self.supply_prod( - market=market, capacity=capacity, technologies=technologies + market=market, + capacity=capacity, + technologies=technologies, + timeslice_level=self.timeslice_level, ) # Calculate consumption - consume = consumption(technologies, supply, market.prices) + consume = consumption( + technologies, supply, market.prices, timeslice_level=self.timeslice_level + ) # Calculate commodity prices technodata = cast(xr.Dataset, broadcast_techs(technologies, supply)) costs = supply_cost( supply.where(~is_pollutant(supply.comm_usage), 0), annual_levelized_cost_of_energy( - prices=market.prices.sel(region=supply.region), technologies=technodata + prices=market.prices.sel(region=supply.region), + technologies=technodata, + timeslice_level=self.timeslice_level, ), asset_dim="asset", ) return supply, consume, costs + def convert_to_sector_timeslicing(self, market: xr.Dataset) -> xr.Dataset: + """Converts market data to sector timeslicing.""" + supply = compress_timeslice( + market["supply"], level=self.timeslice_level, operation="sum" + ) + consumption = compress_timeslice( + market["consumption"], level=self.timeslice_level, operation="sum" + ) + prices = compress_timeslice( + market["prices"], level=self.timeslice_level, operation="mean" + ) + return xr.Dataset(dict(supply=supply, consumption=consumption, prices=prices)) + + def convert_to_global_timeslicing(self, market: xr.Dataset) -> xr.Dataset: + """Converts market data to global timeslicing.""" + supply = expand_timeslice(market["supply"], operation="distribute") + consumption = expand_timeslice(market["consumption"], operation="distribute") + costs = expand_timeslice(market["costs"], operation="broadcast") + return xr.Dataset(dict(supply=supply, consumption=consumption, costs=costs)) + @property def capacity(self) -> xr.DataArray: """Aggregates capacity across agents. diff --git a/src/muse/sectors/subsector.py b/src/muse/sectors/subsector.py index 03798af0f..381a49b2f 100644 --- a/src/muse/sectors/subsector.py +++ b/src/muse/sectors/subsector.py @@ -26,6 +26,7 @@ def __init__( name: str = "subsector", forecast: int = 5, expand_market_prices: bool = False, + timeslice_level: str | None = None, ): from muse import constraints as cs from muse import demand_share as ds @@ -39,6 +40,7 @@ def __init__( self.forecast = forecast self.name = name self.expand_market_prices = expand_market_prices + self.timeslice_level = timeslice_level """Whether to expand prices to include destination region. If ``True``, the input market prices are expanded of the missing "dst_region" @@ -83,6 +85,7 @@ def aggregate_lp( technologies, current_year=current_year, forecast=self.forecast, + timeslice_level=self.timeslice_level, ) if "dst_region" in demands.dims: @@ -122,6 +125,7 @@ def factory( regions: Sequence[str] | None = None, current_year: int | None = None, name: str = "subsector", + timeslice_level: str | None = None, ) -> Subsector: from muse import constraints as cs from muse import demand_share as ds @@ -146,6 +150,7 @@ def factory( investment=getattr(settings, "lpsolver", "adhoc"), forecast=getattr(settings, "forecast", 5), constraints=getattr(settings, "constraints", ()), + timeslice_level=timeslice_level, ) # technologies can have nans where a commodity # does not apply to a technology at all @@ -199,6 +204,7 @@ def factory( forecast=forecast, name=name, expand_market_prices=expand_market_prices, + timeslice_level=timeslice_level, ) diff --git a/src/muse/timeslices.py b/src/muse/timeslices.py index d22ab791f..12ca36c3d 100644 --- a/src/muse/timeslices.py +++ b/src/muse/timeslices.py @@ -6,97 +6,73 @@ "distribute_timeslice", "drop_timeslice", "setup_module", + "compress_timeslice", + "expand_timeslice", + "get_level", + "sort_timeslices", + "timeslice_max", ] from collections.abc import Mapping, Sequence -from typing import Union +from typing import Optional, Union -from numpy import ndarray -from pandas import MultiIndex +import numpy as np +import pandas as pd from xarray import DataArray TIMESLICE: DataArray = None # type: ignore -"""Array with the finest timeslice.""" -TRANSFORMS: dict[tuple, ndarray] = None # type: ignore -"""Transforms from each aggregate to the finest timeslice.""" def read_timeslices( settings: Union[Mapping, str], level_names: Sequence[str] = ("month", "day", "hour"), - name: str = "timeslice", ) -> DataArray: - '''Reads reference timeslice from toml like input. - - Arguments: - settings: A dictionary of nested dictionaries or a string that toml will - interpret as such. The nesting specifies different levels of the timeslice. - If a dictionary and it contains "timeslices" key, then the associated value - is used as the root dictionary. Ultimately, the most nested values should be - relative weights for each slice in the timeslice, e.g. the corresponding - number of hours. - level_names: Hints indicating the names of each level. Can also be given a - "level_names" key in ``settings``. - name: name of the reference array - - Return: - A ``DataArray`` with dimension *timeslice* and values representing the relative - weight of each timeslice. - - Example: - >>> from muse.timeslices import read_timeslices - >>> read_timeslices( - ... """ - ... [timeslices] - ... spring.weekday = 5 - ... spring.weekend = 2 - ... autumn.weekday = 5 - ... autumn.weekend = 2 - ... winter.weekday = 5 - ... winter.weekend = 2 - ... summer.weekday = 5 - ... summer.weekend = 2 - ... level_names = ["season", "week"] - ... """ - ... ) # doctest: +SKIP - Size: 32B - array([5, 2, 5, 2, 5, 2, 5, 2]) - Coordinates: - * timeslice (timeslice) object 64B MultiIndex - * season (timeslice) object 64B 'spring' 'spring' ... 'summer' 'summer' - * week (timeslice) object 64B 'weekday' 'weekend' ... 'weekend' - ''' from functools import reduce + from logging import getLogger from toml import loads + # Read timeslice settings if isinstance(settings, str): settings = loads(settings) settings = dict(**settings.get("timeslices", settings)) + + # Legacy: warn user about deprecation of "aggregates" feature (#550) + if "aggregates" in settings: + msg = ( + "Timeslice aggregation has been deprecated since v1.3.0. Please see the " + "release notes for that version for more information." + ) + getLogger(__name__).warning(msg) + settings.pop("aggregates") + + # Extract level names if "level_names" in settings: level_names = settings.pop("level_names") - settings.pop("aggregates", {}) - # figures out levels - levels: list[tuple] = [(level,) for level in settings] + # Extract timeslice levels and lengths ts = list(settings.values()) + levels: list[tuple] = [(level,) for level in settings] while all(isinstance(v, Mapping) for v in ts): levels = [(*previous, b) for previous, a in zip(levels, ts) for b in a] ts = reduce(list.__add__, (list(u.values()) for u in ts), []) + # Prepare multiindex nln = min(len(levels[0]), len(level_names)) level_names = ( list(level_names[:nln]) + [str(i) for i in range(len(levels[0]))][nln:] ) - indices = MultiIndex.from_tuples(levels, names=level_names) + indices = pd.MultiIndex.from_tuples(levels, names=level_names) + # Make sure names from different levels don't overlap if any( reduce(set.union, indices.levels[:i], set()).intersection(indices.levels[i]) for i in range(1, indices.nlevels) ): raise ValueError("Names from different levels should not overlap.") - return DataArray(ts, coords={"timeslice": indices}, dims=name) + # Create DataArray + return DataArray(ts, coords={"timeslice": indices}, dims="timeslice") def setup_module(settings: Union[str, Mapping]): @@ -105,29 +81,199 @@ def setup_module(settings: Union[str, Mapping]): TIMESLICE = read_timeslices(settings) -def broadcast_timeslice(x, ts=None): +def broadcast_timeslice( + data: DataArray, ts: Optional[DataArray] = None, level: Optional[str] = None +) -> DataArray: + """Convert a non-timesliced array to a timesliced array by broadcasting. + + If data is already timesliced in the appropriate scheme, it will be returned + unchanged. + + Args: + data: Array to broadcast. + ts: Dataarray with timeslice lengths. If None, defaults to the global timeslice. + level: Level to broadcast to. If None, use the finest level of ts. + + """ from xarray import Coordinates if ts is None: ts = TIMESLICE - # If x already has timeslices, check that it is matches the reference timeslice. - if "timeslice" in x.dims: - if x.timeslice.reset_coords(drop=True).equals(ts.timeslice): - return x - raise ValueError("x has incompatible timeslicing.") + if level is not None: + ts = compress_timeslice(ts, ts=ts, level=level, operation="sum") + + # If data already has timeslices, check that it matches the reference timeslice. + if "timeslice" in data.dims: + if data.timeslice.reset_coords(drop=True).equals(ts.timeslice): + return data + raise ValueError( + "Data is already timesliced, but does not match the reference." + ) mindex_coords = Coordinates.from_pandas_multiindex(ts.timeslice, "timeslice") - extensive = x.expand_dims(timeslice=ts["timeslice"]).assign_coords(mindex_coords) - return extensive + broadcasted = data.expand_dims(timeslice=ts["timeslice"]).assign_coords( + mindex_coords + ) + return broadcasted + + +def distribute_timeslice( + data: DataArray, ts: Optional[DataArray] = None, level=None +) -> DataArray: + """Convert a non-timesliced array to a timesliced array by distribution. + + If data is already timesliced in the appropriate scheme, it will be returned + unchanged. + + Args: + data: Array to distribute. + ts: Dataarray with timeslice lengths. If None, defaults to the global timeslice. + level: Level to distribute to. If None, use the finest level of ts. + + """ + if ts is None: + ts = TIMESLICE + + if level is not None: + ts = compress_timeslice(ts, ts=ts, level=level, operation="sum") + + # If data already has timeslices, check that it matches the reference timeslice. + if "timeslice" in data.dims: + if data.timeslice.reset_coords(drop=True).equals(ts.timeslice): + return data + raise ValueError( + "Data is already timesliced, but does not match the reference." + ) + + broadcasted = broadcast_timeslice(data, ts=ts) + timeslice_fractions = ts / broadcast_timeslice(ts.sum(), ts=ts) + return broadcasted * timeslice_fractions + + +def compress_timeslice( + data: DataArray, + ts: Optional[DataArray] = None, + level: Optional[str] = None, + operation: str = "sum", +) -> DataArray: + """Convert a fully timesliced array to a coarser level. + + The operation can be either 'sum', or 'mean': + - sum: sum values at each compressed timeslice level + - mean: take a weighted average of values at each compressed timeslice level, + according to timeslice length + + Args: + data: Timesliced array to compress. Must have the same timeslicing as ts. + ts: Dataarray with timeslice lengths. If None, defaults to the global timeslice. + level: Level to compress to. If None, don't compress. + operation: Operation to perform ("sum" or "mean"). Defaults to "sum". + + """ + if ts is None: + ts = TIMESLICE + + # Raise error if data is not timesliced appropriately + if "timeslice" not in data.dims: + raise ValueError("Data must have a 'timeslice' dimension.") + if not data.timeslice.reset_coords(drop=True).equals(ts.timeslice): + raise ValueError("Data has incompatible timeslicing with reference.") + + # If level is not specified, don't compress + if level is None: + return data + + # level must be a valid timeslice level + x_levels = data.timeslice.to_index().names + if level not in x_levels: + raise ValueError(f"Unknown level: {level}. Must be one of {x_levels}.") + + # Return data unchanged if already at the desired level + if get_level(data) == level: + return data + + # Prepare mask + idx = x_levels.index(level) + kept_levels, compressed_levels = x_levels[: idx + 1], x_levels[idx + 1 :] + mask = ts.unstack(dim="timeslice") + if operation == "sum": + mask = mask.where(np.isnan(mask), 1) + elif operation == "mean": + mask = mask / mask.sum(compressed_levels) + else: + raise ValueError(f"Unknown operation: {operation}. Must be 'sum' or 'mean'.") + + # Perform the operation + result = ( + (data.unstack(dim="timeslice") * mask) + .sum(compressed_levels) + .stack(timeslice=kept_levels) + ) + return sort_timeslices(result, ts) -def distribute_timeslice(x, ts=None): +def expand_timeslice( + data: DataArray, ts: Optional[DataArray] = None, operation: str = "distribute" +) -> DataArray: + """Convert a timesliced array to a finer level. + + The operation can be either 'distribute', or 'broadcast' + - distribute: distribute values over the new timeslice level(s) according to + timeslice lengths, such that the sum of the output over all timeslices is equal + to the sum of the input + - broadcast: broadcast values across over the new timeslice level(s) + + Args: + data: Timesliced array to expand. + ts: Dataarray with timeslice lengths. If None, defaults to the global timeslice. + operation: Operation to perform ("distribute" or "broadcast"). + Defaults to "distribute". + + """ if ts is None: ts = TIMESLICE - extensive = broadcast_timeslice(x, ts) - return extensive * (ts / broadcast_timeslice(ts.sum())) + # Raise error if data is not timesliced + if "timeslice" not in data.dims: + raise ValueError("Data must have a 'timeslice' dimension.") + + # Get level names + ts_levels = ts.timeslice.to_index().names + x_levels = data.timeslice.to_index().names + + # Raise error if x_levels is not a subset of ts_levels + if not set(x_levels).issubset(ts_levels): + raise ValueError( + "Data has incompatible timeslicing with reference. " + f"Timeslice levels of data ({x_levels}) must be a subset of ts " + f"({ts_levels})." + ) + + # Return data unchanged if already at the desired level + finest_level = get_level(ts) + current_level = get_level(data) + if current_level == finest_level: + return data + + # Prepare mask + mask = ts.unstack(dim="timeslice") + if operation == "broadcast": + mask = mask.where(np.isnan(mask), 1) + elif operation == "distribute": + mask = mask / mask.sum(ts_levels[ts_levels.index(current_level) + 1 :]) + else: + raise ValueError( + f"Unknown operation: {operation}. Must be 'distribute' or 'broadcast'." + ) + + # Perform the operation + result = ( + (data.unstack(dim="timeslice") * mask) + .stack(timeslice=ts_levels) + .dropna("timeslice") + ) + return sort_timeslices(result, ts) def drop_timeslice(data: DataArray) -> DataArray: @@ -139,3 +285,51 @@ def drop_timeslice(data: DataArray) -> DataArray: return data return data.drop_vars(data.timeslice.indexes) + + +def get_level(data: DataArray) -> str: + """Get the timeslice level of a DataArray.""" + if "timeslice" not in data.dims: + raise ValueError("Data does not have a 'timeslice' dimension.") + return data.timeslice.to_index().names[-1] + + +def sort_timeslices(data: DataArray, ts: Optional[DataArray] = None) -> DataArray: + """Sorts the timeslices of a DataArray according to a reference timeslice. + + This will only sort timeslices to match the reference if the data is at the same + timeslice level as the reference. Otherwise, it will sort timeslices in alphabetical + order. + + Args: + data: Timesliced DataArray to sort. + ts: Dataarray with reference timeslices in the appropriate order + """ + if ts is None: + ts = TIMESLICE + + # If data is at the same timeslice level as ts, sort timeslices according to ts + if get_level(data) == get_level(ts): + return data.sel(timeslice=ts.timeslice) + # Otherwise, sort timeslices in alphabetical order + return data.sortby("timeslice") + + +def timeslice_max(data: DataArray, ts: Optional[DataArray] = None) -> DataArray: + """Find the max value over the timeslice dimension, normalized for timeslice length. + + This first annualizes the value in each timeslice by dividing by the fraction of the + year that the timeslice occupies, then takes the maximum value + + Args: + data: Timesliced DataArray to find the max of. + ts: Dataarray with timeslice lengths. If None, defaults to the global timeslice. + """ + if ts is None: + ts = TIMESLICE + + timeslice_level = get_level(data) + timeslice_fractions = compress_timeslice( + ts, ts=ts, level=timeslice_level, operation="sum" + ) / broadcast_timeslice(ts.sum(), ts=ts, level=timeslice_level) + return (data / timeslice_fractions).max("timeslice") diff --git a/tests/conftest.py b/tests/conftest.py index 3c223b50a..95b9b6753 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,14 @@ from collections.abc import Mapping, Sequence from pathlib import Path from typing import Callable, Optional +from unittest.mock import patch import numpy as np from pandas import DataFrame from pytest import fixture from xarray import DataArray, Dataset +from muse.__main__ import patched_broadcast_compat_data from muse.agents import Agent @@ -19,6 +21,14 @@ def logger(): return logger +@fixture(autouse=True) +def patch_broadcast_compat_data(): + with patch( + "xarray.core.variable._broadcast_compat_data", patched_broadcast_compat_data + ): + yield + + def compare_df( expected: DataFrame, actual: DataFrame, @@ -133,13 +143,6 @@ def default_timeslice_globals(): summer.weekend.afternoon = 150 summer.weekend.evening = 150 level_names = ["month", "day", "hour"] - - [timeslices.aggregates] - all-day = [ - "night", "morning", "afternoon", "early-peak", "late-peak", "evening", "night" - ] - all-week = ["weekday", "weekend"] - all-year = ["winter", "summer", "spring-autumn"] """ setup_module(default_timeslices) diff --git a/tests/test_timeslices.py b/tests/test_timeslices.py index d1dd4e72f..0a3ae55a2 100644 --- a/tests/test_timeslices.py +++ b/tests/test_timeslices.py @@ -1,57 +1,17 @@ """Test timeslice utilities.""" -from pytest import fixture +import numpy as np +from pytest import approx, fixture, raises from xarray import DataArray @fixture -def toml(): - return """ - ["timeslices"] - winter.weekday.day = 10 - winter.weekday.night = 5 - winter.weekend.day = 2 - winter.weekend.night = 2 - winter.weekend.dusk = 1 - summer.weekday.day = 5 - summer.weekday.night = 5 - summer.weekend.day = 2 - summer.weekend.night = 2 - summer.weekend.dusk = 1 - level_names = ["semester", "week", "day"] - [timeslices.aggregates] - allday = ["day", "night"] - """ +def non_timesliced_dataarray(): + return DataArray([1, 2, 3], dims=["x"]) @fixture -def reference(toml): - from muse.timeslices import read_timeslices - - return read_timeslices(toml) - - -@fixture -def timeslice_dataarray(reference): - from pandas import MultiIndex - - return DataArray( - [1, 2, 3], - coords={ - "timeslice": MultiIndex.from_tuples( - [ - ("winter", "weekday", "allday"), - ("winter", "weekend", "dusk"), - ("summer", "weekend", "night"), - ], - names=reference.get_index("timeslice").names, - ) - }, - dims="timeslice", - ) - - -def test_read_timeslices(): +def timeslice(): from toml import loads from muse.timeslices import read_timeslices @@ -83,6 +43,7 @@ def test_read_timeslices(): ts = read_timeslices(inputs) assert isinstance(ts, DataArray) assert "timeslice" in ts.coords + return ts def test_no_overlap(): @@ -104,15 +65,224 @@ def test_no_overlap(): ) -def test_drop_timeslice(timeslice_dataarray): - from muse.timeslices import drop_timeslice +def test_drop_timeslice(non_timesliced_dataarray, timeslice): + from muse.timeslices import broadcast_timeslice, drop_timeslice - dropped = drop_timeslice(timeslice_dataarray) - coords_to_check = {"timeslice", "semester", "week", "day"} - assert coords_to_check.issubset(timeslice_dataarray.coords) + # Test on array with timeslice data + timesliced_dataarray = broadcast_timeslice(non_timesliced_dataarray, ts=timeslice) + dropped = drop_timeslice(timesliced_dataarray) + coords_to_check = {"timeslice", "month", "day", "hour"} + assert coords_to_check.issubset(timesliced_dataarray.coords) assert not coords_to_check.intersection(dropped.coords) # Test on arrays without timeslice data - data_without_timeslice = DataArray([1, 2, 3], dims=["x"]) - assert drop_timeslice(data_without_timeslice).equals(data_without_timeslice) + assert drop_timeslice(non_timesliced_dataarray).equals(non_timesliced_dataarray) assert drop_timeslice(dropped).equals(dropped) + + +def test_broadcast_timeslice(non_timesliced_dataarray, timeslice): + from muse.timeslices import broadcast_timeslice, compress_timeslice + + # Broadcast array to different levels of granularity + for level in ["month", "day", "hour"]: + out = broadcast_timeslice(non_timesliced_dataarray, ts=timeslice, level=level) + target_timeslices = compress_timeslice( + timeslice, ts=timeslice, level=level, operation="sum" + ) + + # Check that timeslicing in output matches the global scheme + assert out.timeslice.equals(target_timeslices.timeslice) + + # Check that all timeslices in the output are equal to each other + assert (out.diff(dim="timeslice") == 0).all() + + # Check that all values in the output are equal to the input + assert all( + (out.isel(timeslice=i) == non_timesliced_dataarray).all() + for i in range(out.sizes["timeslice"]) + ) + + # Calling on a fully timesliced array: the input should be returned unchanged + out2 = broadcast_timeslice(out, ts=timeslice) + assert out2.equals(out) + + # Calling on an array with inappropriate timeslicing: ValueError should be raised + with raises(ValueError): + broadcast_timeslice( + compress_timeslice(out, ts=timeslice, level="day"), ts=timeslice + ) + + +def test_distribute_timeslice(non_timesliced_dataarray, timeslice): + from muse.timeslices import ( + broadcast_timeslice, + compress_timeslice, + distribute_timeslice, + ) + + # Distribute array to different levels of granularity + for level in ["month", "day", "hour"]: + out = distribute_timeslice(non_timesliced_dataarray, ts=timeslice, level=level) + target_timeslices = compress_timeslice( + timeslice, ts=timeslice, level=level, operation="sum" + ) + + # Check that timeslicing in output matches the global scheme + assert out.timeslice.equals(target_timeslices.timeslice) + + # Check that all values are proportional to timeslice lengths + out_proportions = out / broadcast_timeslice( + out.sum("timeslice"), ts=timeslice, level=level + ) + ts_proportions = target_timeslices / broadcast_timeslice( + target_timeslices.sum("timeslice"), ts=timeslice, level=level + ) + assert abs(out_proportions - ts_proportions).max() < 1e-6 + + # Check that the sum across timeslices is equal to the input + assert (out.sum("timeslice") == approx(non_timesliced_dataarray)).all() + + # Calling on a fully timesliced array: the input should be returned unchanged + out2 = distribute_timeslice(out, ts=timeslice) + assert out2.equals(out) + + # Calling on an array with inappropraite timeslicing: ValueError should be raised + with raises(ValueError): + distribute_timeslice( + compress_timeslice(out, ts=timeslice, level="day"), ts=timeslice + ) + + +def test_compress_timeslice(non_timesliced_dataarray, timeslice): + from muse.timeslices import broadcast_timeslice, compress_timeslice, get_level + + # Create timesliced dataarray for compressing + timesliced_dataarray = broadcast_timeslice(non_timesliced_dataarray, ts=timeslice) + + # Compress array to different levels of granularity + for level in ["month", "day", "hour"]: + # Sum operation + out = compress_timeslice( + timesliced_dataarray, ts=timeslice, operation="sum", level=level + ) + assert get_level(out) == level + assert ( + out.sum("timeslice") == approx(timesliced_dataarray.sum("timeslice")) + ).all() + + # Mean operation + out = compress_timeslice( + timesliced_dataarray, ts=timeslice, operation="mean", level=level + ) + assert get_level(out) == level + assert ( + out.mean("timeslice") == approx(timesliced_dataarray.mean("timeslice")) + ).all() # NB in general this should be a weighted mean, but this works here + # because the data is equal in every timeslice + + # Calling without specifying a level: the input should be returned unchanged + out = compress_timeslice(timesliced_dataarray, ts=timeslice) + assert out.equals(timesliced_dataarray) + + # Calling with an invalid level: ValueError should be raised + with raises(ValueError): + compress_timeslice(timesliced_dataarray, ts=timeslice, level="invalid") + + # Calling with an invalid operation: ValueError should be raised + with raises(ValueError): + compress_timeslice( + timesliced_dataarray, ts=timeslice, level="day", operation="invalid" + ) + + +def test_expand_timeslice(non_timesliced_dataarray, timeslice): + from muse.timeslices import broadcast_timeslice, expand_timeslice + + # Different starting points for expansion + for level in ["month", "day", "hour"]: + timesliced_dataarray = broadcast_timeslice( + non_timesliced_dataarray, ts=timeslice, level=level + ) + + # Broadcast operation + out = expand_timeslice( + timesliced_dataarray, ts=timeslice, operation="broadcast" + ) + assert out.timeslice.equals(timeslice.timeslice) + assert ( + out.mean("timeslice") == approx(timesliced_dataarray.mean("timeslice")) + ).all() + + # Distribute operation + out = expand_timeslice( + timesliced_dataarray, ts=timeslice, operation="distribute" + ) + assert out.timeslice.equals(timeslice.timeslice) + assert ( + out.sum("timeslice") == approx(timesliced_dataarray.sum("timeslice")) + ).all() + + # Calling on an already expanded array: the input should be returned unchanged + out2 = expand_timeslice(out, ts=timeslice) + assert out.equals(out2) + + # Calling with an invalid operation: ValueError should be raised + with raises(ValueError): + timesliced_dataarray = broadcast_timeslice( + non_timesliced_dataarray, ts=timeslice, level="month" + ) + expand_timeslice(timesliced_dataarray, ts=timeslice, operation="invalid") + + +def test_get_level(non_timesliced_dataarray, timeslice): + from muse.timeslices import broadcast_timeslice, get_level + + for level in ["month", "day", "hour"]: + timesliced_dataarray = broadcast_timeslice( + non_timesliced_dataarray, ts=timeslice, level=level + ) + assert get_level(timesliced_dataarray) == level + + # Should raise error with non-timesliced array + with raises(ValueError): + get_level(non_timesliced_dataarray) + + +def test_sort_timeslices(non_timesliced_dataarray, timeslice): + from muse.timeslices import broadcast_timeslice, sort_timeslices + + # Finest timeslice level -> should match ordering of `timeslice` + timesliced_dataarray = broadcast_timeslice( + non_timesliced_dataarray, ts=timeslice, level="hour" + ) + sorted = sort_timeslices(timesliced_dataarray, timeslice) + assert sorted.timeslice.equals(timeslice.timeslice) + assert not sorted.timeslice.equals( + timesliced_dataarray.sortby("timeslice").timeslice + ) # but could be true if the timeslices in `timeslice` are in alphabetical order + + # Coarser timeslice level -> should match xarray sortby + timesliced_dataarray = broadcast_timeslice( + non_timesliced_dataarray, ts=timeslice, level="month" + ) + sorted = sort_timeslices(timesliced_dataarray, timeslice) + assert sorted.timeslice.equals(timesliced_dataarray.sortby("timeslice").timeslice) + + +def test_timeslice_max(non_timesliced_dataarray): + from muse.timeslices import broadcast_timeslice, read_timeslices, timeslice_max + + # With two equal timeslice lengths, this should be equivalent to max * 2 + ts = read_timeslices( + """ + [timeslices] + winter.weekday.night = 396 + winter.weekday.morning = 396 + """ + ) + timesliced_dataarray = broadcast_timeslice(non_timesliced_dataarray, ts=ts) + timesliced_dataarray = timesliced_dataarray + np.random.rand( + *timesliced_dataarray.shape + ) + timeslice_max_dataarray = timeslice_max(timesliced_dataarray, ts=ts) + assert timeslice_max_dataarray.equals(timesliced_dataarray.max("timeslice") * 2)