Skip to content
295 changes: 295 additions & 0 deletions tests/test_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pathlib import Path
from unittest.mock import patch

import numpy as np
import toml
import xarray as xr
from pytest import fixture, mark, raises
Expand Down Expand Up @@ -391,6 +392,10 @@ def test_read_existing_trade(tmp_path):

assert isinstance(data, xr.DataArray)
assert set(data.dims) == {"year", "technology", "dst_region", "region"}
assert list(data.coords["year"].values) == [2010, 2020, 2030, 2040, 2050]
assert list(data.coords["technology"].values) == ["gassupply1"]
assert list(data.coords["dst_region"].values) == ["R1", "R2"]
assert list(data.coords["region"].values) == ["R1", "R2"]


def test_read_trade_technodata(tmp_path):
Expand All @@ -412,6 +417,296 @@ def test_read_trade_technodata(tmp_path):
"max_capacity_growth",
"total_capacity_limit",
}
assert all(val == np.float64 for val in data.dtypes.values())
assert list(data.coords["dst_region"].values) == ["R1", "R2"]
assert list(data.coords["technology"].values) == ["gassupply1"]
assert list(data.coords["region"].values) == ["R1", "R2", "R3"]
assert all(var.coords.equals(data.coords) for var in data.data_vars.values())


@fixture
def default_model(tmp_path):
from muse.examples import copy_model

copy_model("default", tmp_path)
return tmp_path / "model"


def test_read_technodictionary(default_model):
from muse.readers.csv import read_technodictionary

path = default_model / "technodata" / "residential" / "Technodata.csv"
data = read_technodictionary(path)
assert isinstance(data, xr.Dataset)
assert set(data.dims) == {"technology", "region"}

assert dict(data.dtypes) == dict(
level=np.dtype("O"),
cap_par=np.dtype("float64"),
cap_exp=np.dtype("int64"),
fix_par=np.dtype("int64"),
fix_exp=np.dtype("int64"),
var_par=np.dtype("int64"),
interest_rate=np.dtype("float64"),
type=np.dtype("O"),
fuel=np.dtype("<U11"),
enduse=np.dtype("<U4"),
agent_share_2=np.dtype("int64"),
tech_type=np.dtype("<U6"),
efficiency=np.dtype("int64"),
max_capacity_addition=np.dtype("int64"),
max_capacity_growth=np.dtype("float64"),
scaling_size=np.dtype("float64"),
technical_life=np.dtype("int64"),
total_capacity_limit=np.dtype("int64"),
utilization_factor=np.dtype("int64"),
var_exp=np.dtype("int64"),
)
assert list(data.coords["technology"].values) == ["gasboiler", "heatpump"]
assert list(data.coords["region"].values) == ["R1"]

for var in data.data_vars:
if var in ("fuel", "enduse", "tech_type"):
assert list(data.data_vars[var].coords) == ["technology"]
else:
assert data.data_vars[var].coords.equals(data.coords)


def test_read_technodata_timeslices(tmp_path):
from muse.examples import copy_model
from muse.readers.csv import read_technodata_timeslices

copy_model("default_timeslice", tmp_path)
path = tmp_path / "model" / "technodata" / "power" / "TechnodataTimeslices.csv"
data = read_technodata_timeslices(path)

assert isinstance(data, xr.Dataset)
assert set(data.dims) == {"technology", "region", "year", "timeslice"}
assert dict(data.dtypes) == dict(
utilization_factor=np.int64,
minimum_service_factor=np.int64,
)
assert list(data.coords["technology"].values) == ["gasCCGT", "windturbine"]
assert list(data.coords["region"].values) == ["R1"]
assert list(data.coords["year"].values) == [2020]
month_values = ["all-year"] * 6
day_values = ["all-week"] * 6
hour_values = [
"afternoon",
"early-peak",
"evening",
"late-peak",
"morning",
"night",
]

assert list(data.coords["timeslice"].values) == list(
zip(month_values, day_values, hour_values)
)
assert list(data.coords["month"]) == month_values
assert list(data.coords["day"]) == day_values
assert list(data.coords["hour"]) == hour_values


def test_read_io_technodata(default_model):
from muse.readers.csv import read_io_technodata

path = default_model / "technodata" / "residential" / "CommOut.csv"
data = read_io_technodata(path)

assert isinstance(data, xr.Dataset)
assert set(data.dims) == {"technology", "region", "year", "commodity"}
assert dict(data.dtypes) == dict(
fixed=np.float64, flexible=np.float64, commodity_units=np.dtype("O")
)
assert list(data.coords["technology"].values) == ["gasboiler", "heatpump"]
assert list(data.coords["region"].values) == ["R1"]
assert list(data.coords["year"].values) == [2020]
assert list(data.coords["commodity"].values) == [
"electricity",
"gas",
"heat",
"CO2f",
"wind",
]

assert data.data_vars["fixed"].coords.equals(data.coords)
assert data.data_vars["flexible"].coords.equals(data.coords)
assert list(data.data_vars["commodity_units"].coords) == ["commodity"]


def test_read_initial_assets(default_model):
from muse.readers.csv import read_initial_assets

