Skip to content
Merged
145 changes: 123 additions & 22 deletions src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::input::*;
use crate::region::*;
use crate::time_slice::{TimeSliceInfo, TimeSliceSelection};
use ::log::warn;
use anyhow::{bail, ensure, Context, Result};
use anyhow::{ensure, Context, Result};
use itertools::Itertools;
use serde::{Deserialize, Deserializer};
use serde_string_enum::{DeserializeLabeledStringEnum, SerializeLabeledStringEnum};
Expand Down Expand Up @@ -54,7 +54,9 @@ pub struct ProcessAvailability {
}
define_process_id_getter! {ProcessAvailability}

#[derive(PartialEq, Default, Debug, SerializeLabeledStringEnum, DeserializeLabeledStringEnum)]
#[derive(
PartialEq, Default, Debug, Clone, SerializeLabeledStringEnum, DeserializeLabeledStringEnum,
)]
pub enum FlowType {
#[default]
#[string = "fixed"]
Expand All @@ -65,7 +67,7 @@ pub enum FlowType {
Flexible,
}

#[derive(PartialEq, Debug, Deserialize)]
#[derive(PartialEq, Debug, Deserialize, Clone)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this Clone (and the one above) only added for the tests? I'm not sure if we want to make the structs clone-able just for the testing, it opens up cloning as an option in other parts of the code, which we maybe don't want? @alexdewar

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't really thought about it from that perspective, but that's an interesting thought. I don't think we'll want instances of these objects outside of the input layer, either with "clone()" or "new()". That said, I think we can catch this kind of broken code at review time.

You possibly could make it a test-only feature by doing something like this:

#[cfg(test)]
#[derive(Clone)]
#[derive(PartialEq, Debug, Deserialize)]

Not sure it's worth the effort though.

pub struct ProcessFlow {
/// A unique identifier for the process (typically uses a structured naming convention).
pub process_id: String,
Expand Down Expand Up @@ -330,27 +332,78 @@ fn read_process_pacs_from_iter<I>(
iter: I,
process_ids: &HashSet<Rc<str>>,
commodities: &HashMap<Rc<str>, Rc<Commodity>>,
flows: &HashMap<Rc<str>, Vec<ProcessFlow>>,
) -> Result<HashMap<Rc<str>, Vec<Rc<Commodity>>>>
where
I: Iterator<Item = ProcessPAC>,
{
// Keep track of previous PACs so we can check for duplicates
let mut pacs = HashSet::new();
let mut existing_pacs = HashSet::new();

// Build hashmap of process ID to PAC commodities
let pacs = iter
.map(|pac| {
let process_id = process_ids.get_id(&pac.process_id)?;
let commodity = commodities
.get(pac.commodity_id.as_str())
.with_context(|| format!("{} is not a valid commodity ID", &pac.commodity_id))?;

// Check that commodity is valid and PAC is not a duplicate
ensure!(existing_pacs.insert(pac), "Duplicate PACs found");
Ok((process_id, Rc::clone(commodity)))
})
.process_results(|iter| iter.into_group_map())?;

iter.map(|pac| {
let process_id = process_ids.get_id(&pac.process_id)?;
let commodity = commodities.get(pac.commodity_id.as_str());
// Check that PACs for each process are either all inputs or all outputs
validate_pac_flows(&pacs, flows)?;

match commodity {
None => bail!("{} is not a valid commodity ID", &pac.commodity_id),
Some(commodity) => {
ensure!(pacs.insert(pac), "Duplicate PACs found");
// Return result
Ok(pacs)
}

Ok((process_id, Rc::clone(commodity)))
/// Validate that the PACs for each process are either all inputs or all outputs.
///
/// # Arguments
///
/// * `pacs` - A map of process IDs to PAC commodities
/// * `flows` - A map of process IDs to process flows
///
/// # Returns
/// An `Ok(())` if the check is successful, or an error.
fn validate_pac_flows(
pacs: &HashMap<Rc<str>, Vec<Rc<Commodity>>>,
flows: &HashMap<Rc<str>, Vec<ProcessFlow>>,
) -> Result<()> {
for (process_id, pacs) in pacs.iter() {
// Get the flows for the process (unwrap is safe as every process has associated flows)
let flows = flows.get(process_id).unwrap();

let mut flow_sign: Option<bool> = None; // False for inputs, true for outputs
for pac in pacs.iter() {
// Find the flow associated with the PAC
let flow = flows
.iter()
.find(|item| *item.commodity_id.as_str() == *pac.id)
.with_context(|| {
format!(
"PAC {} for process {} must have an associated flow",
pac.id, process_id
)
})?;

// Check that flow sign is consistent
let current_flow_sign = flow.flow > 0.0;
if let Some(flow_sign) = flow_sign {
ensure!(
current_flow_sign == flow_sign,
"PACs for process {} are a mix of inputs and outputs",
process_id
);
}
flow_sign = Some(current_flow_sign);
}
})
.process_results(|iter| iter.into_group_map())
}
Ok(())
}

/// Read process Primary Activity Commodities (PACs) from the specified model directory.
Expand All @@ -364,10 +417,11 @@ fn read_process_pacs(
model_dir: &Path,
process_ids: &HashSet<Rc<str>>,
commodities: &HashMap<Rc<str>, Rc<Commodity>>,
flows: &HashMap<Rc<str>, Vec<ProcessFlow>>,
) -> Result<HashMap<Rc<str>, Vec<Rc<Commodity>>>> {
let file_path = model_dir.join(PROCESS_PACS_FILE_NAME);
let process_pacs_csv = read_csv(&file_path)?;
read_process_pacs_from_iter(process_pacs_csv, process_ids, commodities)
read_process_pacs_from_iter(process_pacs_csv, process_ids, commodities, flows)
.with_context(|| input_err_msg(&file_path))
}

Expand Down Expand Up @@ -398,7 +452,7 @@ pub fn read_processes(
let mut availabilities = read_process_availabilities(model_dir, &process_ids, time_slice_info)?;
let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
let mut flows = read_csv_grouped_by_id(&file_path, &process_ids)?;
let mut pacs = read_process_pacs(model_dir, &process_ids, commodities)?;
let mut pacs = read_process_pacs(model_dir, &process_ids, commodities, &flows)?;
let mut parameters = read_process_parameters(model_dir, &process_ids, year_range)?;
let file_path = model_dir.join(PROCESS_REGIONS_FILE_NAME);
let mut regions =
Expand Down Expand Up @@ -717,6 +771,7 @@ mod tests {

#[test]
fn test_read_process_pacs_from_iter() {
// Prepare test data
let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
let commodities = ["commodity1", "commodity2"]
.into_iter()
Expand All @@ -729,28 +784,53 @@ mod tests {
costs: CommodityCostMap::new(),
demand_by_region: HashMap::new(),
};

(Rc::clone(&commodity.id), commodity.into())
})
.collect();
let flows: HashMap<Rc<str>, Vec<ProcessFlow>> = ["id1", "id2"]
.into_iter()
.map(|process_id| {
(
process_id.into(),
["commodity1", "commodity2"]
.into_iter()
.map(|commodity_id| ProcessFlow {
process_id: process_id.into(),
commodity_id: commodity_id.into(),
flow: 1.0,
flow_type: FlowType::Fixed,
flow_cost: 1.0,
})
.collect(),
)
})
.collect();

// duplicate PAC
let pac = ProcessPAC {
process_id: "id1".into(),
commodity_id: "commodity1".into(),
};
let pacs = [pac.clone(), pac];
assert!(read_process_pacs_from_iter(pacs.into_iter(), &process_ids, &commodities).is_err());
assert!(
read_process_pacs_from_iter(pacs.into_iter(), &process_ids, &commodities, &flows)
.is_err()
);

// invalid commodity ID
let bad_pac = ProcessPAC {
process_id: "id1".into(),
commodity_id: "other_commodity".into(),
};
assert!(
read_process_pacs_from_iter([bad_pac].into_iter(), &process_ids, &commodities).is_err()
);
assert!(read_process_pacs_from_iter(
[bad_pac].into_iter(),
&process_ids,
&commodities,
&flows
)
.is_err());

// Valid
let pacs = [
ProcessPAC {
process_id: "id1".into(),
Expand Down Expand Up @@ -787,8 +867,29 @@ mod tests {
.into_iter()
.collect();
assert!(
read_process_pacs_from_iter(pacs.into_iter(), &process_ids, &commodities).unwrap()
read_process_pacs_from_iter(
pacs.clone().into_iter(),
&process_ids,
&commodities,
&flows
)
.unwrap()
== expected
);

// Invalid flows
// Making commodity1 an input so the PACs for process id1 are a mix of inputs and outputs
let mut flows = flows.clone();
flows
.get_mut(&Rc::from("id1"))
.unwrap()
.iter_mut()
.find(|flow| flow.commodity_id == "commodity1")
.unwrap()
.flow = -1.0;
assert!(
read_process_pacs_from_iter(pacs.into_iter(), &process_ids, &commodities, &flows)
.is_err()
);
}
}