From 961e31392d3dd356f0e018652edb7fc5523a55e1 Mon Sep 17 00:00:00 2001 From: Sam Vohr Date: Thu, 28 Aug 2025 16:30:03 -0400 Subject: [PATCH 1/4] Fix concurrency with writing split output files Introduces a class SplitFileWriterManager that wraps the PrintWriters required for the split file output so they can be accessed concurrently and closed once after IBD detection is complete. --- src/hapibd/PbwtIbd.java | 16 +++----- src/hapibd/PbwtIbdDriver.java | 11 ++++-- src/hapibd/SplitFileWriterManager.java | 52 ++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 14 deletions(-) create mode 100644 src/hapibd/SplitFileWriterManager.java diff --git a/src/hapibd/PbwtIbd.java b/src/hapibd/PbwtIbd.java index 070c17d..ba03d2a 100644 --- a/src/hapibd/PbwtIbd.java +++ b/src/hapibd/PbwtIbd.java @@ -94,6 +94,8 @@ public final class PbwtIbd implements Runnable { private final String outputPrefix; private final String splitFilename; private final boolean split; + private SplitFileWriterManager splitFileWriterManager; + private boolean useSeedQ = false; private final int nWindows; private final IntList seedList; @@ -125,7 +127,8 @@ private static PrintWriter printWriter(ByteArrayOutputStream out) { public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map, int windowStart, int windowEnd, int nWindows, BlockingQueue seedQ, - SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS) { + SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS, + SplitFileWriterManager splitFileWriterManager) { if (gt.isPhased()==false) { throw new IllegalArgumentException("unphased data"); } @@ -162,6 +165,7 @@ public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map, this.ibdOS = ibdOS; this.split = par.split(); + this.splitFileWriterManager = splitFileWriterManager; this.outputPrefix = par.out(); this.splitFilename = par.splitFilename(); this.ibdWriters = new ConcurrentHashMap<>(); @@ -205,14 +209,6 @@ public void run() { catch (Throwable t) { Utilities.exit(t); } - finally { - for (PrintWriter writer : ibdWriters.values()) { - writer.close(); - } - for (PrintWriter writer : hbdWriters.values()) { - writer.close(); - } - } } private int advancePbwtToFirstIbsEnd() { @@ -513,7 +509,7 @@ private void writeSegment(int hap1, int hap2, int start, int inclEnd, } if (split) { - PrintWriter splitWriter = getWriterForProxyKey(hap1ProxyKey, type); + PrintWriter splitWriter = splitFileWriterManager.getWriterForProxyKey(hap1ProxyKey, type); synchronized (splitWriter) { printSegment(splitWriter, hap1ProxyKey, hap2ProxyKey, hap1, hap2, start, inclEnd); splitWriter.println(); // flushes line to file diff --git a/src/hapibd/PbwtIbdDriver.java b/src/hapibd/PbwtIbdDriver.java index 7993827..39c75e8 100644 --- a/src/hapibd/PbwtIbdDriver.java +++ b/src/hapibd/PbwtIbdDriver.java @@ -69,7 +69,8 @@ public static long[] detectIbd(HapIbdPar par) { File ibdFile = new File(par.out() + ".ibd"); try (SampleFileIt it = refIt(par); SynchFileOutputStream hbdOS = new SynchFileOutputStream(hbdFile); - SynchFileOutputStream ibdOS = new SynchFileOutputStream(ibdFile)) { + SynchFileOutputStream ibdOS = new SynchFileOutputStream(ibdFile); + SplitFileWriterManager splitFileWriterManager = par.split() ? new SplitFileWriterManager(par) : null ){ try { nSamplesAndMarkers[0] = it.samples().nSamples(); List recList = new ArrayList<>(1<<14); @@ -80,7 +81,7 @@ public static long[] detectIbd(HapIbdPar par) { if (recList.isEmpty()==false) { RefGT gt = new RefGT(recList.toArray(new RefGTRec[0])); MarkerMap map = MarkerMap.create(genMap, gt.markers()); - PbwtIbdDriver.detectIBD(par, gt, map, hbdOS, ibdOS); + PbwtIbdDriver.detectIBD(par, gt, map, hbdOS, ibdOS, splitFileWriterManager); nSamplesAndMarkers[1] += gt.nMarkers(); } } @@ -90,11 +91,13 @@ public static long[] detectIbd(HapIbdPar par) { } catch (IOException ex) { Utilities.exit(ex); } + return nSamplesAndMarkers; } private static void detectIBD(HapIbdPar par, RefGT gt, MarkerMap map, - SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS) { + SynchFileOutputStream hbdOS, SynchFileOutputStream ibdOS, + SplitFileWriterManager splitFileWriterManager) { float minSeed = par.min_seed(); int minMarkers = par.min_markers(); double[] genPos = map.genPos().toArray(); @@ -106,7 +109,7 @@ private static void detectIBD(HapIbdPar par, RefGT gt, MarkerMap map, ExecutorService execService = Executors.newFixedThreadPool(starts.length); for (int j=0; j ibdWriters; + private final Map hbdWriters; + + public SplitFileWriterManager(HapIbdPar par) { + this.outputPrefix = par.out(); + this.splitFilename = par.splitFilename(); + this.ibdWriters = new ConcurrentHashMap<>(); + this.hbdWriters = new ConcurrentHashMap<>(); + } + + public PrintWriter getWriterForProxyKey(int proxyKey, String type) { + Map writers = type.equals("ibd") ? ibdWriters : hbdWriters; + return writers.computeIfAbsent(proxyKey, key -> { + try { + String dirPath = outputPrefix + "/" + key; + File dir = new File(dirPath); + if (!dir.mkdirs() && !dir.isDirectory()) { + Utilities.exit("ERROR: Failed to create directory " + dirPath); + } + String filename = dirPath + "/" + splitFilename + "." + type; + return new PrintWriter(new File(filename)); + } + catch (IOException e) { + Utilities.exit("ERROR creating " + type + " file for proxy key " + key + ": ", e); + return null; // This will never be reached due to Utilities.exit + } + }); + } + + public synchronized void close() { + for (PrintWriter writer : ibdWriters.values()) { + writer.close(); + } + for (PrintWriter writer : hbdWriters.values()) { + writer.close(); + } + } +} From d99811a34e001c3ac2ba7990c26aef6c42c3bd81 Mon Sep 17 00:00:00 2001 From: Sam Vohr Date: Thu, 18 Sep 2025 16:56:15 -0400 Subject: [PATCH 2/4] Remove old ibd and hbd writers --- src/hapibd/PbwtIbd.java | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/src/hapibd/PbwtIbd.java b/src/hapibd/PbwtIbd.java index ba03d2a..cc0da69 100644 --- a/src/hapibd/PbwtIbd.java +++ b/src/hapibd/PbwtIbd.java @@ -89,8 +89,6 @@ public final class PbwtIbd implements Runnable { private PrintWriter hbdOut = printWriter(hbdBaos); private PrintWriter ibdOut = printWriter(ibdBaos); - private final Map ibdWriters; - private final Map hbdWriters; private final String outputPrefix; private final String splitFilename; private final boolean split; @@ -168,8 +166,6 @@ public PbwtIbd(HapIbdPar par, RefGT gt, MarkerMap map, this.splitFileWriterManager = splitFileWriterManager; this.outputPrefix = par.out(); this.splitFilename = par.splitFilename(); - this.ibdWriters = new ConcurrentHashMap<>(); - this.hbdWriters = new ConcurrentHashMap<>(); this.pbwt = new PbwtUpdater(nHaps); this.a = IntStream.range(0, nHaps).toArray(); @@ -474,25 +470,6 @@ private void flushIbdBuffer(int byteThreshold) { } } - private PrintWriter getWriterForProxyKey(int proxyKey, String type) { - Map writers = type.equals("ibd") ? ibdWriters : hbdWriters; - return writers.computeIfAbsent(proxyKey, key -> { - try { - String dirPath = outputPrefix + "/" + key; - File dir = new File(dirPath); - if (!dir.mkdirs() && !dir.isDirectory()) { - Utilities.exit("ERROR: Failed to create directory " + dirPath); - } - String filename = dirPath + "/" + splitFilename + "." + type; - return new PrintWriter(new File(filename)); - } - catch (IOException e) { - Utilities.exit("ERROR creating " + type + " file for proxy key " + key + ": ", e); - return null; // This will never be reached due to Utilities.exit - } - }); - } - private void writeSegment(int hap1, int hap2, int start, int inclEnd, PrintWriter out, String type) { // At Embark, the new dog, ie the higher proxy key, comes first From 70663d35818108cc1fb4afd159275673de87a880 Mon Sep 17 00:00:00 2001 From: Sam Vohr Date: Fri, 19 Sep 2025 11:53:35 -0400 Subject: [PATCH 3/4] Use enum for IBD file types --- src/hapibd/PbwtIbd.java | 7 ++++--- src/hapibd/SplitFileWriterManager.java | 10 +++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/hapibd/PbwtIbd.java b/src/hapibd/PbwtIbd.java index cc0da69..768ed2e 100644 --- a/src/hapibd/PbwtIbd.java +++ b/src/hapibd/PbwtIbd.java @@ -40,6 +40,7 @@ import vcf.RefGT; import vcf.Samples; + /** *

Instances of class {@code PbwtIbd} detect IBS segments in phased * genotype data.

@@ -329,11 +330,11 @@ private void processSeed(int hap1, int hap2, int start, int inclEnd) { inclEnd = extendInclEnd(hap1, hap2, inclEnd); if ((genPos[inclEnd] - genPos[start])>=minOutput) { if ((hap1>>1)==(hap2>>1)) { - writeSegment(hap1, hap2, start, inclEnd, hbdOut, "hbd"); + writeSegment(hap1, hap2, start, inclEnd, hbdOut, SplitFileWriterManager.MatchType.HBD); N_HBD_SEGS.incrementAndGet(); } else { - writeSegment(hap1, hap2, start, inclEnd, ibdOut, "ibd"); + writeSegment(hap1, hap2, start, inclEnd, ibdOut, SplitFileWriterManager.MatchType.IBD); N_IBD_SEGS.incrementAndGet(); } } @@ -471,7 +472,7 @@ private void flushIbdBuffer(int byteThreshold) { } private void writeSegment(int hap1, int hap2, int start, int inclEnd, - PrintWriter out, String type) { + PrintWriter out, SplitFileWriterManager.MatchType type) { // At Embark, the new dog, ie the higher proxy key, comes first if (Integer.parseInt(ids[hap1>>1]) < Integer.parseInt(ids[hap2>>1])) { int tmp = hap1; diff --git a/src/hapibd/SplitFileWriterManager.java b/src/hapibd/SplitFileWriterManager.java index 5f44b3e..b3ce11f 100644 --- a/src/hapibd/SplitFileWriterManager.java +++ b/src/hapibd/SplitFileWriterManager.java @@ -8,7 +8,11 @@ import blbutil.Utilities; + public class SplitFileWriterManager implements AutoCloseable { + + public static enum MatchType {HBD, IBD}; + private final String outputPrefix; private final String splitFilename; @@ -22,8 +26,8 @@ public SplitFileWriterManager(HapIbdPar par) { this.hbdWriters = new ConcurrentHashMap<>(); } - public PrintWriter getWriterForProxyKey(int proxyKey, String type) { - Map writers = type.equals("ibd") ? ibdWriters : hbdWriters; + public PrintWriter getWriterForProxyKey(int proxyKey, MatchType type) { + Map writers = type.equals(MatchType.IBD) ? ibdWriters : hbdWriters; return writers.computeIfAbsent(proxyKey, key -> { try { String dirPath = outputPrefix + "/" + key; @@ -31,7 +35,7 @@ public PrintWriter getWriterForProxyKey(int proxyKey, String type) { if (!dir.mkdirs() && !dir.isDirectory()) { Utilities.exit("ERROR: Failed to create directory " + dirPath); } - String filename = dirPath + "/" + splitFilename + "." + type; + String filename = dirPath + "/" + splitFilename + "." + type.toString().toLowerCase(); return new PrintWriter(new File(filename)); } catch (IOException e) { From f1e58988fc1484ccf487908b7b86c985c8ac65ed Mon Sep 17 00:00:00 2001 From: Sam Vohr Date: Fri, 19 Sep 2025 12:36:58 -0400 Subject: [PATCH 4/4] Clean up --- src/hapibd/PbwtIbd.java | 1 - src/hapibd/PbwtIbdDriver.java | 1 - 2 files changed, 2 deletions(-) diff --git a/src/hapibd/PbwtIbd.java b/src/hapibd/PbwtIbd.java index 768ed2e..c4d26ee 100644 --- a/src/hapibd/PbwtIbd.java +++ b/src/hapibd/PbwtIbd.java @@ -40,7 +40,6 @@ import vcf.RefGT; import vcf.Samples; - /** *

Instances of class {@code PbwtIbd} detect IBS segments in phased * genotype data.

diff --git a/src/hapibd/PbwtIbdDriver.java b/src/hapibd/PbwtIbdDriver.java index 39c75e8..7801c96 100644 --- a/src/hapibd/PbwtIbdDriver.java +++ b/src/hapibd/PbwtIbdDriver.java @@ -91,7 +91,6 @@ public static long[] detectIbd(HapIbdPar par) { } catch (IOException ex) { Utilities.exit(ex); } - return nSamplesAndMarkers; }