From 3b497a915cf1d52f2ac328b76bb4db349ef905a9 Mon Sep 17 00:00:00 2001 From: Michael Lazear Date: Fri, 15 Nov 2024 13:42:26 -0800 Subject: [PATCH 1/8] feat: writer for mz_parquet v0.2 format --- Writer/ParquetSpectrumWriter.cs | 325 +++++++++++++------------------- 1 file changed, 130 insertions(+), 195 deletions(-) diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index 2703286..01d7b20 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -1,257 +1,192 @@ using System; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Reflection; using log4net; -using Parquet; -using Parquet.Data; +using Parquet.Serialization; using ThermoFisher.CommonCore.Data; using ThermoFisher.CommonCore.Data.Business; using ThermoFisher.CommonCore.Data.FilterEnums; using ThermoFisher.CommonCore.Data.Interfaces; -using ThermoRawFileParser.Writer.MzML; namespace ThermoRawFileParser.Writer { + + struct MzParquet + { + public uint scan; + public uint level; + public float rt; + public float mz; + public float intensity; + public float? ion_mobility; + public float? isolation_lower; + public float? isolation_upper; + public uint? precursor_scan; + public float? precursor_mz; + public uint? precursor_charge; + } + public class ParquetSpectrumWriter : SpectrumWriter { private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - private IRawDataPlus _rawFile; - - // Dictionary to keep track of the different mass analyzers (key: Thermo MassAnalyzerType; value: the reference string) - private readonly Dictionary _massAnalyzers = - new Dictionary(); - - private readonly Dictionary _ionizationTypes = - new Dictionary(); - public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput) { } - public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastScanNumber) + public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNumber) { - _rawFile = rawFile; - if (rawFile.HasMsData) - { - List pScans = new List(); - WritePScans(ParseInput.OutputDirectory, rawFile.FileName, rawFile, pScans); - } - else + if (!raw.HasMsData) { throw new RawFileParserException("No MS data in RAW file, no output will be produced"); } - } - private static void WritePScans(string outputDirectory, string fileName, - IRawDataPlus raw, - List scans) - { var enumerator = raw.GetFilteredScanEnumerator(" "); - foreach (var scanNumber in enumerator - ) // note in my tests serial is faster than Parallel.Foreach() (this involves disk access, so it makes sense) - { - //trailer information is extracted via index - var trailers = raw.GetTrailerExtraValues(scanNumber); - var trailerLabels = raw.GetTrailerExtraInformation(scanNumber); - object chargeState = 0; - for (int i = 0; i < trailerLabels.Labels.Length; i++) - { - if (trailerLabels.Labels[i] == "Charge State:") - { - chargeState = raw.GetTrailerExtraValue(scanNumber, i); - break; - } - } + // NB: replace with more robust strategy? + var output = ParseInput.OutputDirectory + "//" + Path.GetFileNameWithoutExtension(ParseInput.RawFilePath) + ".mzparquet"; + + ParquetSerializerOptions opts = new ParquetSerializerOptions(); + opts.CompressionLevel = System.IO.Compression.CompressionLevel.Fastest; + opts.CompressionMethod = Parquet.CompressionMethod.Zstd; + + var data = new List(); + + // map last (msOrder - 1) -> scan number (e.g. mapping precursors) + // note, this assumes time dependence of MS1 -> MS2 -> MSN + var last_scans = new Dictionary(); + + foreach (var scanNumber in enumerator) + { var scanFilter = raw.GetFilterForScanNumber(scanNumber); - var scanStats = raw.GetScanStatsForScanNumber(scanNumber); CentroidStream centroidStream = new CentroidStream(); - //check for FT mass analyzer data + // Pull out m/z and intensity values + // NB: is this the best way to do this? if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerFTMS) { centroidStream = raw.GetCentroidStream(scanNumber, false); } - - //check for IT mass analyzer data - if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS) + else if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS) { var scanData = raw.GetSimplifiedScan(scanNumber); centroidStream.Masses = scanData.Masses; centroidStream.Intensities = scanData.Intensities; } - - var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder; - - if (msOrder == MSOrderType.Ms) + else { - var pscan = GetPScan(scanStats, centroidStream, fileName, Convert.ToInt32(chargeState)); - scans.Add(pscan); + var scanData = raw.GetSimplifiedCentroids(scanNumber); + centroidStream.Masses = scanData.Masses; + centroidStream.Intensities = scanData.Intensities; } - if (msOrder == MSOrderType.Ms2) - { - var precursorMz = raw.GetScanEventForScanNumber(scanNumber).GetReaction(0).PrecursorMass; - var pscan = GetPScan(scanStats, centroidStream, fileName, precursorMz, - Convert.ToInt32(chargeState)); - scans.Add(pscan); - } - var t = raw.GetTrailerExtraValues(scanNumber); - } + var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder; - WriteScans(outputDirectory, scans, fileName); - } + last_scans[(int)msOrder] = (uint)scanNumber; - private static PScan GetPScan(ScanStatistics scanStats, CentroidStream centroidStream, - string fileName, double? precursorMz = null, int? precursorCharge = null) - { - var scan = new PScan - { - FileName = fileName, - BasePeakMass = scanStats.BasePeakMass, - ScanType = scanStats.ScanType, - BasePeakIntensity = scanStats.BasePeakIntensity, - PacketType = scanStats.PacketType, - ScanNumber = scanStats.ScanNumber, - RetentionTime = scanStats.StartTime, - Masses = centroidStream.Masses.ToArray(), - Intensities = centroidStream.Intensities.ToArray(), - LowMass = scanStats.LowMass, - HighMass = scanStats.HighMass, - TIC = scanStats.TIC, - FileId = fileName, - PrecursorMz = precursorMz, - PrecursorCharge = precursorCharge, - MsOrder = 1 - }; - return scan; - } + double rt = raw.RetentionTimeFromScanNumber(scanNumber); + float? isolation_lower = null; + float? isolation_upper = null; + uint? precursor_scan = null; + float? precursor_mz = null; + uint? precursor_charge = null; - public static void WriteScans(string outputDirectory, List scans, string sourceRawFileName) - { - throw new NotImplementedException(); + if ((int)msOrder > 1) + { + var rx = scanFilter.GetReaction(0); - //Needs refactoring since Parquet.NET API changed - - //var output = outputDirectory + "//" + Path.GetFileNameWithoutExtension(sourceRawFileName); + // this assumes symmetrical quad window + isolation_lower = (float)(rx.PrecursorMass - rx.IsolationWidth / 2); + isolation_upper = (float)(rx.PrecursorMass + rx.IsolationWidth / 2); + precursor_mz = (float)rx.PrecursorMass; - //var ds = new DataSet(new DataField("BasePeakIntensity"), - // new DataField("BasePeakMass"), - // new DataField("Baselines"), - // new DataField("Charges"), - // new DataField("FileId"), - // new DataField("FileName"), - // new DataField("HighMass"), - // new DataField("Intensities"), - // new DataField("LowMass"), - // new DataField("Masses"), - // new DataField("MsOrder"), - // new DataField("Noises"), - // new DataField("PacketType"), - // new DataField("PrecursorCharge"), - // new DataField("PrecursorMass"), - // new DataField("Resolutions"), - // new DataField("RetentionTime"), - // new DataField("ScanNumber"), - // new DataField("ScanType"), - // new DataField("TIC")); + // Try to retrieve last scan that occurred at the previous msOrder + uint t; + if (last_scans.TryGetValue((int)msOrder - 1, out t)) + { + precursor_scan = t; + } + } - //foreach (var scan in scans) - //{ - // //we can't store null values? - // double[] dummyVal = new double[1]; - // if (scan.Noises == null) - // { - // scan.Noises = dummyVal; - // } + var trailer = raw.GetTrailerExtraInformation(scanNumber); + for (var i = 0l; i < trailer.Length; i++) + { - // if (scan.Charges == null) - // { - // scan.Charges = dummyVal; - // } + if (trailer.Labels[i].StartsWith("Monoisotopic M/Z")) + { + var val = float.Parse(trailer.Values[i]); + if (val > 0) + { + precursor_mz = val; + } + } - // if (scan.Baselines == null) - // { - // scan.Baselines = dummyVal; - // } + // Overwrite precursor_scan with value from trailer, if it exists + if (trailer.Labels[i].StartsWith("Master Scan")) + { + var val = Int64.Parse(trailer.Values[i]); + if (val > 0) + { + precursor_scan = (uint)val; + } + } - // if (scan.Resolutions == null) - // { - // scan.Resolutions = dummyVal; - // } + if (trailer.Labels[i].StartsWith("Charge")) + { + var val = uint.Parse(trailer.Values[i]); + if (val > 0) + { + precursor_charge = val; + } + } + } - // if (scan.PrecursorMz == null) - // { - // scan.PrecursorMz = 0; - // scan.PrecursorCharge = 0; - // } + // Add a row to parquet file for every m/z value in this scan + for (int i = 0; i < centroidStream.Masses.Length; i++) + { + MzParquet m; + m.rt = (float)rt; + m.scan = (uint)scanNumber; + m.level = ((uint)msOrder); + m.intensity = (float)centroidStream.Intensities[i]; + m.mz = (float)centroidStream.Masses[i]; + m.isolation_lower = isolation_lower; + m.isolation_upper = isolation_upper; + m.precursor_scan = precursor_scan; + m.precursor_mz = precursor_mz; + m.precursor_charge = precursor_charge; + m.ion_mobility = null; + data.Add(m); + } + + // If we have enough ions to write a row group, do so + // - some row groups might have more than this number of ions + // but this ensures that all ions from a single scan are always + // present in the same row group (critical property of mzparquet) + if (data.Count >= 1_048_576) + { + var task = ParquetSerializer.SerializeAsync(data, output, opts); + task.Wait(); + opts.Append = true; + data.Clear(); + Log.Info("writing row group"); + } - // ds.Add(scan.BasePeakIntensity, - // scan.BasePeakMass, - // scan.Baselines, - // scan.Charges, - // scan.FileId, - // scan.FileName, - // scan.HighMass, - // scan.Intensities, - // scan.LowMass, - // scan.Masses, - // scan.MsOrder, - // scan.Noises, - // scan.PacketType, - // scan.PrecursorCharge, - // scan.PrecursorMz, - // scan.Resolutions, - // scan.RetentionTime, - // scan.ScanNumber, - // scan.ScanType, - // scan.TIC); - //} + } - //using (Stream fileStream = File.OpenWrite(output + ".parquet")) - //{ - // using (var writer = new ParquetWriter(fileStream)) - // { - // writer.Write(ds); - // } - //} + // serialize any remaining ions into the final row group + if (data.Count > 0) + { + var task = ParquetSerializer.SerializeAsync(data, output, opts); + task.Wait(); + Log.Info("writing row group"); + } } } - /// PSCan meaing Parsec Scan (because Commoncore has a Scan class) - /// - public class PScan - { - /// - /// Unique ID per file (foreign key in data store) - /// - public string FileId { get; set; } - - public string FileName { get; set; } - public double BasePeakIntensity { get; set; } - public double BasePeakMass { get; set; } - public double[] Baselines { get; set; } - public double[] Charges { get; set; } - public double HighMass { get; set; } - public double[] Intensities { get; set; } - public double LowMass { get; set; } - public double[] Masses { get; set; } - public double[] Noises { get; set; } - public int PacketType { get; set; } // : 20, - public double RetentionTime { get; set; } - public double[] Resolutions { get; set; } - public int ScanNumber { get; set; } - public string ScanType { get; set; } // : FTMS + c ESI d Full ms2 335.9267@hcd30.00 [130.0000-346.0000], - public double TIC { get; set; } - public int MsOrder { get; set; } - public double? PrecursorMz { get; set; } - public int? PrecursorCharge { get; set; } - } } \ No newline at end of file From 14afbf68361a79f0ae0450096d0c0e19b51b1cdf Mon Sep 17 00:00:00 2001 From: caetera Date: Tue, 25 Feb 2025 20:12:28 +0100 Subject: [PATCH 2/8] Unified output path resolution --- MainClass.cs | 3 ++ ThermoRawFileParserTest/WriterTests.cs | 25 +++++++++++++++++ Writer/ParquetSpectrumWriter.cs | 13 +++++---- Writer/SpectrumWriter.cs | 39 ++++++++------------------ 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/MainClass.cs b/MainClass.cs index 6ae27e1..99ed51a 100644 --- a/MainClass.cs +++ b/MainClass.cs @@ -768,6 +768,9 @@ private static void RegularParametersParsing(string[] args) if (parseInput.OutputFormat == OutputFormat.IndexMzML) parseInput.OutputFormat = OutputFormat.MzML; } + // Switch off gzip compression for Parquet + if (parseInput.OutputFormat == OutputFormat.Parquet) parseInput.Gzip = false; + parseInput.MaxLevel = parseInput.MsLevel.Max(); if (parseInput.S3Url != null && parseInput.S3AccessKeyId != null && diff --git a/ThermoRawFileParserTest/WriterTests.cs b/ThermoRawFileParserTest/WriterTests.cs index 53e89b0..4d20cbb 100644 --- a/ThermoRawFileParserTest/WriterTests.cs +++ b/ThermoRawFileParserTest/WriterTests.cs @@ -281,5 +281,30 @@ public void TestMzML_MS2() Assert.That(testMzMl.run.chromatogramList.chromatogram[0].defaultArrayLength, Is.EqualTo(95)); } + + [Test] + public void TestParquet() + { + // Get temp path for writing the test mzML + var tempFilePath = Path.GetTempPath(); + + var testRawFile = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @"Data/small.RAW"); + var parseInput = new ParseInput(testRawFile, null, tempFilePath, OutputFormat.Parquet); + + RawFileParser.Parse(parseInput); + + // Actual test + //var xmlSerializer = new XmlSerializer(typeof(mzMLType)); + //var testMzMl = (mzMLType)xmlSerializer.Deserialize(new FileStream( + // Path.Combine(tempFilePath, "small.mzML"), FileMode.Open, FileAccess.Read, FileShare.ReadWrite)); + + //Assert.That(testMzMl.run.spectrumList.count, Is.EqualTo("48")); + //Assert.That(testMzMl.run.spectrumList.spectrum.Length, Is.EqualTo(48)); + + //Assert.That(testMzMl.run.chromatogramList.count, Is.EqualTo("1")); + //Assert.That(testMzMl.run.chromatogramList.chromatogram.Length, Is.EqualTo(1)); + + //Assert.That(testMzMl.run.chromatogramList.chromatogram[0].defaultArrayLength, Is.EqualTo(48)); + } } } \ No newline at end of file diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index 01d7b20..2285455 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.IO; using System.Reflection; using log4net; using Parquet.Serialization; @@ -43,17 +42,18 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu throw new RawFileParserException("No MS data in RAW file, no output will be produced"); } + //TODO: Correct iterator based on MS-level filter var enumerator = raw.GetFilteredScanEnumerator(" "); - // NB: replace with more robust strategy? - var output = ParseInput.OutputDirectory + "//" + Path.GetFileNameWithoutExtension(ParseInput.RawFilePath) + ".mzparquet"; - + ConfigureWriter(".mzparquet"); + ParquetSerializerOptions opts = new ParquetSerializerOptions(); opts.CompressionLevel = System.IO.Compression.CompressionLevel.Fastest; opts.CompressionMethod = Parquet.CompressionMethod.Zstd; var data = new List(); + //TODO Precursor tree // map last (msOrder - 1) -> scan number (e.g. mapping precursors) // note, this assumes time dependence of MS1 -> MS2 -> MSN var last_scans = new Dictionary(); @@ -63,6 +63,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu { var scanFilter = raw.GetFilterForScanNumber(scanNumber); + //TODO Centroiding if centroidStream is not available CentroidStream centroidStream = new CentroidStream(); // Pull out m/z and intensity values @@ -170,7 +171,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu // present in the same row group (critical property of mzparquet) if (data.Count >= 1_048_576) { - var task = ParquetSerializer.SerializeAsync(data, output, opts); + var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); task.Wait(); opts.Append = true; data.Clear(); @@ -182,7 +183,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu // serialize any remaining ions into the final row group if (data.Count > 0) { - var task = ParquetSerializer.SerializeAsync(data, output, opts); + var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); task.Wait(); Log.Info("writing row group"); } diff --git a/Writer/SpectrumWriter.cs b/Writer/SpectrumWriter.cs index 3b1d02b..3f466c4 100644 --- a/Writer/SpectrumWriter.cs +++ b/Writer/SpectrumWriter.cs @@ -68,42 +68,27 @@ protected void ConfigureWriter(string extension) return; } - if (ParseInput.OutputFile == null) + var fileName = NormalizeFileName(ParseInput.OutputFile, extension, ParseInput.Gzip); + if (ParseInput.OutputFormat == OutputFormat.Parquet) { - var fullExtension = ParseInput.Gzip ? extension + ".gz" : extension; - if (!ParseInput.Gzip || ParseInput.OutputFormat == OutputFormat.IndexMzML) - { - Writer = File.CreateText(ParseInput.OutputDirectory + "//" + - ParseInput.RawFileNameWithoutExtension + - extension); - } - else - { - var fileStream = File.Create(ParseInput.OutputDirectory + "//" + - ParseInput.RawFileNameWithoutExtension + fullExtension); - var compress = new GZipStream(fileStream, CompressionMode.Compress); - Writer = new StreamWriter(compress); - } + Writer = new StreamWriter(File.Create(fileName)); + } + else if (!ParseInput.Gzip || ParseInput.OutputFormat == OutputFormat.IndexMzML) + { + Writer = File.CreateText(fileName); } else { - var fileName = NormalizeFileName(ParseInput.OutputFile, extension, ParseInput.Gzip); - if (!ParseInput.Gzip || ParseInput.OutputFormat == OutputFormat.IndexMzML) - { - Writer = File.CreateText(fileName); - } - else - { - var fileStream = File.Create(fileName); - var compress = new GZipStream(fileStream, CompressionMode.Compress); - Writer = new StreamWriter(compress); - } + var fileStream = File.Create(fileName); + var compress = new GZipStream(fileStream, CompressionMode.Compress); + Writer = new StreamWriter(compress); } + } private string NormalizeFileName(string outputFile, string extension, bool gzip) { - string result = outputFile; + string result = outputFile == null ? Path.Combine(ParseInput.OutputDirectory, ParseInput.RawFileNameWithoutExtension) : outputFile; string tail = ""; string[] extensions; From a26a5ab3393992637deabf95d4b7de0fa1ae365f Mon Sep 17 00:00:00 2001 From: caetera Date: Wed, 26 Feb 2025 19:41:51 +0100 Subject: [PATCH 3/8] Precursor logic streamlined The same precursor selection logic as for other file formats TBD: Move some of the service functions to the SpectrumWriter base class --- Writer/MzMlSpectrumWriter.cs | 2 +- Writer/ParquetSpectrumWriter.cs | 296 ++++++++++++++++++++++++++------ 2 files changed, 241 insertions(+), 57 deletions(-) diff --git a/Writer/MzMlSpectrumWriter.cs b/Writer/MzMlSpectrumWriter.cs index 7868b39..940b8c3 100644 --- a/Writer/MzMlSpectrumWriter.cs +++ b/Writer/MzMlSpectrumWriter.cs @@ -1286,7 +1286,7 @@ private SpectrumType ConstructMSSpectrum(int scanNumber) int? charge = trailerData.AsPositiveInt("Charge State:"); double? monoisotopicMz = trailerData.AsDouble("Monoisotopic M/Z:"); double? ionInjectionTime = trailerData.AsDouble("Ion Injection Time (ms):"); - double? isolationWidth = trailerData.AsDouble("MS" + (int) scanFilter.MSOrder + " Isolation Width:"); + double? isolationWidth = trailerData.AsDouble("MS" + msLevel + " Isolation Width:"); double? FAIMSCV = null; if (trailerData.AsBool("FAIMS Voltage On:").GetValueOrDefault(false)) FAIMSCV = trailerData.AsDouble("FAIMS CV:"); diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index 2285455..42849fe 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Reflection; +using System.Text.RegularExpressions; using log4net; using Parquet.Serialization; using ThermoFisher.CommonCore.Data; @@ -10,7 +12,6 @@ namespace ThermoRawFileParser.Writer { - struct MzParquet { public uint scan; @@ -21,18 +22,35 @@ struct MzParquet public float? ion_mobility; public float? isolation_lower; public float? isolation_upper; - public uint? precursor_scan; + public int? precursor_scan; public float? precursor_mz; public uint? precursor_charge; } + struct PrecursorData + { + public float? mz; + public float? isolation_lower; + public float? isolation_upper; + } + public class ParquetSpectrumWriter : SpectrumWriter { private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private readonly Regex _filterStringIsolationMzPattern = new Regex(@"ms\d+ (.+?) \["); + + // Precursor scan number (value) and isolation m/z (key) for reference in the precursor element of an MSn spectrum + private readonly Dictionary _precursorScanNumbers = new Dictionary(); + + //Precursor information for scans + private Dictionary _precursorTree = new Dictionary(); + public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput) { + _precursorScanNumbers[""] = -1; + _precursorTree[-1] = new PrecursorInfo(); } public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNumber) @@ -53,15 +71,15 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu var data = new List(); - //TODO Precursor tree - // map last (msOrder - 1) -> scan number (e.g. mapping precursors) - // note, this assumes time dependence of MS1 -> MS2 -> MSN - var last_scans = new Dictionary(); - - foreach (var scanNumber in enumerator) { var scanFilter = raw.GetFilterForScanNumber(scanNumber); + + // Get the scan event for this scan number + var scanEvent = raw.GetScanEventForScanNumber(scanNumber); + + // Get scan ms level + var msLevel = (int)scanFilter.MSOrder; //TODO Centroiding if centroidStream is not available CentroidStream centroidStream = new CentroidStream(); @@ -85,66 +103,113 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu centroidStream.Intensities = scanData.Intensities; } + ScanTrailer trailerData; - var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder; + try + { + trailerData = new ScanTrailer(raw.GetTrailerExtraInformation(scanNumber)); + } + catch (Exception ex) + { + Log.WarnFormat("Cannot load trailer infromation for scan {0} due to following exception\n{1}", scanNumber, ex.Message); + ParseInput.NewWarn(); + trailerData = new ScanTrailer(); + } - last_scans[(int)msOrder] = (uint)scanNumber; + int? trailer_charge = trailerData.AsPositiveInt("Charge State:"); + double? trailer_mz = trailerData.AsDouble("Monoisotopic M/Z:"); + double? trailer_isolationWidth = trailerData.AsDouble("MS" + msLevel + " Isolation Width:"); + double? FAIMSCV = null; + if (trailerData.AsBool("FAIMS Voltage On:").GetValueOrDefault(false)) + FAIMSCV = trailerData.AsDouble("FAIMS CV:"); double rt = raw.RetentionTimeFromScanNumber(scanNumber); - float? isolation_lower = null; - float? isolation_upper = null; - uint? precursor_scan = null; - float? precursor_mz = null; - uint? precursor_charge = null; - - if ((int)msOrder > 1) + int precursor_scan = 0; + PrecursorData precursor_data = new PrecursorData { - var rx = scanFilter.GetReaction(0); - - // this assumes symmetrical quad window - isolation_lower = (float)(rx.PrecursorMass - rx.IsolationWidth / 2); - isolation_upper = (float)(rx.PrecursorMass + rx.IsolationWidth / 2); - precursor_mz = (float)rx.PrecursorMass; + isolation_lower = null, + isolation_upper = null, + mz = null + + }; - // Try to retrieve last scan that occurred at the previous msOrder - uint t; - if (last_scans.TryGetValue((int)msOrder - 1, out t)) - { - precursor_scan = t; - } + if (msLevel == 1) + { + // Keep track of scan number for precursor reference + _precursorScanNumbers[""] = scanNumber; + _precursorTree[scanNumber] = new PrecursorInfo(); } - - var trailer = raw.GetTrailerExtraInformation(scanNumber); - for (var i = 0l; i < trailer.Length; i++) + else if (msLevel > 1) { - - if (trailer.Labels[i].StartsWith("Monoisotopic M/Z")) + // Keep track of scan number and isolation m/z for precursor reference + var result = _filterStringIsolationMzPattern.Match(scanEvent.ToString()); + if (result.Success) { - var val = float.Parse(trailer.Values[i]); - if (val > 0) + if (_precursorScanNumbers.ContainsKey(result.Groups[1].Value)) { - precursor_mz = val; + _precursorScanNumbers.Remove(result.Groups[1].Value); } + + _precursorScanNumbers.Add(result.Groups[1].Value, scanNumber); } - // Overwrite precursor_scan with value from trailer, if it exists - if (trailer.Labels[i].StartsWith("Master Scan")) + //update precursor scan if it is provided in trailer data + var trailerMasterScan = trailerData.AsPositiveInt("Master Scan Number:"); + if (trailerMasterScan.HasValue) { - var val = Int64.Parse(trailer.Values[i]); - if (val > 0) - { - precursor_scan = (uint)val; - } + precursor_scan = trailerMasterScan.Value; + } + else //try getting it from the scan filter + { + precursor_scan = GetParentFromScanString(result.Groups[1].Value); } - if (trailer.Labels[i].StartsWith("Charge")) + //finding precursor scan failed + if (precursor_scan == -2) { - var val = uint.Parse(trailer.Values[i]); - if (val > 0) + Log.Warn($"Cannot find precursor scan for scan# {scanNumber}"); + _precursorTree[-2] = new PrecursorInfo(0, msLevel, FindLastReaction(scanEvent, msLevel), null); + } + + //Parsing the last reaction + try + { + try //since there is no direct way to get the number of reactions available, it is necessary to try and fail { - precursor_charge = val; + scanEvent.GetReaction(_precursorTree[precursor_scan].ReactionCount); } + catch (ArgumentOutOfRangeException ex) + { + Log.Debug($"Using Tribrid decision tree fix for scan# {scanNumber}"); + //Is it a decision tree scheduled scan on tribrid? + if (msLevel == _precursorTree[precursor_scan].MSLevel) + { + precursor_scan = GetParentFromScanString(result.Groups[1].Value); + } + else + { + throw new RawFileParserException( + $"Tribrid decision tree fix failed - cannot get reaction# {_precursorTree[precursor_scan].ReactionCount} from {scanEvent.ToString()}", + ex); + } + } + + // Get Precursor m/z and isolation window borders + precursor_data = GetPrecursorData(precursor_scan, scanEvent, trailer_mz, trailer_isolationWidth, out var reactionCount); + + //save precursor information for later reference + _precursorTree[scanNumber] = new PrecursorInfo(precursor_scan, msLevel, reactionCount, null); + } + catch (Exception e) + { + var extra = (e.InnerException is null) ? "" : $"\n{e.InnerException.StackTrace}"; + + Log.Warn($"Could not get precursor data for scan# {scanNumber} - precursor information for this and dependent scans will be empty\nException details:{e.Message}\n{e.StackTrace}\n{extra}"); + ParseInput.NewWarn(); + + _precursorTree[scanNumber] = new PrecursorInfo(precursor_scan, 1, 0, null); } + } // Add a row to parquet file for every m/z value in this scan @@ -153,15 +218,15 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu MzParquet m; m.rt = (float)rt; m.scan = (uint)scanNumber; - m.level = ((uint)msOrder); + m.level = (uint)msLevel; m.intensity = (float)centroidStream.Intensities[i]; m.mz = (float)centroidStream.Masses[i]; - m.isolation_lower = isolation_lower; - m.isolation_upper = isolation_upper; + m.isolation_lower = precursor_data.isolation_lower; + m.isolation_upper = precursor_data.isolation_upper; m.precursor_scan = precursor_scan; - m.precursor_mz = precursor_mz; - m.precursor_charge = precursor_charge; - m.ion_mobility = null; + m.precursor_mz = precursor_data.mz; + m.precursor_charge = (uint?)trailer_charge; + m.ion_mobility = (float?)FAIMSCV; data.Add(m); } @@ -175,7 +240,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu task.Wait(); opts.Append = true; data.Clear(); - Log.Info("writing row group"); + Log.Debug("Writing next row group"); } } @@ -185,9 +250,128 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu { var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); task.Wait(); - Log.Info("writing row group"); + Log.Debug("Writing final row group"); + } + } + + private int GetParentFromScanString(string scanString) + { + var parts = Regex.Split(scanString, " "); + + //find the position of the first (from the end) precursor with a different mass + //to account for possible supplementary activations written in the filter + var lastIonMass = parts.Last().Split('@').First(); + int last = parts.Length; + while (last > 0 && + parts[last - 1].Split('@').First() == lastIonMass) + { + last--; + } + + string parentFilter = String.Join(" ", parts.Take(last)); + if (_precursorScanNumbers.ContainsKey(parentFilter)) + { + return _precursorScanNumbers[parentFilter]; + } + + return -2; //unsuccessful parsing + } + + + private int FindLastReaction(IScanEvent scanEvent, int msLevel) + { + int lastReactionIndex = msLevel - 2; + + //iteratively trying find the last available index for reaction + while (true) + { + try + { + scanEvent.GetReaction(lastReactionIndex + 1); + } + catch (ArgumentOutOfRangeException) + { + //stop trying + break; + } + + lastReactionIndex++; + } + + //supplemental activation flag is on -> one of the levels (not necissirily the last one) used supplemental activation + //check last two activations + if (scanEvent.SupplementalActivation == TriState.On) + { + var lastActivation = scanEvent.GetReaction(lastReactionIndex).ActivationType; + var beforeLastActivation = scanEvent.GetReaction(lastReactionIndex - 1).ActivationType; + + if ((beforeLastActivation == ActivationType.ElectronTransferDissociation || beforeLastActivation == ActivationType.ElectronCaptureDissociation) && + (lastActivation == ActivationType.CollisionInducedDissociation || lastActivation == ActivationType.HigherEnergyCollisionalDissociation)) + return lastReactionIndex - 1; //ETD or ECD followed by HCD or CID -> supplemental activation in the last level (move the last reaction one step back) + else + return lastReactionIndex; + } + else //just use the last one + { + return lastReactionIndex; } } + + private PrecursorData GetPrecursorData(int precursorScanNumber, IScanEventBase scanEvent, + double? monoisotopicMz, double? isolationWidth, out int reactionCount) + { + double? isolation_lower = null; + double? isolation_upper = null; + + // Get precursors from earlier levels + var prevPrecursors = _precursorTree[precursorScanNumber]; + reactionCount = prevPrecursors.ReactionCount; + + var reaction = scanEvent.GetReaction(reactionCount); + + //if isolation width was not found in the trailer, try to get one from the reaction + if (isolationWidth == null) isolationWidth = reaction.IsolationWidth; + if (isolationWidth < 0) isolationWidth = null; + + // Selected ion MZ + var selectedIonMz = CalculateSelectedIonMz(reaction, monoisotopicMz, isolationWidth); + + if (isolationWidth != null) + { + var offset = isolationWidth.Value / 2 + reaction.IsolationWidthOffset; + isolation_lower = reaction.PrecursorMass - isolationWidth.Value + offset; + isolation_upper = reaction.PrecursorMass + offset; + } + + // Activation only to keep track of the reactions + //increase reaction count + reactionCount++; + + //Sometimes the property of supplemental activation is not set (Tune v4 on Tribrid), + //or is On if *at least* one of the levels had SA (i.e. not necissirily the last one), thus we need to try (and posibly fail) + try + { + reaction = scanEvent.GetReaction(reactionCount); + + if (reaction != null) + { + //increase reaction count after successful parsing + reactionCount++; + } + } + catch (IndexOutOfRangeException) + { + // If we failed do nothing + } + + return new PrecursorData + { + mz = (float?)selectedIonMz, + isolation_lower = (float?)isolation_lower, + isolation_upper = (float?)isolation_upper + }; + + } } } \ No newline at end of file From 5d955bed80b2247aadd36459b3db4f79503d83b6 Mon Sep 17 00:00:00 2001 From: caetera Date: Wed, 26 Feb 2025 20:27:09 +0100 Subject: [PATCH 4/8] Unified mass spectrometry data Use the same method for mass-spec data creation --- Writer/ParquetSpectrumWriter.cs | 57 +++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index 42849fe..b129b0f 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -81,27 +81,8 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu // Get scan ms level var msLevel = (int)scanFilter.MSOrder; - //TODO Centroiding if centroidStream is not available - CentroidStream centroidStream = new CentroidStream(); - - // Pull out m/z and intensity values - // NB: is this the best way to do this? - if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerFTMS) - { - centroidStream = raw.GetCentroidStream(scanNumber, false); - } - else if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS) - { - var scanData = raw.GetSimplifiedScan(scanNumber); - centroidStream.Masses = scanData.Masses; - centroidStream.Intensities = scanData.Intensities; - } - else - { - var scanData = raw.GetSimplifiedCentroids(scanNumber); - centroidStream.Masses = scanData.Masses; - centroidStream.Intensities = scanData.Intensities; - } + // Get Scan + var scan = Scan.FromFile(raw, scanNumber); ScanTrailer trailerData; @@ -212,15 +193,43 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu } + double[] masses; + double[] intensities; + + if (!ParseInput.NoPeakPicking.Contains(msLevel)) + { + // Check if the scan has a centroid stream + if (scan.HasCentroidStream) + { + masses = scan.CentroidScan.Masses; + intensities = scan.CentroidScan.Intensities; + } + else // otherwise take the segmented (low res) scan + { + // If the spectrum is profile perform centroiding + var segmentedScan = scanEvent.ScanData == ScanDataType.Profile + ? Scan.ToCentroid(scan).SegmentedScan + : scan.SegmentedScan; + + masses = segmentedScan.Positions; + intensities = segmentedScan.Intensities; + } + } + else // use the segmented data as is + { + masses = scan.SegmentedScan.Positions; + intensities = scan.SegmentedScan.Intensities; + } + // Add a row to parquet file for every m/z value in this scan - for (int i = 0; i < centroidStream.Masses.Length; i++) + for (int i = 0; i < masses.Length; i++) { MzParquet m; m.rt = (float)rt; m.scan = (uint)scanNumber; m.level = (uint)msLevel; - m.intensity = (float)centroidStream.Intensities[i]; - m.mz = (float)centroidStream.Masses[i]; + m.intensity = (float)intensities[i]; + m.mz = (float)masses[i]; m.isolation_lower = precursor_data.isolation_lower; m.isolation_upper = precursor_data.isolation_upper; m.precursor_scan = precursor_scan; From 8d880ad79053ff694e16f2443ea8765d7059fe50 Mon Sep 17 00:00:00 2001 From: caetera Date: Thu, 27 Feb 2025 19:58:56 +0100 Subject: [PATCH 5/8] MS level filter and progress --- Writer/ParquetSpectrumWriter.cs | 330 +++++++++++++++++--------------- 1 file changed, 179 insertions(+), 151 deletions(-) diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index b129b0f..de0cd11 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -9,6 +9,7 @@ using ThermoFisher.CommonCore.Data.Business; using ThermoFisher.CommonCore.Data.FilterEnums; using ThermoFisher.CommonCore.Data.Interfaces; +using ThermoRawFileParser.Writer.MzML; namespace ThermoRawFileParser.Writer { @@ -60,9 +61,6 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu throw new RawFileParserException("No MS data in RAW file, no output will be produced"); } - //TODO: Correct iterator based on MS-level filter - var enumerator = raw.GetFilteredScanEnumerator(" "); - ConfigureWriter(".mzparquet"); ParquetSerializerOptions opts = new ParquetSerializerOptions(); @@ -71,195 +69,225 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu var data = new List(); - foreach (var scanNumber in enumerator) - { - var scanFilter = raw.GetFilterForScanNumber(scanNumber); - - // Get the scan event for this scan number - var scanEvent = raw.GetScanEventForScanNumber(scanNumber); - - // Get scan ms level - var msLevel = (int)scanFilter.MSOrder; + var lastScanProgress = 0; - // Get Scan - var scan = Scan.FromFile(raw, scanNumber); + Log.Info(String.Format("Processing {0} MS scans", +(1 + lastScanNumber - firstScanNumber))); - ScanTrailer trailerData; + for (var scanNumber = firstScanNumber; scanNumber <= lastScanNumber; scanNumber++) + { + if (ParseInput.LogFormat == LogFormat.DEFAULT) + { + var scanProgress = (int)((double)scanNumber / (lastScanNumber - firstScanNumber + 1) * 100); + if (scanProgress % ProgressPercentageStep == 0) + { + if (scanProgress != lastScanProgress) + { + Console.Write("" + scanProgress + "% "); + lastScanProgress = scanProgress; + } + } + } try { - trailerData = new ScanTrailer(raw.GetTrailerExtraInformation(scanNumber)); + int level = (int)raw.GetScanEventForScanNumber(scanNumber).MSOrder; //applying MS level filter + if (ParseInput.MsLevel.Contains(level)) + AddScan(raw, scanNumber, data); } catch (Exception ex) { - Log.WarnFormat("Cannot load trailer infromation for scan {0} due to following exception\n{1}", scanNumber, ex.Message); - ParseInput.NewWarn(); - trailerData = new ScanTrailer(); + Log.Error($"Scan #{scanNumber} cannot be processed because of the following exception: {ex.Message}\n{ex.StackTrace}"); + ParseInput.NewError(); } - int? trailer_charge = trailerData.AsPositiveInt("Charge State:"); - double? trailer_mz = trailerData.AsDouble("Monoisotopic M/Z:"); - double? trailer_isolationWidth = trailerData.AsDouble("MS" + msLevel + " Isolation Width:"); - double? FAIMSCV = null; - if (trailerData.AsBool("FAIMS Voltage On:").GetValueOrDefault(false)) - FAIMSCV = trailerData.AsDouble("FAIMS CV:"); - - double rt = raw.RetentionTimeFromScanNumber(scanNumber); - int precursor_scan = 0; - PrecursorData precursor_data = new PrecursorData - { - isolation_lower = null, - isolation_upper = null, - mz = null - - }; - - if (msLevel == 1) + // If we have enough ions to write a row group, do so + // - some row groups might have more than this number of ions + // but this ensures that all ions from a single scan are always + // present in the same row group (critical property of mzparquet) + if (data.Count >= 1_048_576) { - // Keep track of scan number for precursor reference - _precursorScanNumbers[""] = scanNumber; - _precursorTree[scanNumber] = new PrecursorInfo(); + var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); + task.Wait(); + opts.Append = true; + data.Clear(); + Log.Debug("Writing next row group"); } - else if (msLevel > 1) - { - // Keep track of scan number and isolation m/z for precursor reference - var result = _filterStringIsolationMzPattern.Match(scanEvent.ToString()); - if (result.Success) - { - if (_precursorScanNumbers.ContainsKey(result.Groups[1].Value)) - { - _precursorScanNumbers.Remove(result.Groups[1].Value); - } + } - _precursorScanNumbers.Add(result.Groups[1].Value, scanNumber); - } + // serialize any remaining ions into the final row group + if (data.Count > 0) + { + var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); + task.Wait(); + Log.Debug("Writing final row group"); + } + } - //update precursor scan if it is provided in trailer data - var trailerMasterScan = trailerData.AsPositiveInt("Master Scan Number:"); - if (trailerMasterScan.HasValue) - { - precursor_scan = trailerMasterScan.Value; - } - else //try getting it from the scan filter - { - precursor_scan = GetParentFromScanString(result.Groups[1].Value); - } + private void AddScan(IRawDataPlus raw, int scanNumber, List data) + { + var scanFilter = raw.GetFilterForScanNumber(scanNumber); - //finding precursor scan failed - if (precursor_scan == -2) - { - Log.Warn($"Cannot find precursor scan for scan# {scanNumber}"); - _precursorTree[-2] = new PrecursorInfo(0, msLevel, FindLastReaction(scanEvent, msLevel), null); - } + // Get the scan event for this scan number + var scanEvent = raw.GetScanEventForScanNumber(scanNumber); - //Parsing the last reaction - try - { - try //since there is no direct way to get the number of reactions available, it is necessary to try and fail - { - scanEvent.GetReaction(_precursorTree[precursor_scan].ReactionCount); - } - catch (ArgumentOutOfRangeException ex) - { - Log.Debug($"Using Tribrid decision tree fix for scan# {scanNumber}"); - //Is it a decision tree scheduled scan on tribrid? - if (msLevel == _precursorTree[precursor_scan].MSLevel) - { - precursor_scan = GetParentFromScanString(result.Groups[1].Value); - } - else - { - throw new RawFileParserException( - $"Tribrid decision tree fix failed - cannot get reaction# {_precursorTree[precursor_scan].ReactionCount} from {scanEvent.ToString()}", - ex); - } - } + // Get scan ms level + var msLevel = (int)scanFilter.MSOrder; - // Get Precursor m/z and isolation window borders - precursor_data = GetPrecursorData(precursor_scan, scanEvent, trailer_mz, trailer_isolationWidth, out var reactionCount); + // Get Scan + var scan = Scan.FromFile(raw, scanNumber); + ScanTrailer trailerData; - //save precursor information for later reference - _precursorTree[scanNumber] = new PrecursorInfo(precursor_scan, msLevel, reactionCount, null); - } - catch (Exception e) - { - var extra = (e.InnerException is null) ? "" : $"\n{e.InnerException.StackTrace}"; + try + { + trailerData = new ScanTrailer(raw.GetTrailerExtraInformation(scanNumber)); + } + catch (Exception ex) + { + Log.WarnFormat("Cannot load trailer infromation for scan {0} due to following exception\n{1}", scanNumber, ex.Message); + ParseInput.NewWarn(); + trailerData = new ScanTrailer(); + } + + int? trailer_charge = trailerData.AsPositiveInt("Charge State:"); + double? trailer_mz = trailerData.AsDouble("Monoisotopic M/Z:"); + double? trailer_isolationWidth = trailerData.AsDouble("MS" + msLevel + " Isolation Width:"); + double? FAIMSCV = null; + if (trailerData.AsBool("FAIMS Voltage On:").GetValueOrDefault(false)) + FAIMSCV = trailerData.AsDouble("FAIMS CV:"); - Log.Warn($"Could not get precursor data for scan# {scanNumber} - precursor information for this and dependent scans will be empty\nException details:{e.Message}\n{e.StackTrace}\n{extra}"); - ParseInput.NewWarn(); + double rt = raw.RetentionTimeFromScanNumber(scanNumber); + int precursor_scan = 0; + PrecursorData precursor_data = new PrecursorData + { + isolation_lower = null, + isolation_upper = null, + mz = null + + }; - _precursorTree[scanNumber] = new PrecursorInfo(precursor_scan, 1, 0, null); + if (msLevel == 1) + { + // Keep track of scan number for precursor reference + _precursorScanNumbers[""] = scanNumber; + _precursorTree[scanNumber] = new PrecursorInfo(); + } + else if (msLevel > 1) + { + // Keep track of scan number and isolation m/z for precursor reference + var result = _filterStringIsolationMzPattern.Match(scanEvent.ToString()); + if (result.Success) + { + if (_precursorScanNumbers.ContainsKey(result.Groups[1].Value)) + { + _precursorScanNumbers.Remove(result.Groups[1].Value); } + _precursorScanNumbers.Add(result.Groups[1].Value, scanNumber); } - double[] masses; - double[] intensities; + //update precursor scan if it is provided in trailer data + var trailerMasterScan = trailerData.AsPositiveInt("Master Scan Number:"); + if (trailerMasterScan.HasValue) + { + precursor_scan = trailerMasterScan.Value; + } + else //try getting it from the scan filter + { + precursor_scan = GetParentFromScanString(result.Groups[1].Value); + } - if (!ParseInput.NoPeakPicking.Contains(msLevel)) + //finding precursor scan failed + if (precursor_scan == -2) { - // Check if the scan has a centroid stream - if (scan.HasCentroidStream) + Log.Warn($"Cannot find precursor scan for scan# {scanNumber}"); + _precursorTree[-2] = new PrecursorInfo(0, msLevel, FindLastReaction(scanEvent, msLevel), null); + } + + //Parsing the last reaction + try + { + try //since there is no direct way to get the number of reactions available, it is necessary to try and fail { - masses = scan.CentroidScan.Masses; - intensities = scan.CentroidScan.Intensities; + scanEvent.GetReaction(_precursorTree[precursor_scan].ReactionCount); } - else // otherwise take the segmented (low res) scan + catch (ArgumentOutOfRangeException ex) { - // If the spectrum is profile perform centroiding - var segmentedScan = scanEvent.ScanData == ScanDataType.Profile - ? Scan.ToCentroid(scan).SegmentedScan - : scan.SegmentedScan; - - masses = segmentedScan.Positions; - intensities = segmentedScan.Intensities; + Log.Debug($"Using Tribrid decision tree fix for scan# {scanNumber}"); + //Is it a decision tree scheduled scan on tribrid? + if (msLevel == _precursorTree[precursor_scan].MSLevel) + { + precursor_scan = GetParentFromScanString(result.Groups[1].Value); + } + else + { + throw new RawFileParserException( + $"Tribrid decision tree fix failed - cannot get reaction# {_precursorTree[precursor_scan].ReactionCount} from {scanEvent.ToString()}", + ex); + } } + + // Get Precursor m/z and isolation window borders + precursor_data = GetPrecursorData(precursor_scan, scanEvent, trailer_mz, trailer_isolationWidth, out var reactionCount); + + //save precursor information for later reference + _precursorTree[scanNumber] = new PrecursorInfo(precursor_scan, msLevel, reactionCount, null); } - else // use the segmented data as is + catch (Exception e) { - masses = scan.SegmentedScan.Positions; - intensities = scan.SegmentedScan.Intensities; - } + var extra = (e.InnerException is null) ? "" : $"\n{e.InnerException.StackTrace}"; - // Add a row to parquet file for every m/z value in this scan - for (int i = 0; i < masses.Length; i++) - { - MzParquet m; - m.rt = (float)rt; - m.scan = (uint)scanNumber; - m.level = (uint)msLevel; - m.intensity = (float)intensities[i]; - m.mz = (float)masses[i]; - m.isolation_lower = precursor_data.isolation_lower; - m.isolation_upper = precursor_data.isolation_upper; - m.precursor_scan = precursor_scan; - m.precursor_mz = precursor_data.mz; - m.precursor_charge = (uint?)trailer_charge; - m.ion_mobility = (float?)FAIMSCV; - data.Add(m); + Log.Warn($"Could not get precursor data for scan# {scanNumber} - precursor information for this and dependent scans will be empty\nException details:{e.Message}\n{e.StackTrace}\n{extra}"); + ParseInput.NewWarn(); + + _precursorTree[scanNumber] = new PrecursorInfo(precursor_scan, 1, 0, null); } - // If we have enough ions to write a row group, do so - // - some row groups might have more than this number of ions - // but this ensures that all ions from a single scan are always - // present in the same row group (critical property of mzparquet) - if (data.Count >= 1_048_576) + } + + double[] masses; + double[] intensities; + + if (!ParseInput.NoPeakPicking.Contains(msLevel)) + { + // Check if the scan has a centroid stream + if (scan.HasCentroidStream) { - var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); - task.Wait(); - opts.Append = true; - data.Clear(); - Log.Debug("Writing next row group"); + masses = scan.CentroidScan.Masses; + intensities = scan.CentroidScan.Intensities; } + else // otherwise take the segmented (low res) scan + { + // If the spectrum is profile perform centroiding + var segmentedScan = scanEvent.ScanData == ScanDataType.Profile + ? Scan.ToCentroid(scan).SegmentedScan + : scan.SegmentedScan; + masses = segmentedScan.Positions; + intensities = segmentedScan.Intensities; + } + } + else // use the segmented data as is + { + masses = scan.SegmentedScan.Positions; + intensities = scan.SegmentedScan.Intensities; } - // serialize any remaining ions into the final row group - if (data.Count > 0) + // Add a row to parquet file for every m/z value in this scan + for (int i = 0; i < masses.Length; i++) { - var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); - task.Wait(); - Log.Debug("Writing final row group"); + MzParquet m; + m.rt = (float)rt; + m.scan = (uint)scanNumber; + m.level = (uint)msLevel; + m.intensity = (float)intensities[i]; + m.mz = (float)masses[i]; + m.isolation_lower = precursor_data.isolation_lower; + m.isolation_upper = precursor_data.isolation_upper; + m.precursor_scan = precursor_scan; + m.precursor_mz = precursor_data.mz; + m.precursor_charge = (uint?)trailer_charge; + m.ion_mobility = (float?)FAIMSCV; + data.Add(m); } } From 1e9b500ba480393fa5adb0c9d47286ef45942bcc Mon Sep 17 00:00:00 2001 From: caetera Date: Fri, 28 Feb 2025 16:38:36 +0100 Subject: [PATCH 6/8] Move PrecursorTree to general SpectrumWriter --- Writer/MgfSpectrumWriter.cs | 30 ++----------- Writer/MzMlSpectrumWriter.cs | 75 +------------------------------ Writer/ParquetSpectrumWriter.cs | 78 +-------------------------------- Writer/SpectrumWriter.cs | 76 ++++++++++++++++++++++++++++++++ 4 files changed, 81 insertions(+), 178 deletions(-) diff --git a/Writer/MgfSpectrumWriter.cs b/Writer/MgfSpectrumWriter.cs index a6449bf..bf282af 100644 --- a/Writer/MgfSpectrumWriter.cs +++ b/Writer/MgfSpectrumWriter.cs @@ -1,9 +1,6 @@ using System; -using System.Collections.Generic; using System.Globalization; -using System.Linq; using System.Reflection; -using System.Text.RegularExpressions; using log4net; using ThermoFisher.CommonCore.Data.Business; using ThermoFisher.CommonCore.Data.FilterEnums; @@ -19,15 +16,9 @@ public class MgfSpectrumWriter : SpectrumWriter private const string PositivePolarity = "+"; private const string NegativePolarity = "-"; - // Filter string - private readonly Regex _filterStringIsolationMzPattern = new Regex(@"ms\d+ (.+?) \["); - // Precursor scan number for MSn scans private int _precursorScanNumber; - // Precursor scan number (value) and isolation m/z (key) for reference in the precursor element of an MSn spectrum - private readonly Dictionary _precursorScanNumbers = new Dictionary(); - public MgfSpectrumWriter(ParseInput parseInput) : base(parseInput) { ParseInput.MsLevel.Remove(1); // MS1 spectra are not supposed to be in MGF @@ -126,23 +117,7 @@ public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastSc } else //try getting it from the scan filter { - var parts = Regex.Split(result.Groups[1].Value, " "); - - //find the position of the first (from the end) precursor with a different mass - //to account for possible supplementary activations written in the filter - var lastIonMass = parts.Last().Split('@').First(); - int last = parts.Length; - while (last > 0 && - parts[last - 1].Split('@').First() == lastIonMass) - { - last--; - } - - string parentFilter = String.Join(" ", parts.Take(last)); - if (_precursorScanNumbers.ContainsKey(parentFilter)) - { - _precursorScanNumber = _precursorScanNumbers[parentFilter]; - } + _precursorScanNumber = GetParentFromScanString(result.Groups[1].Value); } if (_precursorScanNumber > 0) @@ -151,7 +126,8 @@ public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastSc } else { - Log.Error($"Failed finding precursor for {scanNumber}"); + Log.Error($"Cannot find precursor scan for scan# {scanNumber}"); + _precursorTree[-2] = new PrecursorInfo(0, msLevel, FindLastReaction(scanEvent, msLevel), null); ParseInput.NewError(); } } diff --git a/Writer/MzMlSpectrumWriter.cs b/Writer/MzMlSpectrumWriter.cs index 940b8c3..12f17e7 100644 --- a/Writer/MzMlSpectrumWriter.cs +++ b/Writer/MzMlSpectrumWriter.cs @@ -27,8 +27,6 @@ public class MzMlSpectrumWriter : SpectrumWriter private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - private readonly Regex _filterStringIsolationMzPattern = new Regex(@"ms\d+ (.+?) \["); - // Tune version < 3 produces multiple trailer entry like "SPS Mass [number]" private readonly Regex _spSentry = new Regex(@"SPS Mass\s+\d+:"); @@ -45,12 +43,6 @@ public class MzMlSpectrumWriter : SpectrumWriter private readonly Dictionary _ionizationTypes = new Dictionary(); - // Precursor scan number (value) and isolation m/z (key) for reference in the precursor element of an MSn spectrum - private readonly Dictionary _precursorScanNumbers = new Dictionary(); - - //Precursor information for scans - private Dictionary _precursorTree = new Dictionary(); - private const string SourceFileId = "RAW1"; private readonly XmlSerializerFactory _factory = new XmlSerializerFactory(); private const string Ns = "http://psi.hupo.org/ms/mzml"; @@ -68,8 +60,6 @@ public MzMlSpectrumWriter(ParseInput parseInput) : base(parseInput) _mzMlNamespace.Add(string.Empty, "http://psi.hupo.org/ms/mzml"); _doIndexing = ParseInput.OutputFormat == OutputFormat.IndexMzML; _osOffset = Environment.NewLine == "\n" ? 0 : 1; - _precursorScanNumbers[""] = -1; - _precursorTree[-1] = new PrecursorInfo(); } /// @@ -639,7 +629,6 @@ public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastSc _writer.WriteValue(BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant()); _writer.WriteEndElement(); // fileChecksum - _writer.WriteEndElement(); // indexedmzML } @@ -1374,6 +1363,7 @@ private SpectrumType ConstructMSSpectrum(int scanNumber) { Log.Warn($"Cannot find precursor scan for scan# {scanNumber}"); _precursorTree[-2] = new PrecursorInfo(0, msLevel, FindLastReaction(scanEvent, msLevel), new PrecursorType[0]); + ParseInput.NewWarn(); } try @@ -1938,46 +1928,6 @@ private SpectrumType ConstructMSSpectrum(int scanNumber) return spectrum; } - - private int FindLastReaction(IScanEvent scanEvent, int msLevel) - { - int lastReactionIndex = msLevel - 2; - - //iteratively trying find the last available index for reaction - while(true) - { - try - { - scanEvent.GetReaction(lastReactionIndex + 1); - } - catch (ArgumentOutOfRangeException) - { - //stop trying - break; - } - - lastReactionIndex++; - } - - //supplemental activation flag is on -> one of the levels (not necissirily the last one) used supplemental activation - //check last two activations - if (scanEvent.SupplementalActivation == TriState.On) - { - var lastActivation = scanEvent.GetReaction(lastReactionIndex).ActivationType; - var beforeLastActivation = scanEvent.GetReaction(lastReactionIndex - 1).ActivationType; - - if ((beforeLastActivation == ActivationType.ElectronTransferDissociation || beforeLastActivation == ActivationType.ElectronCaptureDissociation) && - (lastActivation == ActivationType.CollisionInducedDissociation || lastActivation == ActivationType.HigherEnergyCollisionalDissociation)) - return lastReactionIndex - 1; //ETD or ECD followed by HCD or CID -> supplemental activation in the last level (move the last reaction one step back) - else - return lastReactionIndex; - } - else //just use the last one - { - return lastReactionIndex; - } - } - private SpectrumType ConstructPDASpectrum(int scanNumber, int instrumentNumber) { // Get each scan from the RAW file @@ -2558,29 +2508,6 @@ private PrecursorListType ConstructPrecursorList(int precursorScanNumber, IScanE } - private int GetParentFromScanString(string scanString) - { - var parts = Regex.Split(scanString, " "); - - //find the position of the first (from the end) precursor with a different mass - //to account for possible supplementary activations written in the filter - var lastIonMass = parts.Last().Split('@').First(); - int last = parts.Length; - while (last > 0 && - parts[last - 1].Split('@').First() == lastIonMass) - { - last--; - } - - string parentFilter = String.Join(" ", parts.Take(last)); - if (_precursorScanNumbers.ContainsKey(parentFilter)) - { - return _precursorScanNumbers[parentFilter]; - } - - return -2; //unsuccessful parsing - } - /// /// Populate the scan list element. Full version used for mass spectra, /// having Scan Event, scan Filter etc diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index de0cd11..ef705d4 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -1,15 +1,11 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text.RegularExpressions; using log4net; using Parquet.Serialization; -using ThermoFisher.CommonCore.Data; using ThermoFisher.CommonCore.Data.Business; using ThermoFisher.CommonCore.Data.FilterEnums; using ThermoFisher.CommonCore.Data.Interfaces; -using ThermoRawFileParser.Writer.MzML; namespace ThermoRawFileParser.Writer { @@ -40,18 +36,9 @@ public class ParquetSpectrumWriter : SpectrumWriter private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - private readonly Regex _filterStringIsolationMzPattern = new Regex(@"ms\d+ (.+?) \["); - - // Precursor scan number (value) and isolation m/z (key) for reference in the precursor element of an MSn spectrum - private readonly Dictionary _precursorScanNumbers = new Dictionary(); - - //Precursor information for scans - private Dictionary _precursorTree = new Dictionary(); - public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput) { - _precursorScanNumbers[""] = -1; - _precursorTree[-1] = new PrecursorInfo(); + //nothing to do here } public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNumber) @@ -291,69 +278,6 @@ private void AddScan(IRawDataPlus raw, int scanNumber, List data) } } - private int GetParentFromScanString(string scanString) - { - var parts = Regex.Split(scanString, " "); - - //find the position of the first (from the end) precursor with a different mass - //to account for possible supplementary activations written in the filter - var lastIonMass = parts.Last().Split('@').First(); - int last = parts.Length; - while (last > 0 && - parts[last - 1].Split('@').First() == lastIonMass) - { - last--; - } - - string parentFilter = String.Join(" ", parts.Take(last)); - if (_precursorScanNumbers.ContainsKey(parentFilter)) - { - return _precursorScanNumbers[parentFilter]; - } - - return -2; //unsuccessful parsing - } - - - private int FindLastReaction(IScanEvent scanEvent, int msLevel) - { - int lastReactionIndex = msLevel - 2; - - //iteratively trying find the last available index for reaction - while (true) - { - try - { - scanEvent.GetReaction(lastReactionIndex + 1); - } - catch (ArgumentOutOfRangeException) - { - //stop trying - break; - } - - lastReactionIndex++; - } - - //supplemental activation flag is on -> one of the levels (not necissirily the last one) used supplemental activation - //check last two activations - if (scanEvent.SupplementalActivation == TriState.On) - { - var lastActivation = scanEvent.GetReaction(lastReactionIndex).ActivationType; - var beforeLastActivation = scanEvent.GetReaction(lastReactionIndex - 1).ActivationType; - - if ((beforeLastActivation == ActivationType.ElectronTransferDissociation || beforeLastActivation == ActivationType.ElectronCaptureDissociation) && - (lastActivation == ActivationType.CollisionInducedDissociation || lastActivation == ActivationType.HigherEnergyCollisionalDissociation)) - return lastReactionIndex - 1; //ETD or ECD followed by HCD or CID -> supplemental activation in the last level (move the last reaction one step back) - else - return lastReactionIndex; - } - else //just use the last one - { - return lastReactionIndex; - } - } - private PrecursorData GetPrecursorData(int precursorScanNumber, IScanEventBase scanEvent, double? monoisotopicMz, double? isolationWidth, out int reactionCount) { diff --git a/Writer/SpectrumWriter.cs b/Writer/SpectrumWriter.cs index 3f466c4..8015a4d 100644 --- a/Writer/SpectrumWriter.cs +++ b/Writer/SpectrumWriter.cs @@ -1,13 +1,16 @@ using System; +using System.Collections.Generic; using System.IO; using System.IO.Compression; using System.Reflection; +using System.Text.RegularExpressions; using log4net; using ThermoFisher.CommonCore.Data; using ThermoFisher.CommonCore.Data.Business; using ThermoFisher.CommonCore.Data.FilterEnums; using ThermoFisher.CommonCore.Data.Interfaces; using ThermoRawFileParser.Util; +using System.Linq; namespace ThermoRawFileParser.Writer { @@ -42,6 +45,15 @@ public abstract class SpectrumWriter : ISpectrumWriter /// private static LimitedSizeDictionary precursorCache; + // Precursor scan number (value) and isolation m/z (key) for reference in the precursor element of an MSn spectrum + private protected readonly Dictionary _precursorScanNumbers = new Dictionary(); + + //Precursor information for scans + private protected Dictionary _precursorTree = new Dictionary(); + + // Filter string regex to extract an isoaltion entry + private protected readonly Regex _filterStringIsolationMzPattern = new Regex(@"ms\d+ (.+?) \["); + /// /// Constructor. /// @@ -50,6 +62,8 @@ protected SpectrumWriter(ParseInput parseInput) { ParseInput = parseInput; precursorCache = new LimitedSizeDictionary(10); + _precursorScanNumbers[""] = -1; + _precursorTree[-1] = new PrecursorInfo(); } /// @@ -252,5 +266,67 @@ public static IReaction GetReaction(IScanEvent scanEvent, int scanNumber) return precursorIntensity; } + + private protected int GetParentFromScanString(string scanString) + { + var parts = Regex.Split(scanString, " "); + + //find the position of the first (from the end) precursor with a different mass + //to account for possible supplementary activations written in the filter + var lastIonMass = parts.Last().Split('@').First(); + int last = parts.Length; + while (last > 0 && + parts[last - 1].Split('@').First() == lastIonMass) + { + last--; + } + + string parentFilter = String.Join(" ", parts.Take(last)); + if (_precursorScanNumbers.ContainsKey(parentFilter)) + { + return _precursorScanNumbers[parentFilter]; + } + + return -2; //unsuccessful parsing + } + + private protected int FindLastReaction(IScanEvent scanEvent, int msLevel) + { + int lastReactionIndex = msLevel - 2; + + //iteratively trying find the last available index for reaction + while (true) + { + try + { + scanEvent.GetReaction(lastReactionIndex + 1); + } + catch (ArgumentOutOfRangeException) + { + //stop trying + break; + } + + lastReactionIndex++; + } + + //supplemental activation flag is on -> one of the levels (not necissirily the last one) used supplemental activation + //check last two activations + if (scanEvent.SupplementalActivation == TriState.On) + { + var lastActivation = scanEvent.GetReaction(lastReactionIndex).ActivationType; + var beforeLastActivation = scanEvent.GetReaction(lastReactionIndex - 1).ActivationType; + + if ((beforeLastActivation == ActivationType.ElectronTransferDissociation || beforeLastActivation == ActivationType.ElectronCaptureDissociation) && + (lastActivation == ActivationType.CollisionInducedDissociation || lastActivation == ActivationType.HigherEnergyCollisionalDissociation)) + return lastReactionIndex - 1; //ETD or ECD followed by HCD or CID -> supplemental activation in the last level (move the last reaction one step back) + else + return lastReactionIndex; + } + else //just use the last one + { + return lastReactionIndex; + } + } } } \ No newline at end of file From 5021515007297f697446129978d42b05a5a181bd Mon Sep 17 00:00:00 2001 From: caetera Date: Mon, 3 Mar 2025 20:53:50 +0100 Subject: [PATCH 7/8] Test for ParquetWriter --- .../ThermoRawFileParserTest.csproj | 1 + ThermoRawFileParserTest/WriterTests.cs | 47 +++++++++++++++---- Writer/MzMlSpectrumWriter.cs | 15 ------ Writer/ParquetSpectrumWriter.cs | 10 +++- 4 files changed, 47 insertions(+), 26 deletions(-) diff --git a/ThermoRawFileParserTest/ThermoRawFileParserTest.csproj b/ThermoRawFileParserTest/ThermoRawFileParserTest.csproj index 7bb3f14..b1715cd 100644 --- a/ThermoRawFileParserTest/ThermoRawFileParserTest.csproj +++ b/ThermoRawFileParserTest/ThermoRawFileParserTest.csproj @@ -32,6 +32,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/ThermoRawFileParserTest/WriterTests.cs b/ThermoRawFileParserTest/WriterTests.cs index 4d20cbb..a28dda8 100644 --- a/ThermoRawFileParserTest/WriterTests.cs +++ b/ThermoRawFileParserTest/WriterTests.cs @@ -5,6 +5,7 @@ using System.Xml.Serialization; using IO.Mgf; using NUnit.Framework; +using Parquet; using ThermoRawFileParser; using ThermoRawFileParser.Writer.MzML; @@ -283,7 +284,7 @@ public void TestMzML_MS2() } [Test] - public void TestParquet() + public void TestParquetCentroid() { // Get temp path for writing the test mzML var tempFilePath = Path.GetTempPath(); @@ -294,17 +295,45 @@ public void TestParquet() RawFileParser.Parse(parseInput); // Actual test - //var xmlSerializer = new XmlSerializer(typeof(mzMLType)); - //var testMzMl = (mzMLType)xmlSerializer.Deserialize(new FileStream( - // Path.Combine(tempFilePath, "small.mzML"), FileMode.Open, FileAccess.Read, FileShare.ReadWrite)); + var parquetFilePath = Path.Combine(tempFilePath, "small.mzparquet"); - //Assert.That(testMzMl.run.spectrumList.count, Is.EqualTo("48")); - //Assert.That(testMzMl.run.spectrumList.spectrum.Length, Is.EqualTo(48)); + using (var parquetReader = ParquetReader.CreateAsync(parquetFilePath).Result) + { + var groupReader = parquetReader.OpenRowGroupReader(0); + var schema = parquetReader.Schema; + var scanColumn = groupReader.ReadColumnAsync(schema.FindDataField("scan")).Result; - //Assert.That(testMzMl.run.chromatogramList.count, Is.EqualTo("1")); - //Assert.That(testMzMl.run.chromatogramList.chromatogram.Length, Is.EqualTo(1)); + Assert.That(scanColumn.NumValues, Is.EqualTo(48520)); + Assert.That(scanColumn.Statistics.DistinctCount, Is.EqualTo(48)); + Assert.That((from int p in scanColumn.Data where p == 22 select p).Count(), Is.EqualTo(1632)); + } + } + + [Test] + public void TestParquetProfile() + { + // Get temp path for writing the test mzML + var tempFilePath = Path.GetTempPath(); - //Assert.That(testMzMl.run.chromatogramList.chromatogram[0].defaultArrayLength, Is.EqualTo(48)); + var testRawFile = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, @"Data/small.RAW"); + var parseInput = new ParseInput(testRawFile, null, tempFilePath, OutputFormat.Parquet); + parseInput.NoPeakPicking = new HashSet { 1, 2 }; + + RawFileParser.Parse(parseInput); + + // Actual test + var parquetFilePath = Path.Combine(tempFilePath, "small.mzparquet"); + + using (var parquetReader = ParquetReader.CreateAsync(parquetFilePath).Result) + { + var groupReader = parquetReader.OpenRowGroupReader(0); + var schema = parquetReader.Schema; + var scanColumn = groupReader.ReadColumnAsync(schema.FindDataField("scan")).Result; + + Assert.That(scanColumn.NumValues, Is.EqualTo(305213)); + Assert.That(scanColumn.Statistics.DistinctCount, Is.EqualTo(48)); + Assert.That((from int p in scanColumn.Data where p == 22 select p).Count(), Is.EqualTo(17758)); + } } } } \ No newline at end of file diff --git a/Writer/MzMlSpectrumWriter.cs b/Writer/MzMlSpectrumWriter.cs index 12f17e7..81677de 100644 --- a/Writer/MzMlSpectrumWriter.cs +++ b/Writer/MzMlSpectrumWriter.cs @@ -641,21 +641,6 @@ public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastSc Writer.Flush(); Writer.Close(); - - //This section is not necessary? - /*if (_doIndexing) - { - try - { - cryptoStream.Flush(); - cryptoStream.Close(); - } - catch (System.ObjectDisposedException e) - { - // Cannot access a closed file. CryptoStream was already closed when closing _writer - Log.Warn($"Warning: {e.Message}"); - } - }*/ } // In case of indexed mzML, change the extension from xml to mzML and check for the gzip option diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index ef705d4..0d503a6 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -36,6 +36,8 @@ public class ParquetSpectrumWriter : SpectrumWriter private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + private const int ParquetRowGroupSize = 1_048_576; + public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput) { //nothing to do here @@ -49,7 +51,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu } ConfigureWriter(".mzparquet"); - + ParquetSerializerOptions opts = new ParquetSerializerOptions(); opts.CompressionLevel = System.IO.Compression.CompressionLevel.Fastest; opts.CompressionMethod = Parquet.CompressionMethod.Zstd; @@ -91,7 +93,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu // - some row groups might have more than this number of ions // but this ensures that all ions from a single scan are always // present in the same row group (critical property of mzparquet) - if (data.Count >= 1_048_576) + if (data.Count >= ParquetRowGroupSize) { var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); task.Wait(); @@ -108,6 +110,10 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu task.Wait(); Log.Debug("Writing final row group"); } + + // Release the OS file handle + Writer.Flush(); + Writer.Close(); } private void AddScan(IRawDataPlus raw, int scanNumber, List data) From 1ef4d8869ab5daef9d51e989457a85bcafb20b37 Mon Sep 17 00:00:00 2001 From: caetera Date: Tue, 4 Mar 2025 18:44:03 +0100 Subject: [PATCH 8/8] Precursor scan fix for MS level restricted files --- Writer/ParquetSpectrumWriter.cs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index 0d503a6..438aa37 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -36,7 +36,7 @@ public class ParquetSpectrumWriter : SpectrumWriter private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - private const int ParquetRowGroupSize = 1_048_576; + private const int ParquetSliceSize = 1_048_576; public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput) { @@ -80,8 +80,13 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu try { int level = (int)raw.GetScanEventForScanNumber(scanNumber).MSOrder; //applying MS level filter - if (ParseInput.MsLevel.Contains(level)) - AddScan(raw, scanNumber, data); + if (level <= ParseInput.MaxLevel) // Primary MS level filter + { + var scanData = ReadScan(raw, scanNumber); + if (scanData != null && ParseInput.MsLevel.Contains(level)) // Final MS level filter + data.AddRange(scanData); + } + } catch (Exception ex) { @@ -93,7 +98,7 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu // - some row groups might have more than this number of ions // but this ensures that all ions from a single scan are always // present in the same row group (critical property of mzparquet) - if (data.Count >= ParquetRowGroupSize) + if (data.Count >= ParquetSliceSize) { var task = ParquetSerializer.SerializeAsync(data, Writer.BaseStream, opts); task.Wait(); @@ -111,12 +116,17 @@ public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNu Log.Debug("Writing final row group"); } + if (ParseInput.LogFormat == LogFormat.DEFAULT) //Add new line after progress bar + { + Console.WriteLine(); + } + // Release the OS file handle Writer.Flush(); Writer.Close(); } - private void AddScan(IRawDataPlus raw, int scanNumber, List data) + private List ReadScan(IRawDataPlus raw, int scanNumber) { var scanFilter = raw.GetFilterForScanNumber(scanNumber); @@ -265,6 +275,7 @@ private void AddScan(IRawDataPlus raw, int scanNumber, List data) intensities = scan.SegmentedScan.Intensities; } + List scanData = new List(masses.Length); // Add a row to parquet file for every m/z value in this scan for (int i = 0; i < masses.Length; i++) { @@ -280,8 +291,10 @@ private void AddScan(IRawDataPlus raw, int scanNumber, List data) m.precursor_mz = precursor_data.mz; m.precursor_charge = (uint?)trailer_charge; m.ion_mobility = (float?)FAIMSCV; - data.Add(m); + scanData.Add(m); } + + return scanData; } private PrecursorData GetPrecursorData(int precursorScanNumber, IScanEventBase scanEvent,