From deec073b999c16fa8cc774f8b908ef97d34d3fad Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Mon, 11 Nov 2024 17:03:09 +0000
Subject: [PATCH 1/8] Create ProcessFlowRaw struct

---
 src/process.rs | 28 +++++++++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/src/process.rs b/src/process.rs
index eb891006f..7a28de639 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -58,16 +58,38 @@ pub enum FlowType {
     Flexible,
 }
 
+#[derive(PartialEq, Debug, Deserialize)]
+struct ProcessFlowRaw {
+    process_id: String,
+    commodity_id: String,
+    flow: f64,
+    #[serde(default)]
+    flow_type: FlowType,
+    #[serde(deserialize_with = "deserialise_flow_cost")]
+    flow_cost: f64,
+}
+
+impl ProcessFlowRaw {
+    fn into_flow(self) -> ProcessFlow {
+        ProcessFlow {
+            process_id: self.process_id,
+            commodity_id: self.commodity_id,
+            flow: self.flow,
+            flow_type: self.flow_type,
+            flow_cost: self.flow_cost,
+        }
+    }
+}
+
 #[derive(PartialEq, Debug, Deserialize)]
 pub struct ProcessFlow {
-    pub process_id: String,
+    process_id: String,
     pub commodity_id: String,
     pub flow: f64,
-    #[serde(default)]
     pub flow_type: FlowType,
-    #[serde(deserialize_with = "deserialise_flow_cost")]
     pub flow_cost: f64,
 }
+
 define_process_id_getter! {ProcessFlow}
 
 /// Custom deserialiser for flow cost - treat empty fields as 0.0

From e30052af9885600309b2557b6278d66b180a3672 Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Wed, 11 Dec 2024 11:13:19 +0000
Subject: [PATCH 2/8] Convert method for ProcessFlow

---
 src/process.rs | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/process.rs b/src/process.rs
index 41ef7cbf6..f1275e133 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -1,7 +1,10 @@
 #![allow(missing_docs)]
 use crate::commodity::Commodity;
+use crate::commodity::CommodityCostMap;
+use crate::commodity::CommodityType;
 use crate::input::*;
 use crate::region::*;
+use crate::time_slice::TimeSliceLevel;
 use crate::time_slice::{TimeSliceInfo, TimeSliceSelection};
 use ::log::warn;
 use anyhow::{bail, ensure, Context, Result};
@@ -78,9 +81,18 @@ struct ProcessFlowRaw {
 
 impl ProcessFlowRaw {
     fn into_flow(self) -> ProcessFlow {
+        let commodity = Rc::new(Commodity {
+            id: Rc::from(self.commodity_id),
+            description: "".to_string(),
+            kind: CommodityType::InputCommodity,
+            time_slice_level: TimeSliceLevel::Annual,
+            costs: CommodityCostMap::new(),
+            demand_by_region: HashMap::new(),
+        });
+
         ProcessFlow {
             process_id: self.process_id,
-            commodity_id: self.commodity_id,
+            commodity,
             flow: self.flow,
             flow_type: self.flow_type,
             flow_cost: self.flow_cost,
@@ -93,7 +105,7 @@ pub struct ProcessFlow {
     /// A unique identifier for the process (typically uses a structured naming convention).
     pub process_id: String,
     /// Identifies the commodity for the specified flow
-    pub commodity_id: String,
+    pub commodity: Rc<Commodity>,
     /// Commodity flow quantity relative to other commodity flows. +ve value indicates flow out, -ve value indicates flow in.
     pub flow: f64,
     #[serde(default)]

From 31788eefd70fd6509d897e5a80287df20b23c296 Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Fri, 13 Dec 2024 03:43:37 +0000
Subject: [PATCH 3/8] Correctly fetch Commodity and convert to ProcessFlow

---
 src/process.rs | 65 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 40 insertions(+), 25 deletions(-)

diff --git a/src/process.rs b/src/process.rs
index f1275e133..c95ac097c 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -1,10 +1,7 @@
 #![allow(missing_docs)]
 use crate::commodity::Commodity;
-use crate::commodity::CommodityCostMap;
-use crate::commodity::CommodityType;
 use crate::input::*;
 use crate::region::*;
-use crate::time_slice::TimeSliceLevel;
 use crate::time_slice::{TimeSliceInfo, TimeSliceSelection};
 use ::log::warn;
 use anyhow::{bail, ensure, Context, Result};
@@ -79,23 +76,21 @@ struct ProcessFlowRaw {
     flow_cost: f64,
 }
 
+define_process_id_getter! {ProcessFlowRaw}
+
 impl ProcessFlowRaw {
-    fn into_flow(self) -> ProcessFlow {
-        let commodity = Rc::new(Commodity {
-            id: Rc::from(self.commodity_id),
-            description: "".to_string(),
-            kind: CommodityType::InputCommodity,
-            time_slice_level: TimeSliceLevel::Annual,
-            costs: CommodityCostMap::new(),
-            demand_by_region: HashMap::new(),
-        });
-
-        ProcessFlow {
-            process_id: self.process_id,
-            commodity,
-            flow: self.flow,
-            flow_type: self.flow_type,
-            flow_cost: self.flow_cost,
+    fn into_flow(self, commodities: &HashMap<Rc<str>, Rc<Commodity>>) -> ProcessFlow {
+        let commodity = commodities.get(self.commodity_id.as_str());
+
+        match commodity {
+            None => panic!("{} is not a valid commodity ID", &self.commodity_id),
+            Some(commodity) => ProcessFlow {
+                process_id: self.process_id,
+                commodity: Rc::clone(commodity),
+                flow: self.flow,
+                flow_type: self.flow_type,
+                flow_cost: self.flow_cost,
+            },
         }
     }
 }
@@ -108,10 +103,8 @@ pub struct ProcessFlow {
     pub commodity: Rc<Commodity>,
     /// Commodity flow quantity relative to other commodity flows. +ve value indicates flow out, -ve value indicates flow in.
     pub flow: f64,
-    #[serde(default)]
     /// Identifies if a flow is fixed or flexible.
     pub flow_type: FlowType,
-    #[serde(deserialize_with = "deserialise_flow_cost")]
     /// Cost per unit flow. For example, cost per unit of natural gas produced. Differs from var_opex because the user can apply it to any specified flow, whereas var_opex applies to pac flow.
     pub flow_cost: f64,
 }
@@ -321,6 +314,31 @@ fn read_process_availabilities(
     .with_context(|| input_err_msg(&file_path))
 }
 
+fn read_process_flows(
+    file_path: &Path,
+    process_ids: &HashSet<Rc<str>>,
+    commodities: &HashMap<Rc<str>, Rc<Commodity>>,
+) -> Result<HashMap<Rc<str>, Vec<ProcessFlow>>> {
+    // Read raw process flows from file then convert raw flows to ProcessFlow
+    let process_flows: HashMap<Rc<str>, Vec<ProcessFlowRaw>> =
+        read_csv_grouped_by_id(file_path, process_ids)?;
+
+    // Convert raw flows to ProcessFlow
+    let flows: HashMap<Rc<str>, Vec<ProcessFlow>> = process_flows
+        .into_iter()
+        .map(|(id, raw_flows)| {
+            let flows = raw_flows
+                .into_iter()
+                .map(|raw_flow| raw_flow.into_flow(commodities))
+                .collect::<Vec<_>>();
+
+            (id, flows)
+        })
+        .collect();
+
+    Ok(flows)
+}
+
 fn read_process_parameters_from_iter<I>(
     iter: I,
     process_ids: &HashSet<Rc<str>>,
@@ -333,18 +351,15 @@ where
     for param in iter {
         let param = param.into_parameter(year_range)?;
         let id = process_ids.get_id(&param.process_id)?;
-
         ensure!(
             params.insert(Rc::clone(&id), param).is_none(),
             "More than one parameter provided for process {id}"
         );
     }
-
     ensure!(
         params.len() == process_ids.len(),
         "Each process must have an associated parameter"
     );
-
     Ok(params)
 }
 
@@ -442,7 +457,7 @@ pub fn read_processes(
     let mut availabilities =
         read_process_availabilities(model_dir, &process_ids, time_slice_info)?;
     let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
-    let mut flows = read_csv_grouped_by_id(&file_path, &process_ids)?;
+    let mut flows = read_process_flows(&file_path, &process_ids, commodities)?;
     let mut pacs = read_process_pacs(model_dir, &process_ids, commodities)?;
     let mut parameters = read_process_parameters(model_dir, &process_ids, year_range)?;
     let file_path = model_dir.join(PROCESS_REGIONS_FILE_NAME);

From f1bab955a7be6718c74f48fcf41117e39864951e Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Mon, 16 Dec 2024 14:51:01 +0000
Subject: [PATCH 4/8] Move into_flow logic into read_process_flows_from_iter

---
 src/process.rs | 65 +++++++++++++++++++++++++++-----------------------
 1 file changed, 35 insertions(+), 30 deletions(-)

diff --git a/src/process.rs b/src/process.rs
index c95ac097c..9ee170ddc 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -78,23 +78,6 @@ struct ProcessFlowRaw {
 
 define_process_id_getter! {ProcessFlowRaw}
 
-impl ProcessFlowRaw {
-    fn into_flow(self, commodities: &HashMap<Rc<str>, Rc<Commodity>>) -> ProcessFlow {
-        let commodity = commodities.get(self.commodity_id.as_str());
-
-        match commodity {
-            None => panic!("{} is not a valid commodity ID", &self.commodity_id),
-            Some(commodity) => ProcessFlow {
-                process_id: self.process_id,
-                commodity: Rc::clone(commodity),
-                flow: self.flow,
-                flow_type: self.flow_type,
-                flow_cost: self.flow_cost,
-            },
-        }
-    }
-}
-
 #[derive(PartialEq, Debug, Deserialize)]
 pub struct ProcessFlow {
     /// A unique identifier for the process (typically uses a structured naming convention).
@@ -314,29 +297,51 @@ fn read_process_availabilities(
     .with_context(|| input_err_msg(&file_path))
 }
 
+fn read_process_flows_from_iter<I>(
+    iter: I,
+    commodities: &HashMap<Rc<str>, Rc<Commodity>>,
+) -> Result<Vec<ProcessFlow>>
+where
+    I: Iterator<Item = ProcessFlowRaw>,
+{
+    let mut flows = Vec::new();
+    for flow_raw in iter {
+        let commodity = commodities
+            .get(flow_raw.commodity_id.as_str())
+            .with_context(|| format!("{} is not a valid commodity ID", &flow_raw.commodity_id))?;
+
+        let flow = ProcessFlow {
+            process_id: flow_raw.process_id,
+            commodity: Rc::clone(commodity),
+            flow: flow_raw.flow,
+            flow_type: flow_raw.flow_type,
+            flow_cost: flow_raw.flow_cost,
+        };
+
+        flows.push(flow);
+    }
+
+    Ok(flows)
+}
+
 fn read_process_flows(
     file_path: &Path,
     process_ids: &HashSet<Rc<str>>,
     commodities: &HashMap<Rc<str>, Rc<Commodity>>,
 ) -> Result<HashMap<Rc<str>, Vec<ProcessFlow>>> {
     // Read raw process flows from file then convert raw flows to ProcessFlow
-    let process_flows: HashMap<Rc<str>, Vec<ProcessFlowRaw>> =
+    let mut process_flow_raws: HashMap<Rc<str>, Vec<ProcessFlowRaw>> =
         read_csv_grouped_by_id(file_path, process_ids)?;
 
-    // Convert raw flows to ProcessFlow
-    let flows: HashMap<Rc<str>, Vec<ProcessFlow>> = process_flows
-        .into_iter()
-        .map(|(id, raw_flows)| {
-            let flows = raw_flows
-                .into_iter()
-                .map(|raw_flow| raw_flow.into_flow(commodities))
-                .collect::<Vec<_>>();
+    let mut process_flows = HashMap::new();
 
-            (id, flows)
-        })
-        .collect();
+    for process_id in process_ids.iter() {
+        let iter = process_flow_raws.remove(process_id).unwrap().into_iter();
+        let flows = read_process_flows_from_iter(iter, commodities)?;
+        process_flows.insert(process_id.to_owned(), flows);
+    }
 
-    Ok(flows)
+    Ok(process_flows)
 }
 
 fn read_process_parameters_from_iter<I>(

From d1bdc489b6011ffef4e5d0a7b86c9d990bee208f Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Mon, 16 Dec 2024 15:46:50 +0000
Subject: [PATCH 5/8] Add tests

---
 src/process.rs | 108 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 108 insertions(+)

diff --git a/src/process.rs b/src/process.rs
index 9ee170ddc..af8fc56ac 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -640,6 +640,114 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_read_process_flows_from_iter_good() {
+        let commodities: HashMap<Rc<str>, Rc<Commodity>> = ["commodity1", "commodity2"]
+            .into_iter()
+            .map(|id| {
+                let commodity = Commodity {
+                    id: id.into(),
+                    description: "Some description".into(),
+                    kind: CommodityType::InputCommodity,
+                    time_slice_level: TimeSliceLevel::Annual,
+                    costs: CommodityCostMap::new(),
+                    demand_by_region: HashMap::new(),
+                };
+
+                (Rc::clone(&commodity.id), commodity.into())
+            })
+            .collect();
+
+        let flows_raw = [
+            ProcessFlowRaw {
+                process_id: "id1".into(),
+                commodity_id: "commodity1".into(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+            ProcessFlowRaw {
+                process_id: "id1".into(),
+                commodity_id: "commodity2".into(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+            ProcessFlowRaw {
+                process_id: "id2".into(),
+                commodity_id: "commodity1".into(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+        ];
+
+        let expected: Vec<ProcessFlow> = vec![
+            ProcessFlow {
+                process_id: "id1".into(),
+                commodity: commodities.get("commodity1").unwrap().clone(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+            ProcessFlow {
+                process_id: "id1".into(),
+                commodity: commodities.get("commodity2").unwrap().clone(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+            ProcessFlow {
+                process_id: "id2".into(),
+                commodity: commodities.get("commodity1").unwrap().clone(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+        ];
+
+        let actual = read_process_flows_from_iter(flows_raw.into_iter(), &commodities).unwrap();
+        assert_eq!(expected, actual);
+    }
+
+    #[test]
+    fn test_read_process_flows_from_iter_bad_commodity_id() {
+        let commodities = ["commodity1", "commodity2"]
+            .into_iter()
+            .map(|id| {
+                let commodity = Commodity {
+                    id: id.into(),
+                    description: "Some description".into(),
+                    kind: CommodityType::InputCommodity,
+                    time_slice_level: TimeSliceLevel::Annual,
+                    costs: CommodityCostMap::new(),
+                    demand_by_region: HashMap::new(),
+                };
+
+                (Rc::clone(&commodity.id), commodity.into())
+            })
+            .collect();
+
+        let flows_raw = [
+            ProcessFlowRaw {
+                process_id: "id1".into(),
+                commodity_id: "commodity1".into(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+            ProcessFlowRaw {
+                process_id: "id1".into(),
+                commodity_id: "commodity3".into(),
+                flow: 1.0,
+                flow_type: FlowType::Fixed,
+                flow_cost: 1.0,
+            },
+        ];
+
+        assert!(read_process_flows_from_iter(flows_raw.into_iter(), &commodities).is_err());
+    }
+
     #[test]
     fn test_read_process_parameters_from_iter_good() {
         let year_range = 2000..=2100;

From c4fdd9fda27889edc9cd4e7740c6b3451984e0e6 Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Mon, 16 Dec 2024 15:55:53 +0000
Subject: [PATCH 6/8] Consistent passing of model_dir

---
 src/process.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/process.rs b/src/process.rs
index af8fc56ac..8c55850ba 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -325,13 +325,13 @@ where
 }
 
 fn read_process_flows(
-    file_path: &Path,
+    model_dir: &Path,
     process_ids: &HashSet<Rc<str>>,
     commodities: &HashMap<Rc<str>, Rc<Commodity>>,
 ) -> Result<HashMap<Rc<str>, Vec<ProcessFlow>>> {
-    // Read raw process flows from file then convert raw flows to ProcessFlow
+    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
     let mut process_flow_raws: HashMap<Rc<str>, Vec<ProcessFlowRaw>> =
-        read_csv_grouped_by_id(file_path, process_ids)?;
+        read_csv_grouped_by_id(&file_path, process_ids)?;
 
     let mut process_flows = HashMap::new();
 
@@ -461,8 +461,7 @@ pub fn read_processes(
     let process_ids = HashSet::from_iter(descriptions.keys().cloned());
     let mut availabilities =
         read_process_availabilities(model_dir, &process_ids, time_slice_info)?;
-    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
-    let mut flows = read_process_flows(&file_path, &process_ids, commodities)?;
+    let mut flows = read_process_flows(model_dir, &process_ids, commodities)?;
     let mut pacs = read_process_pacs(model_dir, &process_ids, commodities)?;
     let mut parameters = read_process_parameters(model_dir, &process_ids, year_range)?;
     let file_path = model_dir.join(PROCESS_REGIONS_FILE_NAME);

From 5f7bcf3ecf0258bea6b01514222990d099d3bb4a Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Thu, 19 Dec 2024 18:20:28 +0000
Subject: [PATCH 7/8] Refactor and move processing logic into read_process_flows_from_iter. Fix tests.

---
 src/process.rs | 100 ++++++++++++++++++++++++++-----------------------
 1 file changed, 53 insertions(+), 47 deletions(-)

diff --git a/src/process.rs b/src/process.rs
index 8c55850ba..585683596 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -299,29 +299,28 @@ fn read_process_availabilities(
 
 fn read_process_flows_from_iter<I>(
     iter: I,
+    process_ids: &HashSet<Rc<str>>,
     commodities: &HashMap<Rc<str>, Rc<Commodity>>,
-) -> Result<Vec<ProcessFlow>>
+) -> Result<HashMap<Rc<str>, Vec<ProcessFlow>>>
 where
     I: Iterator<Item = ProcessFlowRaw>,
 {
-    let mut flows = Vec::new();
-    for flow_raw in iter {
+    iter.map(|flow_raw| -> Result<ProcessFlow> {
         let commodity = commodities
             .get(flow_raw.commodity_id.as_str())
            .with_context(|| format!("{} is not a valid commodity ID", &flow_raw.commodity_id))?;
 
-        let flow = ProcessFlow {
+        Ok(ProcessFlow {
             process_id: flow_raw.process_id,
             commodity: Rc::clone(commodity),
             flow: flow_raw.flow,
             flow_type: flow_raw.flow_type,
             flow_cost: flow_raw.flow_cost,
-        };
-
-        flows.push(flow);
-    }
-
-    Ok(flows)
+        })
+    })
+    .collect::<Result<Vec<ProcessFlow>>>()?
+    .into_iter()
+    .into_id_map(process_ids)
 }
 
 fn read_process_flows(
@@ -330,18 +329,9 @@ fn read_process_flows(
     commodities: &HashMap<Rc<str>, Rc<Commodity>>,
 ) -> Result<HashMap<Rc<str>, Vec<ProcessFlow>>> {
     let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
-    let mut process_flow_raws: HashMap<Rc<str>, Vec<ProcessFlowRaw>> =
-        read_csv_grouped_by_id(&file_path, process_ids)?;
-
-    let mut process_flows = HashMap::new();
-
-    for process_id in process_ids.iter() {
-        let iter = process_flow_raws.remove(process_id).unwrap().into_iter();
-        let flows = read_process_flows_from_iter(iter, commodities)?;
-        process_flows.insert(process_id.to_owned(), flows);
-    }
-
-    Ok(process_flows)
+    let process_flow_csv = read_csv(&file_path)?;
+    read_process_flows_from_iter(process_flow_csv, process_ids, commodities)
+        .with_context(|| input_err_msg(&file_path))
 }
 
 fn read_process_parameters_from_iter<I>(
@@ -495,6 +485,7 @@ pub fn read_processes(
 
 #[cfg(test)]
 mod tests {
+    use crate::commodity::{CommodityCostMap, CommodityType};
     use crate::time_slice::TimeSliceLevel;
 
@@ -641,6 +632,7 @@ mod tests {
 
     #[test]
     fn test_read_process_flows_from_iter_good() {
+        let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
         let commodities: HashMap<Rc<str>, Rc<Commodity>> = ["commodity1", "commodity2"]
             .into_iter()
             .map(|id| {
@@ -681,36 +673,47 @@ mod tests {
             },
         ];
 
-        let expected: Vec<ProcessFlow> = vec![
-            ProcessFlow {
-                process_id: "id1".into(),
-                commodity: commodities.get("commodity1").unwrap().clone(),
-                flow: 1.0,
-                flow_type: FlowType::Fixed,
-                flow_cost: 1.0,
-            },
-            ProcessFlow {
-                process_id: "id1".into(),
-                commodity: commodities.get("commodity2").unwrap().clone(),
-                flow: 1.0,
-                flow_type: FlowType::Fixed,
-                flow_cost: 1.0,
-            },
-            ProcessFlow {
-                process_id: "id2".into(),
-                commodity: commodities.get("commodity1").unwrap().clone(),
-                flow: 1.0,
-                flow_type: FlowType::Fixed,
-                flow_cost: 1.0,
-            },
-        ];
+        let expected = HashMap::from([
+            (
+                "id1".into(),
+                vec![
+                    ProcessFlow {
+                        process_id: "id1".into(),
+                        commodity: commodities.get("commodity1").unwrap().clone(),
+                        flow: 1.0,
+                        flow_type: FlowType::Fixed,
+                        flow_cost: 1.0,
+                    },
+                    ProcessFlow {
+                        process_id: "id1".into(),
+                        commodity: commodities.get("commodity2").unwrap().clone(),
+                        flow: 1.0,
+                        flow_type: FlowType::Fixed,
+                        flow_cost: 1.0,
+                    },
+                ],
+            ),
+            (
+                "id2".into(),
+                vec![ProcessFlow {
+                    process_id: "id2".into(),
+                    commodity: commodities.get("commodity1").unwrap().clone(),
+                    flow: 1.0,
+                    flow_type: FlowType::Fixed,
+                    flow_cost: 1.0,
+                }],
+            ),
+        ]);
 
-        let actual = read_process_flows_from_iter(flows_raw.into_iter(), &commodities).unwrap();
+        let actual =
+            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
+                .unwrap();
         assert_eq!(expected, actual);
     }
 
     #[test]
     fn test_read_process_flows_from_iter_bad_commodity_id() {
+        let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
         let commodities = ["commodity1", "commodity2"]
             .into_iter()
             .map(|id| {
@@ -744,7 +747,10 @@ mod tests {
             },
         ];
 
-        assert!(read_process_flows_from_iter(flows_raw.into_iter(), &commodities).is_err());
+        assert!(
+            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
+                .is_err()
+        );
     }
 
     #[test]
     fn test_read_process_parameters_from_iter_good() {

From c29a99aba0be4a2b45ea698bde05fe011d5db9f8 Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Fri, 20 Dec 2024 11:01:32 +0000
Subject: [PATCH 8/8] Addresses PR feedback

Removes the unused read_csv_grouped_by_id function. Adds a comment to
read_process_flows_from_iter so it's clear what it's doing. Refactors
code to make use of process_results.
---
 src/input.rs   | 82 --------------------------------------------------
 src/process.rs |  5 ++-
 2 files changed, 2 insertions(+), 85 deletions(-)

diff --git a/src/input.rs b/src/input.rs
index c9ca85804..163e733ad 100644
--- a/src/input.rs
+++ b/src/input.rs
@@ -166,28 +166,6 @@ where
     }
 }
 
-/// Read a CSV file, grouping the entries by ID
-///
-/// # Arguments
-///
-/// * `file_path` - Path to CSV file
-/// * `ids` - All possible IDs that will be encountered
-///
-/// # Returns
-///
-/// A HashMap with ID as a key and a vector of CSV data as a value or an error.
-pub fn read_csv_grouped_by_id<T>(
-    file_path: &Path,
-    ids: &HashSet<Rc<str>>,
-) -> Result<HashMap<Rc<str>, Vec<T>>>
-where
-    T: HasID + DeserializeOwned,
-{
-    read_csv(file_path)?
-        .into_id_map(ids)
-        .with_context(|| input_err_msg(file_path))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -285,64 +263,4 @@ mod tests {
         assert!(deserialise_f64(f64::NAN).is_err());
         assert!(deserialise_f64(f64::INFINITY).is_err());
     }
-
-    fn create_ids() -> HashSet<Rc<str>> {
-        HashSet::from(["A".into(), "B".into()])
-    }
-
-    #[test]
-    fn test_read_csv_grouped_by_id() {
-        let dir = tempdir().unwrap();
-        let file_path = dir.path().join("data.csv");
-        {
-            let file_path: &Path = &file_path; // cast
-            let mut file = File::create(file_path).unwrap();
-            writeln!(file, "id,value\nA,1\nB,2\nA,3").unwrap();
-        }
-
-        let expected = HashMap::from([
-            (
-                "A".into(),
-                vec![
-                    Record {
-                        id: "A".to_string(),
-                        value: 1,
-                    },
-                    Record {
-                        id: "A".to_string(),
-                        value: 3,
-                    },
-                ],
-            ),
-            (
-                "B".into(),
-                vec![Record {
-                    id: "B".to_string(),
-                    value: 2,
-                }],
-            ),
-        ]);
-        let process_ids = create_ids();
-        let file_path = dir.path().join("data.csv");
-        let map = read_csv_grouped_by_id::<Record>(&file_path, &process_ids);
-        assert_eq!(expected, map.unwrap());
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_read_csv_grouped_by_id_duplicate() {
-        let dir = tempdir().unwrap();
-        let file_path = dir.path().join("data.csv");
-        {
-            let file_path: &Path = &file_path; // cast
-            let mut file = File::create(file_path).unwrap();
-
-            // NB: Process ID "C" isn't valid
-            writeln!(file, "process_id,value\nA,1\nB,2\nC,3").unwrap();
-        }
-
-        // Check that it fails if a non-existent process ID is provided
-        let process_ids = create_ids();
-        read_csv_grouped_by_id::<Record>(&file_path, &process_ids).unwrap();
-    }
 }
diff --git a/src/process.rs b/src/process.rs
index ae1b39bb1..95c9a290e 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -290,6 +290,7 @@ fn read_process_availabilities(
     .with_context(|| input_err_msg(&file_path))
 }
 
+/// Read 'ProcessFlowRaw' records from an iterator and convert them into 'ProcessFlow' records.
 fn read_process_flows_from_iter<I>(
     iter: I,
     process_ids: &HashSet<Rc<str>>,
@@ -311,9 +312,7 @@ where
             flow_cost: flow_raw.flow_cost,
         })
     })
-    .collect::<Result<Vec<ProcessFlow>>>()?
-    .into_iter()
-    .into_id_map(process_ids)
+    .process_results(|iter| iter.into_id_map(process_ids))?
 }
 
 fn read_process_flows(
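For reference, the following standalone sketch is not part of the series: it illustrates the itertools process_results pattern that PATCH 8/8 adopts. The closure receives a plain iterator over the Ok values and the whole call short-circuits at the first Err, which is what lets into_id_map run over already-validated ProcessFlow values in the patch. The crate names and the parse_all example below are assumptions for illustration only, not code from the repository.

    // Illustrative sketch only -- not taken from the patches above.
    // Assumes the `itertools` and `anyhow` crates are available.
    use anyhow::{anyhow, Result};
    use itertools::process_results;

    fn parse_all(inputs: &[&str]) -> Result<Vec<i64>> {
        // Each parse yields a Result; process_results hands the closure a plain
        // iterator of i64 values and stops at the first parse error.
        process_results(
            inputs
                .iter()
                .map(|s| s.parse::<i64>().map_err(|e| anyhow!("{s}: {e}"))),
            |iter| iter.collect(),
        )
    }

    fn main() -> Result<()> {
        assert_eq!(parse_all(&["1", "2", "3"])?, vec![1, 2, 3]);
        assert!(parse_all(&["1", "oops"]).is_err());
        Ok(())
    }

In the patch itself the same pattern is used in method form (Itertools::process_results), with into_id_map as the closure, so the extra collect-then-iterate round trip from the earlier revision disappears.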