diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index a9eb40611d0d..96a4accb3317 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -147,6 +147,7 @@ public class AppenderatorImpl implements Appenderator private volatile FileLock basePersistDirLock = null; private volatile FileChannel basePersistDirLockChannel = null; private AtomicBoolean closed = new AtomicBoolean(false); + private final Object sinkLock = new Object(); AppenderatorImpl( DataSchema schema, @@ -474,9 +475,10 @@ public ListenableFuture persistAll(@Nullable final Committer committer) indexesToPersist.add(Pair.of(hydrant, identifier)); } } - - if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), identifier)); + synchronized (sinkLock) { + if (sink.swappable()) { + indexesToPersist.add(Pair.of(sink.swap(), identifier)); + } } } @@ -611,7 +613,8 @@ public ListenableFuture push( private ListenableFuture pushBarrier() { return intermediateTempExecutor.submit( - (Runnable) () -> pushExecutor.submit(() -> {}) + (Runnable) () -> pushExecutor.submit(() -> { + }) ); } @@ -619,8 +622,8 @@ private ListenableFuture pushBarrier() * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only * be run in the single-threaded pushExecutor. * - * @param identifier sink identifier - * @param sink sink to push + * @param identifier sink identifier + * @param sink sink to push * @param useUniquePath true if the segment should be written to a path with a unique identifier * * @return segment descriptor, or null if the sink is no longer valid