Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 130 additions & 195 deletions Writer/ParquetSpectrumWriter.cs
Original file line number Diff line number Diff line change
@@ -1,257 +1,192 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using log4net;
using Parquet;
using Parquet.Data;
using Parquet.Serialization;
using ThermoFisher.CommonCore.Data;
using ThermoFisher.CommonCore.Data.Business;
using ThermoFisher.CommonCore.Data.FilterEnums;
using ThermoFisher.CommonCore.Data.Interfaces;
using ThermoRawFileParser.Writer.MzML;

namespace ThermoRawFileParser.Writer
{

struct MzParquet
{
public uint scan;
public uint level;
public float rt;
public float mz;
public float intensity;
public float? ion_mobility;
public float? isolation_lower;
public float? isolation_upper;
public uint? precursor_scan;
public float? precursor_mz;
public uint? precursor_charge;
}

public class ParquetSpectrumWriter : SpectrumWriter
{
private static readonly ILog Log =
LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

private IRawDataPlus _rawFile;

// Dictionary to keep track of the different mass analyzers (key: Thermo MassAnalyzerType; value: the reference string)
private readonly Dictionary<MassAnalyzerType, string> _massAnalyzers =
new Dictionary<MassAnalyzerType, string>();

private readonly Dictionary<IonizationModeType, CVParamType> _ionizationTypes =
new Dictionary<IonizationModeType, CVParamType>();

public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput)
{
}

public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastScanNumber)
public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNumber)
{
_rawFile = rawFile;
if (rawFile.HasMsData)
{
List<PScan> pScans = new List<PScan>();
WritePScans(ParseInput.OutputDirectory, rawFile.FileName, rawFile, pScans);
}
else
if (!raw.HasMsData)
{
throw new RawFileParserException("No MS data in RAW file, no output will be produced");
}
}

private static void WritePScans(string outputDirectory, string fileName,
IRawDataPlus raw,
List<PScan> scans)
{
var enumerator = raw.GetFilteredScanEnumerator(" ");

foreach (var scanNumber in enumerator
) // note in my tests serial is faster than Parallel.Foreach() (this involves disk access, so it makes sense)
{
//trailer information is extracted via index
var trailers = raw.GetTrailerExtraValues(scanNumber);
var trailerLabels = raw.GetTrailerExtraInformation(scanNumber);
object chargeState = 0;
for (int i = 0; i < trailerLabels.Labels.Length; i++)
{
if (trailerLabels.Labels[i] == "Charge State:")
{
chargeState = raw.GetTrailerExtraValue(scanNumber, i);
break;
}
}
// NB: replace with more robust strategy?
var output = ParseInput.OutputDirectory + "//" + Path.GetFileNameWithoutExtension(ParseInput.RawFilePath) + ".mzparquet";

ParquetSerializerOptions opts = new ParquetSerializerOptions();
opts.CompressionLevel = System.IO.Compression.CompressionLevel.Fastest;
opts.CompressionMethod = Parquet.CompressionMethod.Zstd;

var data = new List<MzParquet>();

// map last (msOrder - 1) -> scan number (e.g. mapping precursors)
// note, this assumes time dependence of MS1 -> MS2 -> MSN
var last_scans = new Dictionary<int, uint>();


foreach (var scanNumber in enumerator)
{
var scanFilter = raw.GetFilterForScanNumber(scanNumber);
var scanStats = raw.GetScanStatsForScanNumber(scanNumber);

CentroidStream centroidStream = new CentroidStream();

//check for FT mass analyzer data
// Pull out m/z and intensity values
// NB: is this the best way to do this?
if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerFTMS)
{
centroidStream = raw.GetCentroidStream(scanNumber, false);
}

//check for IT mass analyzer data
if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS)
else if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS)
{
var scanData = raw.GetSimplifiedScan(scanNumber);
centroidStream.Masses = scanData.Masses;
centroidStream.Intensities = scanData.Intensities;
}

var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder;