path = default_model / "technodata" / "residential" / "ExistingCapacity.csv"
data = read_initial_assets(path)

assert isinstance(data, xr.DataArray)
assert set(data.dims) == {"region", "asset", "year"}
assert data.dtype == np.int64

assert list(data.coords["region"].values) == ["R1"]
assert list(data.coords["technology"].values) == ["gasboiler", "heatpump"]
assert list(data.coords["installed"].values) == [2020, 2020]
assert list(data.coords["year"].values) == list(range(2020, 2055, 5))


def test_global_commodities(default_model):
from muse.readers.csv import read_global_commodities

path = default_model / "input" / "GlobalCommodities.csv"
data = read_global_commodities(path)

assert isinstance(data, xr.Dataset)
assert set(data.dims) == {"commodity"}
assert dict(data.dtypes) == dict(
comm_name=np.dtype("O"),
comm_type=np.dtype("O"),
emmission_factor=np.float64,
heat_rate=np.int64,
unit=np.dtype("O"),
)

assert list(data.coords["commodity"].values) == [
"electricity",
"gas",
"heat",
"wind",
"CO2f",
]
assert all(var.coords.equals(data.coords) for var in data.data_vars.values())


def test_read_csv_agent_parameters(default_model):
from muse.readers.csv import read_csv_agent_parameters

path = default_model / "technodata" / "Agents.csv"
data = read_csv_agent_parameters(path)

assert data == [
{
"name": "A1",
"region": "R1",
"objectives": ["LCOE"],
"search_rules": "all",
"decision": {"name": "singleObj", "parameters": [("LCOE", True, 1)]},
"agent_type": "newcapa",
"quantity": 1,
"maturity_threshhold": -1,
"spend_limit": np.inf,
"share": "agent_share_1",
},
{
"name": "A1",
"region": "R1",
"objectives": ["LCOE"],
"search_rules": "all",
"decision": {"name": "singleObj", "parameters": [("LCOE", True, 1)]},
"agent_type": "retrofit",
"quantity": 1,
"maturity_threshhold": -1,
"spend_limit": np.inf,
"share": "agent_share_2",
},
]


def test_read_initial_market(default_model):
from muse.readers.csv import read_initial_market
from muse.readers.toml import read_settings

settings = read_settings(default_model / "settings.toml")
path = default_model / "input" / "Projections.csv"
data = read_initial_market(path, timeslices=settings.timeslices)

assert isinstance(data, xr.Dataset)
assert set(data.dims) == {"region", "year", "commodity", "timeslice"}
assert dict(data.dtypes) == dict(
prices=np.float64,
exports=np.float64,
imports=np.float64,
static_trade=np.float64,
)
assert list(data.coords["region"].values) == ["R1"]
assert list(data.coords["year"].values) == list(range(2010, 2105, 5))
assert list(data.coords["commodity"].values) == [
"electricity",
"gas",
"heat",
"CO2f",
"wind",
]
assert (
list(data.coords["units_prices"].values)
== ["MUS$2010/PJ"] * 3 + ["MUS$2010/kt"] * 2
)
month_values = ["all-year"] * 6
day_values = ["all-week"] * 6
hour_values = [
"night",
"morning",
"afternoon",
"early-peak",
"late-peak",
"evening",
]

assert list(data.coords["timeslice"].values) == list(
zip(month_values, day_values, hour_values)
)
assert list(data.coords["month"]) == month_values
assert list(data.coords["day"]) == day_values
assert list(data.coords["hour"]) == hour_values

assert all(var.coords.equals(data.coords) for var in data.data_vars.values())


def test_read_attribute_table(default_model):
from muse.readers.csv import read_attribute_table

path = default_model / "input" / "Projections.csv"
data = read_attribute_table(path)

assert isinstance(data, xr.DataArray)
assert data.dtype == np.float64

assert set(data.dims) == {"region", "year", "commodity"}
assert list(data.coords["region"].values) == ["R1"]
assert list(data.coords["year"].values) == list(range(2010, 2105, 5))
assert list(data.coords["commodity"].values) == [
"electricity",
"gas",
"heat",
"CO2f",
"wind",
]
assert (
list(data.coords["units_commodity_price"].values)
== ["MUS$2010/PJ"] * 3 + ["MUS$2010/kt"] * 2
)


def test_read_csv_outputs(default_model):
from muse.readers.csv import read_csv_outputs

path = default_model / "technodata" / "preset" / "*Consumption.csv"
data = read_csv_outputs(str(path))

assert isinstance(data, xr.DataArray)
assert data.dtype == np.float64

assert set(data.dims) == {"year", "commodity", "region", "process", "timeslice"}
assert list(data.coords["region"].values) == ["R1"]
assert list(data.coords["process"].values) == ["gasboiler"]
assert list(data.coords["timeslice"].values) == list(range(1, 7))
assert list(data.coords["year"].values) == [2020, 2050]
assert list(data.coords["commodity"].values) == [
"electricity",
"gas",
"heat",
"CO2f",
"wind",
]


def test_check_utilization_not_all_zero_success():
Expand Down