diff --git a/Writer/ParquetSpectrumWriter.cs b/Writer/ParquetSpectrumWriter.cs index 2703286..01d7b20 100644 --- a/Writer/ParquetSpectrumWriter.cs +++ b/Writer/ParquetSpectrumWriter.cs @@ -1,257 +1,192 @@ using System; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Reflection; using log4net; -using Parquet; -using Parquet.Data; +using Parquet.Serialization; using ThermoFisher.CommonCore.Data; using ThermoFisher.CommonCore.Data.Business; using ThermoFisher.CommonCore.Data.FilterEnums; using ThermoFisher.CommonCore.Data.Interfaces; -using ThermoRawFileParser.Writer.MzML; namespace ThermoRawFileParser.Writer { + + struct MzParquet + { + public uint scan; + public uint level; + public float rt; + public float mz; + public float intensity; + public float? ion_mobility; + public float? isolation_lower; + public float? isolation_upper; + public uint? precursor_scan; + public float? precursor_mz; + public uint? precursor_charge; + } + public class ParquetSpectrumWriter : SpectrumWriter { private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - private IRawDataPlus _rawFile; - - // Dictionary to keep track of the different mass analyzers (key: Thermo MassAnalyzerType; value: the reference string) - private readonly Dictionary _massAnalyzers = - new Dictionary(); - - private readonly Dictionary _ionizationTypes = - new Dictionary(); - public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput) { } - public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastScanNumber) + public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNumber) { - _rawFile = rawFile; - if (rawFile.HasMsData) - { - List pScans = new List(); - WritePScans(ParseInput.OutputDirectory, rawFile.FileName, rawFile, pScans); - } - else + if (!raw.HasMsData) { throw new RawFileParserException("No MS data in RAW file, no output will be produced"); } - } - private static void WritePScans(string outputDirectory, string fileName, - IRawDataPlus raw, - List scans) - { var enumerator = raw.GetFilteredScanEnumerator(" "); - foreach (var scanNumber in enumerator - ) // note in my tests serial is faster than Parallel.Foreach() (this involves disk access, so it makes sense) - { - //trailer information is extracted via index - var trailers = raw.GetTrailerExtraValues(scanNumber); - var trailerLabels = raw.GetTrailerExtraInformation(scanNumber); - object chargeState = 0; - for (int i = 0; i < trailerLabels.Labels.Length; i++) - { - if (trailerLabels.Labels[i] == "Charge State:") - { - chargeState = raw.GetTrailerExtraValue(scanNumber, i); - break; - } - } + // NB: replace with more robust strategy? + var output = ParseInput.OutputDirectory + "//" + Path.GetFileNameWithoutExtension(ParseInput.RawFilePath) + ".mzparquet"; + + ParquetSerializerOptions opts = new ParquetSerializerOptions(); + opts.CompressionLevel = System.IO.Compression.CompressionLevel.Fastest; + opts.CompressionMethod = Parquet.CompressionMethod.Zstd; + + var data = new List(); + + // map last (msOrder - 1) -> scan number (e.g. mapping precursors) + // note, this assumes time dependence of MS1 -> MS2 -> MSN + var last_scans = new Dictionary(); + + foreach (var scanNumber in enumerator) + { var scanFilter = raw.GetFilterForScanNumber(scanNumber); - var scanStats = raw.GetScanStatsForScanNumber(scanNumber); CentroidStream centroidStream = new CentroidStream(); - //check for FT mass analyzer data + // Pull out m/z and intensity values + // NB: is this the best way to do this? if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerFTMS) { centroidStream = raw.GetCentroidStream(scanNumber, false); } - - //check for IT mass analyzer data - if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS) + else if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS) { var scanData = raw.GetSimplifiedScan(scanNumber); centroidStream.Masses = scanData.Masses; centroidStream.Intensities = scanData.Intensities; } - - var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder; - - if (msOrder == MSOrderType.Ms) + else { - var pscan = GetPScan(scanStats, centroidStream, fileName, Convert.ToInt32(chargeState)); - scans.Add(pscan); + var scanData = raw.GetSimplifiedCentroids(scanNumber); + centroidStream.Masses = scanData.Masses; + centroidStream.Intensities = scanData.Intensities; } - if (msOrder == MSOrderType.Ms2) - { - var precursorMz = raw.GetScanEventForScanNumber(scanNumber).GetReaction(0).PrecursorMass; - var pscan = GetPScan(scanStats, centroidStream, fileName, precursorMz, - Convert.ToInt32(chargeState)); - scans.Add(pscan); - } - var t = raw.GetTrailerExtraValues(scanNumber); - } + var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder; - WriteScans(outputDirectory, scans, fileName); - } + last_scans[(int)msOrder] = (uint)scanNumber; - private static PScan GetPScan(ScanStatistics scanStats, CentroidStream centroidStream, - string fileName, double? precursorMz = null, int? precursorCharge = null) - { - var scan = new PScan - { - FileName = fileName, - BasePeakMass = scanStats.BasePeakMass, - ScanType = scanStats.ScanType, - BasePeakIntensity = scanStats.BasePeakIntensity, - PacketType = scanStats.PacketType, - ScanNumber = scanStats.ScanNumber, - RetentionTime = scanStats.StartTime, - Masses = centroidStream.Masses.ToArray(), - Intensities = centroidStream.Intensities.ToArray(), - LowMass = scanStats.LowMass, - HighMass = scanStats.HighMass, - TIC = scanStats.TIC, - FileId = fileName, - PrecursorMz = precursorMz, - PrecursorCharge = precursorCharge, - MsOrder = 1 - }; - return scan; - } + double rt = raw.RetentionTimeFromScanNumber(scanNumber); + float? isolation_lower = null; + float? isolation_upper = null; + uint? precursor_scan = null; + float? precursor_mz = null; + uint? precursor_charge = null; - public static void WriteScans(string outputDirectory, List scans, string sourceRawFileName) - { - throw new NotImplementedException(); + if ((int)msOrder > 1) + { + var rx = scanFilter.GetReaction(0); - //Needs refactoring since Parquet.NET API changed - - //var output = outputDirectory + "//" + Path.GetFileNameWithoutExtension(sourceRawFileName); + // this assumes symmetrical quad window + isolation_lower = (float)(rx.PrecursorMass - rx.IsolationWidth / 2); + isolation_upper = (float)(rx.PrecursorMass + rx.IsolationWidth / 2); + precursor_mz = (float)rx.PrecursorMass; - //var ds = new DataSet(new DataField("BasePeakIntensity"), - // new DataField("BasePeakMass"), - // new DataField("Baselines"), - // new DataField("Charges"), - // new DataField("FileId"), - // new DataField("FileName"), - // new DataField("HighMass"), - // new DataField("Intensities"), - // new DataField("LowMass"), - // new DataField("Masses"), - // new DataField("MsOrder"), - // new DataField("Noises"), - // new DataField("PacketType"), - // new DataField("PrecursorCharge"), - // new DataField("PrecursorMass"), - // new DataField("Resolutions"), - // new DataField("RetentionTime"), - // new DataField("ScanNumber"), - // new DataField("ScanType"), - // new DataField("TIC")); + // Try to retrieve last scan that occurred at the previous msOrder + uint t; + if (last_scans.TryGetValue((int)msOrder - 1, out t)) + { + precursor_scan = t; + } + } - //foreach (var scan in scans) - //{ - // //we can't store null values? - // double[] dummyVal = new double[1]; - // if (scan.Noises == null) - // { - // scan.Noises = dummyVal; - // } + var trailer = raw.GetTrailerExtraInformation(scanNumber); + for (var i = 0l; i < trailer.Length; i++) + { - // if (scan.Charges == null) - // { - // scan.Charges = dummyVal; - // } + if (trailer.Labels[i].StartsWith("Monoisotopic M/Z")) + { + var val = float.Parse(trailer.Values[i]); + if (val > 0) + { + precursor_mz = val; + } + } - // if (scan.Baselines == null) - // { - // scan.Baselines = dummyVal; - // } + // Overwrite precursor_scan with value from trailer, if it exists + if (trailer.Labels[i].StartsWith("Master Scan")) + { + var val = Int64.Parse(trailer.Values[i]); + if (val > 0) + { + precursor_scan = (uint)val; + } + } - // if (scan.Resolutions == null) - // { - // scan.Resolutions = dummyVal; - // } + if (trailer.Labels[i].StartsWith("Charge")) + { + var val = uint.Parse(trailer.Values[i]); + if (val > 0) + { + precursor_charge = val; + } + } + } - // if (scan.PrecursorMz == null) - // { - // scan.PrecursorMz = 0; - // scan.PrecursorCharge = 0; - // } + // Add a row to parquet file for every m/z value in this scan + for (int i = 0; i < centroidStream.Masses.Length; i++) + { + MzParquet m; + m.rt = (float)rt; + m.scan = (uint)scanNumber; + m.level = ((uint)msOrder); + m.intensity = (float)centroidStream.Intensities[i]; + m.mz = (float)centroidStream.Masses[i]; + m.isolation_lower = isolation_lower; + m.isolation_upper = isolation_upper; + m.precursor_scan = precursor_scan; + m.precursor_mz = precursor_mz; + m.precursor_charge = precursor_charge; + m.ion_mobility = null; + data.Add(m); + } + + // If we have enough ions to write a row group, do so + // - some row groups might have more than this number of ions + // but this ensures that all ions from a single scan are always + // present in the same row group (critical property of mzparquet) + if (data.Count >= 1_048_576) + { + var task = ParquetSerializer.SerializeAsync(data, output, opts); + task.Wait(); + opts.Append = true; + data.Clear(); + Log.Info("writing row group"); + } - // ds.Add(scan.BasePeakIntensity, - // scan.BasePeakMass, - // scan.Baselines, - // scan.Charges, - // scan.FileId, - // scan.FileName, - // scan.HighMass, - // scan.Intensities, - // scan.LowMass, - // scan.Masses, - // scan.MsOrder, - // scan.Noises, - // scan.PacketType, - // scan.PrecursorCharge, - // scan.PrecursorMz, - // scan.Resolutions, - // scan.RetentionTime, - // scan.ScanNumber, - // scan.ScanType, - // scan.TIC); - //} + } - //using (Stream fileStream = File.OpenWrite(output + ".parquet")) - //{ - // using (var writer = new ParquetWriter(fileStream)) - // { - // writer.Write(ds); - // } - //} + // serialize any remaining ions into the final row group + if (data.Count > 0) + { + var task = ParquetSerializer.SerializeAsync(data, output, opts); + task.Wait(); + Log.Info("writing row group"); + } } } - /// PSCan meaing Parsec Scan (because Commoncore has a Scan class) - /// - public class PScan - { - /// - /// Unique ID per file (foreign key in data store) - /// - public string FileId { get; set; } - - public string FileName { get; set; } - public double BasePeakIntensity { get; set; } - public double BasePeakMass { get; set; } - public double[] Baselines { get; set; } - public double[] Charges { get; set; } - public double HighMass { get; set; } - public double[] Intensities { get; set; } - public double LowMass { get; set; } - public double[] Masses { get; set; } - public double[] Noises { get; set; } - public int PacketType { get; set; } // : 20, - public double RetentionTime { get; set; } - public double[] Resolutions { get; set; } - public int ScanNumber { get; set; } - public string ScanType { get; set; } // : FTMS + c ESI d Full ms2 335.9267@hcd30.00 [130.0000-346.0000], - public double TIC { get; set; } - public int MsOrder { get; set; } - public double? PrecursorMz { get; set; } - public int? PrecursorCharge { get; set; } - } } \ No newline at end of file