From 4529c3aaf7abc231c273c1a464f1270cdbd357f6 Mon Sep 17 00:00:00 2001 From: Rohan Garg Date: Wed, 16 Nov 2022 14:41:12 +0530 Subject: [PATCH 1/2] Improve performance for ReadableInputStreamFrameChannel --- .../channel/ReadableInputStreamFrameChannel.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java index d804e9d1cbb9..57b963b19a71 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java @@ -58,6 +58,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel private volatile boolean keepReading = true; + private final Object readMonitor = new Object(); + private final ExecutorService executorService; /** @@ -152,7 +154,9 @@ private void startReading() while (true) { if (!keepReading) { try { - Thread.sleep(nextRetrySleepMillis(nTry)); + synchronized (readMonitor) { + readMonitor.wait(nextRetrySleepMillis(nTry)); + } synchronized (lock) { if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) { return; @@ -186,7 +190,15 @@ private void startReading() totalInputStreamBytesRead += bytesRead; if (backpressureFuture != null) { keepReading = false; - backpressureFuture.addListener(() -> keepReading = true, Execs.directExecutor()); + backpressureFuture.addListener( + () -> { + keepReading = true; + synchronized (readMonitor) { + readMonitor.notify(); + } + }, + Execs.directExecutor() + ); } else { keepReading = true; // continue adding data to delegate From 86a9876d1fd5b6993416c9fbbe7b0176a35da68b Mon Sep 17 00:00:00 2001 From: Rohan Garg Date: Thu, 17 Nov 2022 22:16:14 +0530 Subject: [PATCH 2/2] Fix race condition leading to unnecessary sleep --- .../frame/channel/ReadableInputStreamFrameChannel.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java index 57b963b19a71..f06302b49200 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java @@ -155,7 +155,9 @@ private void startReading() if (!keepReading) { try { synchronized (readMonitor) { - readMonitor.wait(nextRetrySleepMillis(nTry)); + if (!keepReading) { + readMonitor.wait(nextRetrySleepMillis(nTry)); + } } synchronized (lock) { if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) { @@ -192,8 +194,8 @@ private void startReading() keepReading = false; backpressureFuture.addListener( () -> { - keepReading = true; synchronized (readMonitor) { + keepReading = true; readMonitor.notify(); } },