if (msOrder == MSOrderType.Ms)
else
{
var pscan = GetPScan(scanStats, centroidStream, fileName, Convert.ToInt32(chargeState));
scans.Add(pscan);
var scanData = raw.GetSimplifiedCentroids(scanNumber);
centroidStream.Masses = scanData.Masses;
centroidStream.Intensities = scanData.Intensities;
}

if (msOrder == MSOrderType.Ms2)
{
var precursorMz = raw.GetScanEventForScanNumber(scanNumber).GetReaction(0).PrecursorMass;
var pscan = GetPScan(scanStats, centroidStream, fileName, precursorMz,
Convert.ToInt32(chargeState));
scans.Add(pscan);
}

var t = raw.GetTrailerExtraValues(scanNumber);
}
var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder;

WriteScans(outputDirectory, scans, fileName);
}
last_scans[(int)msOrder] = (uint)scanNumber;

private static PScan GetPScan(ScanStatistics scanStats, CentroidStream centroidStream,
string fileName, double? precursorMz = null, int? precursorCharge = null)
{
var scan = new PScan
{
FileName = fileName,
BasePeakMass = scanStats.BasePeakMass,
ScanType = scanStats.ScanType,
BasePeakIntensity = scanStats.BasePeakIntensity,
PacketType = scanStats.PacketType,
ScanNumber = scanStats.ScanNumber,
RetentionTime = scanStats.StartTime,
Masses = centroidStream.Masses.ToArray(),
Intensities = centroidStream.Intensities.ToArray(),
LowMass = scanStats.LowMass,
HighMass = scanStats.HighMass,
TIC = scanStats.TIC,
FileId = fileName,
PrecursorMz = precursorMz,
PrecursorCharge = precursorCharge,
MsOrder = 1
};
return scan;
}
double rt = raw.RetentionTimeFromScanNumber(scanNumber);
float? isolation_lower = null;
float? isolation_upper = null;
uint? precursor_scan = null;
float? precursor_mz = null;
uint? precursor_charge = null;

