From 8cdb8fbd22ec4e1d806417ef28d8a69fcdf6a1a5 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 4 May 2016 18:34:20 -0700 Subject: [PATCH 1/5] - after shutting down, wait for the threads to actually finish - handle a possible race between poll thread and close(). --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 4f353df865ff..b29c45851d50 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -81,6 +81,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -686,7 +687,7 @@ private static class UnboundedKafkaReader extends UnboundedReader> availableRecordsQueue = new SynchronousQueue<>(); - private volatile boolean closed = false; + private AtomicBoolean closed = new AtomicBoolean(false); // Backlog support : // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd() @@ -792,7 +793,7 @@ public PartitionState apply(TopicPartition tp) { private void consumerPollLoop() { // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue - while (!closed) { + while (!closed.get()) { try { ConsumerRecords records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); if (!records.isEmpty()) { @@ -1041,11 +1042,26 @@ public long getSplitBacklogBytes() { @Override public void close() throws IOException { - closed = true; - availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread. 
- consumer.wakeup(); + closed.set(true); consumerPollThread.shutdown(); offsetFetcherThread.shutdown(); + while (true) { + // drain unread batch, this unblocks consumer thread. trying this in a loop to + // handle a small race where poll thread might try to enqueue after we drain. + consumer.wakeup(); + availableRecordsQueue.poll(); + try { + if (consumerPollThread.awaitTermination(10, TimeUnit.SECONDS) + && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS)) { + break; // done + } + } catch (InterruptedException e) { + throw new RuntimeException(e); // not expected + } + + LOG.warn("An internal thread is taking a long time to shutdown. will retry."); + } + Closeables.close(offsetConsumer, true); Closeables.close(consumer, true); } From 932fdb06cf4ffc3e9377c40ddb48c7d59ccea7e6 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 4 May 2016 18:45:35 -0700 Subject: [PATCH 2/5] check closed flag to reduce race window further --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index b29c45851d50..ea37057e78b1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -796,7 +796,7 @@ private void consumerPollLoop() { while (!closed.get()) { try { ConsumerRecords records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); - if (!records.isEmpty()) { + if (!records.isEmpty() && !closed.get()) { availableRecordsQueue.put(records); // blocks until dequeued. 
} } catch (InterruptedException e) { From 5afbc01f7b3ffbc5add6bf3a1e3bffb80285044d Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 5 May 2016 10:07:53 -0700 Subject: [PATCH 3/5] rearrange comments and the while loop a bit --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ea37057e78b1..6bbac4e385b6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1045,21 +1045,26 @@ public void close() throws IOException { closed.set(true); consumerPollThread.shutdown(); offsetFetcherThread.shutdown(); - while (true) { - // drain unread batch, this unblocks consumer thread. trying this in a loop to - // handle a small race where poll thread might try to enqueue after we drain. + + boolean isShutdown = false; + + // Wait for threads to shutdown. Trying this in a loop to handle a tiny race where poll + // thread might block to enqueue right after availableRecordsQueue.poll() below. + while (!isShutdown) { + consumer.wakeup(); - availableRecordsQueue.poll(); + offsetConsumer.wakeup(); + availableRecordsQueue.poll();// drain unread batch, this unblocks consumer thread. try { - if (consumerPollThread.awaitTermination(10, TimeUnit.SECONDS) - && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS)) { - break; // done - } + isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS) + && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); // not expected } - LOG.warn("An internal thread is taking a long time to shutdown. will retry."); + if (!isShutdown) { + LOG.warn("An internal thread is taking a long time to shutdown. 
will retry."); + } } Closeables.close(offsetConsumer, true); From 67ec4ccaf84486a5b309e43d563600b32b9718b9 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 5 May 2016 10:20:37 -0700 Subject: [PATCH 4/5] fix checkstyle error --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 6bbac4e385b6..cd197b270517 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1054,7 +1054,7 @@ public void close() throws IOException { consumer.wakeup(); offsetConsumer.wakeup(); - availableRecordsQueue.poll();// drain unread batch, this unblocks consumer thread. + availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread. try { isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS) && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS); From aaf076f50aa89ec787a329892409e31abbeab7c7 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 5 May 2016 11:41:03 -0700 Subject: [PATCH 5/5] set interrupt flag --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index cd197b270517..3d228242b8a5 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -818,6 +818,7 @@ private void nextBatch() { records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); 
LOG.warn("{}: Unexpected", this, e); return; } @@ -1059,6 +1060,7 @@ public void close() throws IOException { isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS) && offsetFetcherThread.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); // not expected }