newRecordKeysSorted = new PriorityQueue<>();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
index 9ab44d0f62f1b..ebbc7a5c28ea1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
@@ -28,11 +28,14 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
/**
* A HoodieCreateHandle which writes all data into a single file.
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
+@NotThreadSafe
public class HoodieUnboundedCreateHandle extends HoodieCreateHandle {
private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 957a0ff52e91d..f2536378807d6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -31,13 +31,18 @@
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
* the current file can take more records with the canWrite()
+ *
+ * ATTENTION: HoodieParquetWriter is not thread-safe; the caller must take care of the ordering of write and close calls.
*/
+@NotThreadSafe
public class HoodieParquetWriter
extends ParquetWriter implements HoodieFileWriter {
@@ -107,4 +112,9 @@ public void writeAvro(String key, IndexedRecord object) throws IOException {
writeSupport.add(key);
}
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
}
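
For context, the @NotThreadSafe annotation documents a usage contract rather than enforcing one: every write() and the final close() must be issued from a single thread, or be externally ordered. A minimal standalone sketch of that contract, using a hypothetical RecordWriter rather than the Hudi classes:

import java.io.Closeable;
import java.io.IOException;

// Minimal sketch (hypothetical RecordWriter, not the Hudi API) of the contract @NotThreadSafe
// documents: all write() calls and the final close() happen on one thread, because the
// internal buffer is unsynchronized.
final class SingleThreadedWriteExample {

  static final class RecordWriter implements Closeable {
    private final StringBuilder buffer = new StringBuilder(); // unsynchronized internal state

    void write(String record) {
      buffer.append(record).append('\n');
    }

    @Override
    public void close() throws IOException {
      // closing while another thread is still appending would corrupt the output
      System.out.print(buffer);
    }
  }

  public static void main(String[] args) throws IOException {
    // Safe usage: write and close strictly from a single thread.
    try (RecordWriter writer = new RecordWriter()) {
      for (int i = 0; i < 3; i++) {
        writer.write("record-" + i);
      }
    } // close() runs only after every write() has returned
  }
}
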
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 2b4a5d1608eec..2b42ba8aba2bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -99,13 +99,16 @@ public void runMerge(HoodieTable>, HoodieData computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 38d4e60f648ec..31312655251ab 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -102,13 +102,16 @@ public void runMerge(HoodieTable>, List, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread-safe, so we must completely stop record ingestion
+ // and the executor first, and only then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
}
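
The finally-block reordering above (repeated in the Java merge helper and the bootstrap handlers below) always follows the same shape: stop the producer/consumer executor, wait for it to actually terminate, and only then close the non-thread-safe handle. A condensed sketch of that ordering with hypothetical types, not the Hudi classes:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Condensed sketch (hypothetical MergeHandle, not the Hudi classes) of the ordering this
// patch enforces: terminate the producing/consuming executor and wait for it BEFORE closing
// the handle, so no in-flight write can race with close().
final class TeardownOrderingSketch {

  static final class MergeHandle implements AutoCloseable {
    void write(String record) { /* buffered, not thread safe */ }

    @Override
    public void close() { /* seals the underlying file */ }
  }

  public static void main(String[] args) throws InterruptedException {
    MergeHandle mergeHandle = new MergeHandle();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.submit(() -> mergeHandle.write("record"));
    } finally {
      // HUDI-2875 ordering: stop record producing/consuming first ...
      executor.shutdownNow();
      executor.awaitTermination(60, TimeUnit.SECONDS);
      // ... and only then close the (non-thread-safe) handle.
      mergeHandle.close();
    }
  }
}
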
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index f91dd5019a275..9821aedc875cd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -74,6 +74,7 @@ protected List computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 7878d857761ea..46dd30a7cb773 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -102,13 +102,16 @@ public void runMerge(HoodieTable>, List, List
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread-safe, so we must completely stop record ingestion
+ // and the executor first, and only then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index a8a9e49c01c00..df5bd2d3f458c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -95,6 +95,7 @@ protected List computeNext() {
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index e3d0e9b3c69d4..96ac794dcbc82 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -80,10 +80,11 @@ void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle, Path so
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index d07ea771bc557..5f45629ba8023 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -68,9 +68,9 @@ Schema getAvroSchema(Path sourceFilePath) throws IOException {
void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor wrapper = null;
+ ParquetReader reader =
+ AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
try {
- ParquetReader reader =
- AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
@@ -84,10 +84,12 @@ void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
+ reader.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 91f9cbc96e6ed..386b5c5e2cd75 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -28,6 +28,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
@@ -35,7 +36,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Iterator;
import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import scala.Tuple2;
@@ -44,6 +55,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -105,6 +117,7 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
@@ -152,6 +165,144 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
+ }
+ }
+ }
+
+ @Test
+ public void testExecutorTermination() throws ExecutionException, InterruptedException {
+ // HUDI-2875: the sleep times in this UT are chosen deliberately. They represent the case where the
+ // consumer is slower than the producer and the queue connecting them is non-empty.
+ // first, test an unsafe usage
+ ExecutorService executionThread = Executors.newSingleThreadExecutor();
+ Future testResult = executionThread.submit(new ExecutorConcurrentUsageTask(false));
+ // let executor run some time
+ sleepUninterruptibly(2 * 1000);
+ executionThread.shutdownNow();
+ boolean concurrentSafe = !testResult.get();
+ assertFalse(concurrentSafe, "Should find concurrent issue");
+ // test a thread safe usage
+ executionThread = Executors.newSingleThreadExecutor();
+ testResult = executionThread.submit(new ExecutorConcurrentUsageTask(true));
+ sleepUninterruptibly(2 * 1000);
+ executionThread.shutdownNow();
+ concurrentSafe = !testResult.get();
+ assertTrue(concurrentSafe, "Should not find concurrent issue");
+ }
+
+ private static void sleepUninterruptibly(int milliseconds) {
+ long remainingNanos = TimeUnit.MILLISECONDS.toNanos(milliseconds);
+ long end = System.nanoTime() + remainingNanos;
+ while (true) {
+ try {
+ TimeUnit.NANOSECONDS.sleep(remainingNanos);
+ return;
+ } catch (InterruptedException interruptedException) {
+ remainingNanos = end - System.nanoTime();
+ }
+ }
+ }
+
+ private class ExecutorConcurrentUsageTask implements Callable {
+ private final boolean correct;
+
+ private ExecutorConcurrentUsageTask(boolean correct) {
+ this.correct = correct;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+ when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024 * 1024);
+
+ Iterator unboundedRecordIter = new Iterator() {
+ private final Random random = new Random();
+ private final HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public GenericRecord next() {
+ String randomStr = UUID.randomUUID().toString();
+ return dataGenerator.generateRecordForTripSchema(randomStr, randomStr, randomStr, random.nextLong());
+ }
+ };
+
+ NonThreadSafeConsumer nonThreadSafeConsumer = new NonThreadSafeConsumer();
+ BoundedInMemoryExecutor, Integer> executor = null;
+ try {
+ executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter, nonThreadSafeConsumer,
+ rec -> rec, getPreExecuteRunnable());
+ executor.execute();
+ } catch (Exception e) {
+ if (!(e instanceof HoodieException) || !(e.getCause() instanceof InterruptedException)) {
+ fail("Unexpected exception thrown here: ", e);
+ }
+ } finally {
+ // here we simulate correct order to close executor and consumer
+ if (correct) {
+ if (executor != null) {
+ executor.shutdownNow();
+ executor.awaitTermination();
+ }
+ nonThreadSafeConsumer.close(2);
+ } else {
+ // here we simulate incorrect order to close executor and consumer
+ nonThreadSafeConsumer.close(2);
+ if (executor != null) {
+ executor.shutdownNow();
+ executor.awaitTermination();
+ }
+ }
+ }
+ return nonThreadSafeConsumer.foundConcurrentUsage;
+ }
+ }
+
+ private static class NonThreadSafeConsumer extends BoundedInMemoryQueueConsumer {
+ private final ReentrantLock lock = new ReentrantLock();
+ private boolean foundConcurrentUsage = false;
+
+ @Override
+ protected void consumeOneRecord(GenericRecord record) {
+ boolean getLock = lock.tryLock();
+ if (!getLock) {
+ foundConcurrentUsage = true;
+ }
+ if (getLock) {
+ try {
+ // simulate writing avro into parquet; it is slower than the rate at which the producer produces records.
+ sleepUninterruptibly(10);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ protected void finish() {
+ }
+
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+
+ public void close(int seconds) {
+ boolean getLock = lock.tryLock();
+ if (!getLock) {
+ foundConcurrentUsage = true;
+ }
+ if (getLock) {
+ try {
+ sleepUninterruptibly(seconds * 1000);
+ } finally {
+ lock.unlock();
+ }
}
}
}
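
The NonThreadSafeConsumer above flags overlapping calls with ReentrantLock.tryLock(): if the lock cannot be acquired, another thread is inside the consumer at the same moment. The same detection idiom, stripped of the Hudi types, looks roughly like this:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Standalone sketch of the tryLock-based detection idiom used by NonThreadSafeConsumer:
// if tryLock() fails, another thread is inside the "critical" method at the same time,
// which is exactly the concurrent usage the test wants to flag.
final class ConcurrentUsageDetector {
  private final ReentrantLock lock = new ReentrantLock();
  private volatile boolean foundConcurrentUsage = false;

  void criticalSection(long workMillis) throws InterruptedException {
    if (!lock.tryLock()) {
      foundConcurrentUsage = true; // overlapping call observed
      return;
    }
    try {
      Thread.sleep(workMillis); // simulate slow, single-threaded-only work
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ConcurrentUsageDetector detector = new ConcurrentUsageDetector();
    CountDownLatch start = new CountDownLatch(1);
    Runnable task = () -> {
      try {
        start.await();
        detector.criticalSection(100);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    start.countDown(); // release both threads at once to force an overlap
    t1.join();
    t2.join();
    System.out.println("concurrent usage detected: " + detector.foundConcurrentUsage);
  }
}
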
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index d1e5e66083196..9677335157b0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -37,6 +37,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,7 +49,7 @@
public class BoundedInMemoryExecutor {
private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
-
+ private static final long TERMINATE_WAITING_TIME_SECS = 60L;
// Executor service used for launching write thread.
private final ExecutorService producerExecutorService;
// Executor service used for launching read thread.
@@ -168,6 +169,35 @@ public boolean isRemaining() {
public void shutdownNow() {
producerExecutorService.shutdownNow();
consumerExecutorService.shutdownNow();
+ // close the queue to force the producer to stop
+ queue.close();
+ }
+
+ public boolean awaitTermination() {
+ boolean interruptedBefore = Thread.currentThread().isInterrupted();
+ boolean producerTerminated = false;
+ boolean consumerTerminated = false;
+ try {
+ producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ return producerTerminated && consumerTerminated;
+ } catch (InterruptedException ie) {
+ if (!interruptedBefore) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ // If the current thread was interrupted before awaitTermination was called, we still give the
+ // executorService a chance to await termination, since the interrupt may have been aimed at
+ // something other than this waiting.
+ try {
+ producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ } catch (InterruptedException expected) {
+ // awaiting process is interrupted again.
+ }
+ Thread.currentThread().interrupt();
+ }
+ return producerTerminated && consumerTerminated;
}
public BoundedInMemoryQueue getQueue() {
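
The new awaitTermination() composes the standard java.util.concurrent shutdown idiom for both the producer and the consumer pool: request cancellation with shutdownNow(), then block for a bounded time until the pool has actually finished, restoring the interrupt flag if the wait itself is interrupted. A minimal sketch of that idiom on a plain ExecutorService, not the Hudi wrapper:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Standard java.util.concurrent shutdown idiom that the new awaitTermination() applies to
// both the producer and the consumer pool: request cancellation, then block (bounded) until
// the pool has actually finished, restoring the interrupt flag if the wait is interrupted.
final class GracefulShutdownSketch {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> System.out.println("work"));

    pool.shutdownNow(); // interrupt running tasks, reject new ones
    boolean terminated = false;
    try {
      terminated = pool.awaitTermination(60, TimeUnit.SECONDS); // bounded wait, as in the patch
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
    System.out.println("terminated cleanly: " + terminated);
  }
}
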
From bd04a51bda81fe0c614da9a0143105a72b054812 Mon Sep 17 00:00:00 2001
From: guanziyue
Date: Sun, 20 Mar 2022 16:51:03 +0800
Subject: [PATCH 2/3] rebase master and change the UT
---
.../TestBoundedInMemoryExecutorInSpark.java | 168 ++++--------------
.../util/queue/BoundedInMemoryExecutor.java | 22 +--
2 files changed, 38 insertions(+), 152 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 386b5c5e2cd75..a714d60d0033a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -38,15 +38,6 @@
import java.util.Iterator;
import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import scala.Tuple2;
@@ -55,7 +46,6 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -171,139 +161,43 @@ protected Integer getResult() {
}
@Test
- public void testExecutorTermination() throws ExecutionException, InterruptedException {
- // HUDI-2875: the sleep times in this UT are chosen deliberately. They represent the case where the
- // consumer is slower than the producer and the queue connecting them is non-empty.
- // first, test an unsafe usage
- ExecutorService executionThread = Executors.newSingleThreadExecutor();
- Future testResult = executionThread.submit(new ExecutorConcurrentUsageTask(false));
- // let executor run some time
- sleepUninterruptibly(2 * 1000);
- executionThread.shutdownNow();
- boolean concurrentSafe = !testResult.get();
- assertFalse(concurrentSafe, "Should find concurrent issue");
- // test a thread safe usage
- executionThread = Executors.newSingleThreadExecutor();
- testResult = executionThread.submit(new ExecutorConcurrentUsageTask(true));
- sleepUninterruptibly(2 * 1000);
- executionThread.shutdownNow();
- concurrentSafe = !testResult.get();
- assertTrue(concurrentSafe, "Should not find concurrent issue");
- }
-
- private static void sleepUninterruptibly(int milliseconds) {
- long remainingNanos = TimeUnit.MILLISECONDS.toNanos(milliseconds);
- long end = System.nanoTime() + remainingNanos;
- while (true) {
- try {
- TimeUnit.NANOSECONDS.sleep(remainingNanos);
- return;
- } catch (InterruptedException interruptedException) {
- remainingNanos = end - System.nanoTime();
+ public void testExecutorTermination() {
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+ when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
+ Iterator unboundedRecordIter = new Iterator() {
+ @Override
+ public boolean hasNext() {
+ return true;
}
- }
- }
-
- private class ExecutorConcurrentUsageTask implements Callable {
- private final boolean correct;
-
- private ExecutorConcurrentUsageTask(boolean correct) {
- this.correct = correct;
- }
- @Override
- public Boolean call() throws Exception {
- HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
- when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024 * 1024);
-
- Iterator unboundedRecordIter = new Iterator() {
- private final Random random = new Random();
- private final HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-
- @Override
- public boolean hasNext() {
- return true;
- }
-
- @Override
- public GenericRecord next() {
- String randomStr = UUID.randomUUID().toString();
- return dataGenerator.generateRecordForTripSchema(randomStr, randomStr, randomStr, random.nextLong());
- }
- };
-
- NonThreadSafeConsumer nonThreadSafeConsumer = new NonThreadSafeConsumer();
- BoundedInMemoryExecutor, Integer> executor = null;
- try {
- executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter, nonThreadSafeConsumer,
- rec -> rec, getPreExecuteRunnable());
- executor.execute();
- } catch (Exception e) {
- if (!(e instanceof HoodieException) || !(e.getCause() instanceof InterruptedException)) {
- fail("Unexpected exception thrown here: ", e);
- }
- } finally {
- // here we simulate correct order to close executor and consumer
- if (correct) {
- if (executor != null) {
- executor.shutdownNow();
- executor.awaitTermination();
- }
- nonThreadSafeConsumer.close(2);
- } else {
- // here we simulate incorrect order to close executor and consumer
- nonThreadSafeConsumer.close(2);
- if (executor != null) {
- executor.shutdownNow();
- executor.awaitTermination();
- }
- }
+ @Override
+ public GenericRecord next() {
+ return dataGen.generateGenericRecord();
}
- return nonThreadSafeConsumer.foundConcurrentUsage;
- }
- }
-
- private static class NonThreadSafeConsumer extends BoundedInMemoryQueueConsumer {
- private final ReentrantLock lock = new ReentrantLock();
- private boolean foundConcurrentUsage = false;
+ };
- @Override
- protected void consumeOneRecord(GenericRecord record) {
- boolean getLock = lock.tryLock();
- if (!getLock) {
- foundConcurrentUsage = true;
- }
- if (getLock) {
- try {
- // simulate writing avro into parquet; it is slower than the rate at which the producer produces records.
- sleepUninterruptibly(10);
- } finally {
- lock.unlock();
- }
- }
- }
+ BoundedInMemoryQueueConsumer, Integer> consumer =
+ new BoundedInMemoryQueueConsumer, Integer>() {
+ @Override
+ protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) {
+ }
- @Override
- protected void finish() {
- }
+ @Override
+ protected void finish() {
+ }
- @Override
- protected Integer getResult() {
- return 0;
- }
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+ };
- public void close(int seconds) {
- boolean getLock = lock.tryLock();
- if (!getLock) {
- foundConcurrentUsage = true;
- }
- if (getLock) {
- try {
- sleepUninterruptibly(seconds * 1000);
- } finally {
- lock.unlock();
- }
- }
- }
+ BoundedInMemoryExecutor>, Integer> executor =
+ new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
+ consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ getPreExecuteRunnable());
+ executor.shutdownNow();
+ boolean terminatedGracefully = executor.awaitTermination();
+ assertTrue(terminatedGracefully);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index 9677335157b0d..46ef5dc40caf8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -174,27 +174,19 @@ public void shutdownNow() {
}
public boolean awaitTermination() {
- boolean interruptedBefore = Thread.currentThread().isInterrupted();
+ // if the current thread was interrupted before awaitTermination was called, we still give the
+ // executors a chance to terminate. So clear the interrupt flag and restore it if needed before returning.
+ boolean interruptedBefore = Thread.interrupted();
boolean producerTerminated = false;
boolean consumerTerminated = false;
try {
producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
- return producerTerminated && consumerTerminated;
} catch (InterruptedException ie) {
- if (!interruptedBefore) {
- Thread.currentThread().interrupt();
- return false;
- }
- // If the current thread was interrupted before awaitTermination was called, we still give the
- // executorService a chance to await termination, since the interrupt may have been aimed at
- // something other than this waiting.
- try {
- producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
- consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
- } catch (InterruptedException expected) {
- // awaiting process is interrupted again.
- }
+ // the waiting itself was interrupted; stop waiting and return whatever has terminated so far
+ }
+ // reset interrupt flag if needed
+ if (interruptedBefore) {
Thread.currentThread().interrupt();
}
return producerTerminated && consumerTerminated;
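
The simplification above hinges on the difference between Thread.interrupted(), which tests and clears the calling thread's interrupt flag, and Thread.currentThread().isInterrupted(), which only reads it. A tiny standalone illustration:

// Sketch of the distinction patch 2 relies on: Thread.interrupted() tests AND CLEARS the
// calling thread's interrupt flag (so the subsequent awaitTermination can block normally),
// while Thread.currentThread().isInterrupted() only reads it. The flag is restored before
// returning if it was set.
final class InterruptFlagSketch {
  public static void main(String[] args) {
    Thread.currentThread().interrupt();                      // simulate a pre-existing interrupt
    boolean wasInterrupted = Thread.interrupted();           // true, and the flag is now cleared
    System.out.println("was interrupted: " + wasInterrupted);
    System.out.println("still interrupted: " + Thread.currentThread().isInterrupted()); // false

    if (wasInterrupted) {
      Thread.currentThread().interrupt();                    // restore the flag for the caller
    }
    System.out.println("restored: " + Thread.currentThread().isInterrupted());          // true
  }
}
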
From b1b66d9e789e16ab90b6cd20d31befd643e45edc Mon Sep 17 00:00:00 2001
From: guanziyue
Date: Sun, 20 Mar 2022 17:01:39 +0800
Subject: [PATCH 3/3] Add a method to generate generic record in test data gen
---
.../hudi/common/testutils/HoodieTestDataGenerator.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 3e147b7fdd47c..dcb9fc639c34f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -860,12 +860,14 @@ public boolean deleteExistingKeyIfPresent(HoodieKey key) {
return false;
}
+ public GenericRecord generateGenericRecord() {
+ return generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
+ genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong());
+ }
+
public List generateGenericRecords(int numRecords) {
List list = new ArrayList<>();
- IntStream.range(0, numRecords).forEach(i -> {
- list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
- genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong()));
- });
+ IntStream.range(0, numRecords).forEach(i -> list.add(generateGenericRecord()));
return list;
}
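
For reference, the reworked test's unbounded iterator is the intended caller of this new helper; a minimal sketch of that usage, assuming only the HoodieTestDataGenerator API shown in this patch:

import java.util.Iterator;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;

// Endless record source for the producer side of the executor test, built only from APIs
// that appear in this patch (generateGenericRecord on HoodieTestDataGenerator).
final class UnboundedRecordSource implements Iterator<GenericRecord> {
  private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

  @Override
  public boolean hasNext() {
    return true; // never runs dry, so only an explicit shutdown stops the producer
  }

  @Override
  public GenericRecord next() {
    return dataGen.generateGenericRecord();
  }
}
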