public static void WriteScans(string outputDirectory, List<PScan> scans, string sourceRawFileName)
{
throw new NotImplementedException();
if ((int)msOrder > 1)
{
var rx = scanFilter.GetReaction(0);

//Needs refactoring since Parquet.NET API changed

//var output = outputDirectory + "//" + Path.GetFileNameWithoutExtension(sourceRawFileName);
// this assumes symmetrical quad window
isolation_lower = (float)(rx.PrecursorMass - rx.IsolationWidth / 2);
isolation_upper = (float)(rx.PrecursorMass + rx.IsolationWidth / 2);
precursor_mz = (float)rx.PrecursorMass;

//var ds = new DataSet(new DataField<double>("BasePeakIntensity"),
// new DataField<double>("BasePeakMass"),
// new DataField<double[]>("Baselines"),
// new DataField<double[]>("Charges"),
// new DataField<string>("FileId"),
// new DataField<string>("FileName"),
// new DataField<double>("HighMass"),
// new DataField<double[]>("Intensities"),
// new DataField<double>("LowMass"),
// new DataField<double[]>("Masses"),
// new DataField<int>("MsOrder"),
// new DataField<double[]>("Noises"),
// new DataField<int>("PacketType"),
// new DataField<int?>("PrecursorCharge"),
// new DataField<double?>("PrecursorMass"),
// new DataField<double[]>("Resolutions"),
// new DataField<double>("RetentionTime"),
// new DataField<int>("ScanNumber"),
// new DataField<string>("ScanType"),
// new DataField<double>("TIC"));
// Try to retrieve last scan that occurred at the previous msOrder
uint t;
if (last_scans.TryGetValue((int)msOrder - 1, out t))
{
precursor_scan = t;
}
}

//foreach (var scan in scans)
//{
// //we can't store null values?
// double[] dummyVal = new double[1];
// if (scan.Noises == null)
// {
// scan.Noises = dummyVal;
// }
var trailer = raw.GetTrailerExtraInformation(scanNumber);
for (var i = 0l; i < trailer.Length; i++)
{

// if (scan.Charges == null)
// {
// scan.Charges = dummyVal;
// }
if (trailer.Labels[i].StartsWith("Monoisotopic M/Z"))
{
var val = float.Parse(trailer.Values[i]);
if (val > 0)
{
precursor_mz = val;
}
}

// if (scan.Baselines == null)
// {
// scan.Baselines = dummyVal;
// }
// Overwrite precursor_scan with value from trailer, if it exists
if (trailer.Labels[i].StartsWith("Master Scan"))
{
var val = Int64.Parse(trailer.Values[i]);
if (val > 0)
{
precursor_scan = (uint)val;
}
}

// if (scan.Resolutions == null)
// {
// scan.Resolutions = dummyVal;
// }
if (trailer.Labels[i].StartsWith("Charge"))
{
var val = uint.Parse(trailer.Values[i]);
if (val > 0)
{
precursor_charge = val;
}
}
}

// if (scan.PrecursorMz == null)
// {
// scan.PrecursorMz = 0;
// scan.PrecursorCharge = 0;
// }
// Add a row to parquet file for every m/z value in this scan
for (int i = 0; i < centroidStream.Masses.Length; i++)
{
MzParquet m;
m.rt = (float)rt;
m.scan = (uint)scanNumber;
m.level = ((uint)msOrder);
m.intensity = (float)centroidStream.Intensities[i];
m.mz = (float)centroidStream.Masses[i];
m.isolation_lower = isolation_lower;
m.isolation_upper = isolation_upper;
m.precursor_scan = precursor_scan;
m.precursor_mz = precursor_mz;
m.precursor_charge = precursor_charge;
m.ion_mobility = null;
data.Add(m);
}

// If we have enough ions to write a row group, do so
// - some row groups might have more than this number of ions
// but this ensures that all ions from a single scan are always
// present in the same row group (critical property of mzparquet)
if (data.Count >= 1_048_576)
{
var task = ParquetSerializer.SerializeAsync(data, output, opts);
task.Wait();
opts.Append = true;
data.Clear();
Log.Info("writing row group");
}

// ds.Add(scan.BasePeakIntensity,
// scan.BasePeakMass,
// scan.Baselines,
// scan.Charges,
// scan.FileId,
// scan.FileName,
// scan.HighMass,
// scan.Intensities,
// scan.LowMass,
// scan.Masses,
// scan.MsOrder,
// scan.Noises,
// scan.PacketType,
// scan.PrecursorCharge,
// scan.PrecursorMz,
// scan.Resolutions,
// scan.RetentionTime,
// scan.ScanNumber,
// scan.ScanType,
// scan.TIC);
//}
}

//using (Stream fileStream = File.OpenWrite(output + ".parquet"))
//{
// using (var writer = new ParquetWriter(fileStream))
// {
// writer.Write(ds);
// }
//}
// serialize any remaining ions into the final row group
if (data.Count > 0)
{
var task = ParquetSerializer.SerializeAsync(data, output, opts);
task.Wait();
Log.Info("writing row group");
}
}
}

/// PSCan meaing Parsec Scan (because Commoncore has a Scan class)
/// </summary>
public class PScan
{
/// <summary>
/// Unique ID per file (foreign key in data store)
/// </summary>
public string FileId { get; set; }

public string FileName { get; set; }
public double BasePeakIntensity { get; set; }
public double BasePeakMass { get; set; }
public double[] Baselines { get; set; }
public double[] Charges { get; set; }
public double HighMass { get; set; }
public double[] Intensities { get; set; }
public double LowMass { get; set; }
public double[] Masses { get; set; }
public double[] Noises { get; set; }
public int PacketType { get; set; } // : 20,
public double RetentionTime { get; set; }
public double[] Resolutions { get; set; }
public int ScanNumber { get; set; }
public string ScanType { get; set; } // : FTMS + c ESI d Full ms2 335.9267@hcd30.00 [130.0000-346.0000],
public double TIC { get; set; }
public int MsOrder { get; set; }
public double? PrecursorMz { get; set; }
public int? PrecursorCharge { get; set; }
}
}