From 37537b6c4e192ecea22b579d3bb57842f92bf25a Mon Sep 17 00:00:00 2001 From: leventov Date: Sat, 7 Apr 2018 18:47:32 +0300 Subject: [PATCH 1/4] Replace EmittedBatchCounter and UpdateCounter (both not safe for concurrent increments/updates) with ConcurrentAwaitableCounter (safe for concurrent increments) --- .../namespace/cache/CacheScheduler.java | 15 +- .../lookup/namespace/cache/UpdateCounter.java | 88 ----------- .../ConcurrentAwaitableCounter.java | 141 ++++++++++++++++++ .../druid/java/util/emitter/core/Batch.java | 6 +- .../emitter/core/EmittedBatchCounter.java | 73 --------- .../util/emitter/core/HttpPostEmitter.java | 13 +- .../ConcurrentAwaitableCounterTest.java | 65 ++++++++ 7 files changed, 224 insertions(+), 177 deletions(-) delete mode 100644 extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java create mode 100644 java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java delete mode 100644 java-util/src/main/java/io/druid/java/util/emitter/core/EmittedBatchCounter.java create mode 100644 java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java index 7669168d1f4a..60bdbef9581e 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.inject.Inject; +import io.druid.concurrent.ConcurrentAwaitableCounter; import io.druid.java.util.emitter.service.ServiceEmitter; import 
io.druid.java.util.emitter.service.ServiceMetricEvent; import io.druid.guice.LazySingleton; @@ -51,11 +52,11 @@ * // cacheState could be either NoCache or VersionedCache. * if (cacheState instanceof NoCache) { * // the cache is not yet created, or already closed - * } else if (cacheState instanceof VersionedCache) { + * } else { * Map cache = ((VersionedCache) cacheState).getCache(); // use the cache * // Although VersionedCache implements AutoCloseable, versionedCache shouldn't be manually closed * // when obtained from entry.getCacheState(). If the namespace updates should be ceased completely, - * // entry.close() (see below) should be called, it will close the last VersionedCache itself. + * // entry.close() (see below) should be called, it will close the last VersionedCache as well. * // On scheduled updates, outdated VersionedCaches are also closed automatically. * } * ... @@ -107,12 +108,12 @@ Future getUpdaterFuture() public void awaitTotalUpdates(int totalUpdates) throws InterruptedException { - impl.updateCounter.awaitTotalUpdates(totalUpdates); + impl.updateCounter.awaitCount(totalUpdates); } void awaitNextUpdates(int nextUpdates) throws InterruptedException { - impl.updateCounter.awaitNextUpdates(nextUpdates); + impl.updateCounter.awaitNextIncrements(nextUpdates); } /** @@ -145,7 +146,7 @@ public class EntryImpl implements AutoCloseable private final Future updaterFuture; private final Cleaner entryCleaner; private final CacheGenerator cacheGenerator; - private final UpdateCounter updateCounter = new UpdateCounter(); + private final ConcurrentAwaitableCounter updateCounter = new ConcurrentAwaitableCounter(); private final CountDownLatch startLatch = new CountDownLatch(1); private EntryImpl(final T namespace, final Entry entry, final CacheGenerator cacheGenerator) @@ -276,7 +277,7 @@ private CacheState swapCacheState(VersionedCache newVersionedCache) return lastCacheState; } } while (!cacheStateHolder.compareAndSet(lastCacheState, newVersionedCache)); - 
updateCounter.update(); + updateCounter.increment(); return lastCacheState; } @@ -485,7 +486,7 @@ public Entry scheduleAndWait(ExtractionNamespace namespace, long waitForFirstRun log.debug("Scheduled new %s", entry); boolean success = false; try { - success = entry.impl.updateCounter.awaitFirstUpdate(waitForFirstRunMs, TimeUnit.MILLISECONDS); + success = entry.impl.updateCounter.awaitFirstIncrement(waitForFirstRunMs, TimeUnit.MILLISECONDS); if (success) { return entry; } else { diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java deleted file mode 100644 index ed229b45f891..000000000000 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/UpdateCounter.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.server.lookup.namespace.cache; - -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -final class UpdateCounter -{ - /** - * Max {@link Phaser}'s phase, specified in it's javadoc. Then it wraps to zero. - */ - private static final int MAX_PHASE = Integer.MAX_VALUE; - - private final Phaser phaser = new Phaser(1); - - void update() - { - phaser.arrive(); - } - - void awaitTotalUpdates(int totalUpdates) throws InterruptedException - { - totalUpdates &= MAX_PHASE; - int currentUpdates = phaser.getPhase(); - checkNotTerminated(currentUpdates); - while (comparePhases(totalUpdates, currentUpdates) > 0) { - currentUpdates = phaser.awaitAdvanceInterruptibly(currentUpdates); - checkNotTerminated(currentUpdates); - } - } - - private static int comparePhases(int phase1, int phase2) - { - int diff = (phase1 - phase2) & MAX_PHASE; - if (diff == 0) { - return 0; - } - return diff < MAX_PHASE / 2 ? 1 : -1; - } - - private void checkNotTerminated(int phase) - { - if (phase < 0) { - throw new IllegalStateException("Phaser[" + phaser + "] unexpectedly terminated."); - } - } - - void awaitNextUpdates(int nextUpdates) throws InterruptedException - { - if (nextUpdates <= 0) { - throw new IllegalArgumentException("nextUpdates is not positive: " + nextUpdates); - } - if (nextUpdates > MAX_PHASE / 4) { - throw new UnsupportedOperationException("Couldn't wait for so many updates: " + nextUpdates); - } - awaitTotalUpdates(phaser.getPhase() + nextUpdates); - } - - boolean awaitFirstUpdate(long timeout, TimeUnit unit) throws InterruptedException - { - try { - phaser.awaitAdvanceInterruptibly(0, timeout, unit); - return true; - } - catch (TimeoutException e) { - return false; - } - } -} diff --git a/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java b/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java new file mode 100644 index 
000000000000..47df74b54968 --- /dev/null +++ b/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java @@ -0,0 +1,141 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.concurrent; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; + +/** + * This synchronization object allows to {@link #increment} a counter without blocking, potentially from multiple + * threads (although in some use cases there is just one incrementer thread), and block in other thread(s), awaiting + * when the count reaches the provided value: see {@link #awaitCount}, or the specified number of events since the + * call: see {@link #awaitNextIncrements}. + * + * This counter wraps around {@link Long#MAX_VALUE} and starts from 0 again, so "next" count should be generally + * obtained by calling {@link #nextCount nextCount(currentCount)} rather than {@code currentCount + 1}. 
+ * + * Memory consistency effects: actions in threads prior to calling {@link #increment} while the count was less than the + * awaited value happen-before actions following count awaiting methods such as {@link #awaitCount}. + */ +public final class ConcurrentAwaitableCounter +{ + private static final long MAX_COUNT = Long.MAX_VALUE; + + public static long nextCount(long prevCount) + { + return (prevCount + 1) & MAX_COUNT; + } + + private static class Sync extends AbstractQueuedLongSynchronizer + { + @Override + protected long tryAcquireShared(long awaitedCount) + { + long currentCount = getState(); + return compareCounts(currentCount, awaitedCount) > 0 ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared(long increment) + { + long count; + long nextCount; + do { + count = getState(); + nextCount = (count + increment) & MAX_COUNT; + } while (!compareAndSetState(count, nextCount)); + return true; + } + + long getCount() + { + return getState(); + } + } + + private final Sync sync = new Sync(); + + /** + * Increment the count. This method could be safely called from concurrent threads. + */ + public void increment() + { + sync.releaseShared(1); + } + + /** + * Await until the {@link #increment} is called on this counter object the specified number of times from the creation + * of this counter object. 
+ */ + public void awaitCount(long totalCount) throws InterruptedException + { + totalCount &= MAX_COUNT; + long currentCount; + currentCount = sync.getCount(); + while (compareCounts(totalCount, currentCount) > 0) { + sync.acquireSharedInterruptibly(currentCount); + currentCount = sync.getCount(); + } + } + + public void awaitCount(long totalCount, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + { + long nanos = unit.toNanos(timeout); + totalCount &= MAX_COUNT; + long currentCount; + currentCount = sync.getCount(); + while (compareCounts(totalCount, currentCount) > 0) { + if (!sync.tryAcquireSharedNanos(currentCount, nanos)) { + throw new TimeoutException(); + } + currentCount = sync.getCount(); + } + } + + private static int compareCounts(long count1, long count2) + { + long diff = (count1 - count2) & MAX_COUNT; + if (diff == 0) { + return 0; + } + return diff < MAX_COUNT / 2 ? 1 : -1; + } + + /** + * Somewhat loosely defined wait for "next N increments", because the starting point is not defined from the Java + * Memory Model perspective. 
+ */ + public void awaitNextIncrements(long nextIncrements) throws InterruptedException + { + if (nextIncrements <= 0) { + throw new IllegalArgumentException("nextIncrements is not positive: " + nextIncrements); + } + if (nextIncrements > MAX_COUNT / 4) { + throw new UnsupportedOperationException("Couldn't wait for so many increments: " + nextIncrements); + } + awaitCount(sync.getCount() + nextIncrements); + } + + public boolean awaitFirstIncrement(long timeout, TimeUnit unit) throws InterruptedException + { + return sync.tryAcquireSharedNanos(0, unit.toNanos(timeout)); + } +} diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java b/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java index 818f54f91a94..cb0bc88042b1 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/Batch.java @@ -90,12 +90,12 @@ private static boolean isEmittingAllowed(long state) /** * Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0. - * It's a boxed Integer rather than int, because we want to minimize the number of allocations done in + * It's a boxed Long rather than primitive long, because we want to minimize the number of allocations done in * {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}. * @see HttpPostEmitter#onSealExclusive * @see HttpPostEmitter#concurrentBatch */ - final Integer batchNumber; + final Long batchNumber; /** * The number of events in this batch, needed for event count-based batch emitting. 
@@ -107,7 +107,7 @@ private static boolean isEmittingAllowed(long state) */ private long firstEventTimestamp = -1; - Batch(HttpPostEmitter emitter, byte[] buffer, int batchNumber) + Batch(HttpPostEmitter emitter, byte[] buffer, long batchNumber) { this.emitter = emitter; this.buffer = buffer; diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/EmittedBatchCounter.java b/java-util/src/main/java/io/druid/java/util/emitter/core/EmittedBatchCounter.java deleted file mode 100644 index 860e6957fe94..000000000000 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/EmittedBatchCounter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.emitter.core; - -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - - -final class EmittedBatchCounter -{ - /** - * Max {@link Phaser}'s phase, specified in it's javadoc. Then it wraps to zero. 
- */ - private static final int MAX_PHASE = Integer.MAX_VALUE; - - static int nextBatchNumber(int prevBatchNumber) - { - return (prevBatchNumber + 1) & MAX_PHASE; - } - - private final Phaser phaser = new Phaser(1); - - void batchEmitted() - { - phaser.arrive(); - } - - void awaitBatchEmitted(int batchNumberToAwait, long timeout, TimeUnit unit) - throws InterruptedException, TimeoutException - { - batchNumberToAwait &= MAX_PHASE; - int currentBatch = phaser.getPhase(); - checkNotTerminated(currentBatch); - while (comparePhases(batchNumberToAwait, currentBatch) >= 0) { - currentBatch = phaser.awaitAdvanceInterruptibly(currentBatch, timeout, unit); - checkNotTerminated(currentBatch); - } - } - - private static int comparePhases(int phase1, int phase2) - { - int diff = (phase1 - phase2) & MAX_PHASE; - if (diff == 0) { - return 0; - } - return diff < MAX_PHASE / 2 ? 1 : -1; - } - - private void checkNotTerminated(int phase) - { - if (phase < 0) { - throw new IllegalStateException("Phaser[" + phaser + "] unexpectedly terminated."); - } - } -} diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java index 42a2d9a17fbb..1e609b2b5ae5 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java @@ -25,6 +25,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.primitives.Ints; +import io.druid.concurrent.ConcurrentAwaitableCounter; import io.druid.java.util.common.ISE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; @@ -133,7 +134,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter */ private final AtomicInteger approximateLargeEventsToEmitCount = new AtomicInteger(); - private final EmittedBatchCounter emittedBatchCounter = new 
EmittedBatchCounter(); + private final ConcurrentAwaitableCounter emittedBatchCounter = new ConcurrentAwaitableCounter(); private final EmittingThread emittingThread; private final AtomicLong totalEmittedEvents = new AtomicLong(); private final AtomicInteger allocatedBuffers = new AtomicInteger(); @@ -331,7 +332,7 @@ private void doOnSealExclusive(Batch batch, long elapsedTimeMillis) addBatchToEmitQueue(batch); wakeUpEmittingThread(); if (!isTerminated()) { - int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(batch.batchNumber); + long nextBatchNumber = ConcurrentAwaitableCounter.nextCount(batch.batchNumber); byte[] newBuffer = acquireBuffer(); if (!concurrentBatch.compareAndSet(batch, new Batch(this, newBuffer, nextBatchNumber))) { buffersToReuse.add(newBuffer); @@ -345,7 +346,7 @@ private void doOnSealExclusive(Batch batch, long elapsedTimeMillis) private void tryRecoverCurrentBatch(Integer failedBatchNumber) { log.info("Trying to recover currentBatch"); - int nextBatchNumber = EmittedBatchCounter.nextBatchNumber(failedBatchNumber); + long nextBatchNumber = ConcurrentAwaitableCounter.nextCount(failedBatchNumber); byte[] newBuffer = acquireBuffer(); if (concurrentBatch.compareAndSet(failedBatchNumber, new Batch(this, newBuffer, nextBatchNumber))) { log.info("Successfully recovered currentBatch"); @@ -383,7 +384,7 @@ private void limitBuffersToEmitSize() private void batchFinalized() { // Notify HttpPostEmitter.flush(), that the batch is emitted, or failed, or dropped. - emittedBatchCounter.batchEmitted(); + emittedBatchCounter.increment(); } private Batch pollBatchFromEmitQueue() @@ -422,7 +423,7 @@ private void flush(Batch batch) throws IOException // This check doesn't always awaits for this exact batch to be emitted, because another batch could be dropped // from the queue ahead of this one, in limitBuffersToEmitSize(). But there is no better way currently to wait for // the exact batch, and it's not that important. 
- emittedBatchCounter.awaitBatchEmitted(batch.batchNumber, config.getFlushTimeOut(), TimeUnit.MILLISECONDS); + emittedBatchCounter.awaitCount(batch.batchNumber, config.getFlushTimeOut(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { String message = StringUtils.format("Timed out after [%d] millis during flushing", config.getFlushTimeOut()); @@ -923,7 +924,7 @@ public ConcurrentTimeCounter getFailedSendingTimeCounter() @VisibleForTesting void waitForEmission(int batchNumber) throws Exception { - emittedBatchCounter.awaitBatchEmitted(batchNumber, 10, TimeUnit.SECONDS); + emittedBatchCounter.awaitCount(batchNumber, 10, TimeUnit.SECONDS); } @VisibleForTesting diff --git a/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java b/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java new file mode 100644 index 000000000000..1b3e010ea124 --- /dev/null +++ b/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.concurrent; + +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +public class ConcurrentAwaitableCounterTest +{ + + @Test(timeout = 1000) + public void smokeTest() throws InterruptedException + { + ConcurrentAwaitableCounter counter = new ConcurrentAwaitableCounter(); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch finish = new CountDownLatch(7); + for (int i = 0; i < 2; i++) { + new Thread(() -> { + try { + start.await(); + for (int j = 0; j < 10_000; j++) { + counter.increment(); + } + finish.countDown(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + }).start(); + } + for (int awaitCount : new int[] {0, 1, 100, 10_000, 20_000}) { + new Thread(() -> { + try { + start.await(); + counter.awaitCount(awaitCount); + finish.countDown(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + } + start.countDown(); + finish.await(); + } +} From cf56196aa5c2d63bbd0c6d7676b2fe50f0449bf8 Mon Sep 17 00:00:00 2001 From: leventov Date: Sun, 8 Apr 2018 17:18:22 +0300 Subject: [PATCH 2/4] Fixes --- .../namespace/cache/CacheScheduler.java | 2 ++ .../ConcurrentAwaitableCounter.java | 10 ++++++-- .../util/emitter/core/HttpPostEmitter.java | 3 ++- .../ConcurrentAwaitableCounterTest.java | 24 +++++++++++++++++++ .../java/util/emitter/core/EmitterTest.java | 4 ++-- 5 files changed, 38 insertions(+), 5 deletions(-) diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java index 60bdbef9581e..1f1d3e3f8135 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/CacheScheduler.java @@ -106,11 +106,13 @@ Future 
getUpdaterFuture() return impl.updaterFuture; } + @VisibleForTesting public void awaitTotalUpdates(int totalUpdates) throws InterruptedException { impl.updateCounter.awaitCount(totalUpdates); } + @VisibleForTesting void awaitNextUpdates(int nextUpdates) throws InterruptedException { impl.updateCounter.awaitNextIncrements(nextUpdates); diff --git a/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java b/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java index 47df74b54968..1bba4de60b78 100644 --- a/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java +++ b/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java @@ -47,10 +47,10 @@ public static long nextCount(long prevCount) private static class Sync extends AbstractQueuedLongSynchronizer { @Override - protected long tryAcquireShared(long awaitedCount) + protected long tryAcquireShared(long countWhenWaitStarted) { long currentCount = getState(); - return compareCounts(currentCount, awaitedCount) > 0 ? 1 : -1; + return compareCounts(currentCount, countWhenWaitStarted) > 0 ? 1 : -1; } @Override @@ -134,6 +134,12 @@ public void awaitNextIncrements(long nextIncrements) throws InterruptedException awaitCount(sync.getCount() + nextIncrements); } + /** + * The difference between this method and {@link #awaitCount(long, long, TimeUnit)} with argument 1 is that {@code + * awaitFirstIncrement()} returns a boolean designating whether the count was awaited (while waiting for no longer than + * the specified period of time), while {@code awaitCount()} throws {@link TimeoutException} if the count was not + * awaited. 
+ */ public boolean awaitFirstIncrement(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(0, unit.toNanos(timeout)); diff --git a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java index 1e609b2b5ae5..418a8b32cf29 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/core/HttpPostEmitter.java @@ -178,7 +178,8 @@ public HttpPostEmitter(HttpEmitterConfig config, AsyncHttpClient client, ObjectM throw new ISE(e, "Bad URL: %s", config.getRecipientBaseUrl()); } emittingThread = new EmittingThread(config); - concurrentBatch.set(new Batch(this, acquireBuffer(), 0)); + long firstBatchNumber = 1; + concurrentBatch.set(new Batch(this, acquireBuffer(), firstBatchNumber)); // lastFillTimeMillis must not be 0, minHttpTimeoutMillis could be. lastFillTimeMillis = Math.max(config.minHttpTimeoutMillis, 1); } diff --git a/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java b/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java index 1b3e010ea124..c13fa7b7d7c3 100644 --- a/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java +++ b/java-util/src/test/java/io/druid/concurrent/ConcurrentAwaitableCounterTest.java @@ -19,9 +19,11 @@ package io.druid.concurrent; +import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class ConcurrentAwaitableCounterTest { @@ -62,4 +64,26 @@ public void smokeTest() throws InterruptedException start.countDown(); finish.await(); } + + @Test + public void testAwaitFirstUpdate() throws InterruptedException + { + int[] value = new int[1]; + ConcurrentAwaitableCounter counter = new ConcurrentAwaitableCounter(); + Thread t = new Thread(() -> { + try { + 
Assert.assertTrue(counter.awaitFirstIncrement(10, TimeUnit.SECONDS)); + Assert.assertEquals(1, value[0]); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + }); + t.start(); + Thread.sleep(2_000); + value[0] = 1; + counter.increment(); + t.join(); + } } diff --git a/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java b/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java index 78e6c0698b2b..344fb1f3d5d5 100644 --- a/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java +++ b/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java @@ -512,11 +512,11 @@ protected ListenableFuture go(Request request) throws JsonProcessingEx for (UnitEvent event : events) { emitter.emit(event); } - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); Assert.assertEquals(2, emitter.getTotalEmittedEvents()); emitter.flush(); - waitForEmission(emitter, 1); + waitForEmission(emitter, 2); Assert.assertEquals(4, emitter.getTotalEmittedEvents()); closeNoFlush(emitter); Assert.assertTrue(httpClient.succeeded()); From d218daf4c5bd923397b254325dc43f535eba09ac Mon Sep 17 00:00:00 2001 From: leventov Date: Sun, 8 Apr 2018 18:14:25 +0300 Subject: [PATCH 3/4] Fix EmitterTest --- .../java/util/emitter/core/EmitterTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java b/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java index 344fb1f3d5d5..0320c76d24cc 100644 --- a/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java +++ b/java-util/src/test/java/io/druid/java/util/emitter/core/EmitterTest.java @@ -239,7 +239,7 @@ protected ListenableFuture go(Request request) throws JsonProcessingEx for (UnitEvent event : events) { emitter.emit(event); } - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); closeNoFlush(emitter); 
Assert.assertTrue(httpClient.succeeded()); } @@ -281,7 +281,7 @@ protected ListenableFuture go(Request request) throws JsonProcessingEx for (UnitEvent event : events) { emitter.emit(event); } - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); closeNoFlush(emitter); Assert.assertTrue(httpClient.succeeded()); } @@ -297,7 +297,7 @@ public void testSizeBasedEmission() throws Exception httpClient.setGoHandler(GoHandlers.passingHandler(okResponse()).times(1)); emitter.emit(new UnitEvent("test", 3)); - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); httpClient.setGoHandler(GoHandlers.failingHandler()); emitter.emit(new UnitEvent("test", 4)); @@ -337,7 +337,7 @@ protected ListenableFuture go(Request request) timeWaited < timeBetweenEmissions * 2 ); - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); final CountDownLatch thisLatch = new CountDownLatch(1); httpClient.setGoHandler( @@ -362,7 +362,7 @@ protected ListenableFuture go(Request request) timeWaited < timeBetweenEmissions * 2 ); - waitForEmission(emitter, 1); + waitForEmission(emitter, 2); closeNoFlush(emitter); Assert.assertTrue("httpClient.succeeded()", httpClient.succeeded()); } @@ -388,7 +388,7 @@ protected ListenableFuture go(Request request) ); emitter.emit(event1); emitter.flush(); - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); Assert.assertTrue(httpClient.succeeded()); // Failed to emit the first event. 
@@ -407,7 +407,7 @@ protected ListenableFuture go(Request request) emitter.emit(event2); emitter.flush(); - waitForEmission(emitter, 1); + waitForEmission(emitter, 2); closeNoFlush(emitter); // Failed event is emitted inside emitter thread, there is no other way to wait for it other than joining the // emitterThread @@ -461,7 +461,7 @@ protected ListenableFuture go(Request request) throws JsonProcessingEx emitter.emit(event); } emitter.flush(); - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); closeNoFlush(emitter); Assert.assertTrue(httpClient.succeeded()); } @@ -571,7 +571,7 @@ protected ListenableFuture go(Request request) throws IOException for (UnitEvent event : events) { emitter.emit(event); } - waitForEmission(emitter, 0); + waitForEmission(emitter, 1); closeNoFlush(emitter); Assert.assertTrue(httpClient.succeeded()); } From 5f0e317ce48788a2e74dbcf535b6572af839222c Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 10 Apr 2018 19:58:42 +0700 Subject: [PATCH 4/4] Add Javadoc and make awaitCount() throw exceptions on wrong count instead of masking errors --- .../ConcurrentAwaitableCounter.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java b/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java index 1bba4de60b78..e9ffa61ff726 100644 --- a/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java +++ b/java-util/src/main/java/io/druid/concurrent/ConcurrentAwaitableCounter.java @@ -39,6 +39,11 @@ public final class ConcurrentAwaitableCounter { private static final long MAX_COUNT = Long.MAX_VALUE; + /** + * This method should be called to obtain the next total increment count to be passed to {@link #awaitCount} methods, + * instead of just adding 1 to the previous count, because the count must wrap around {@link Long#MAX_VALUE} and start + * from 0 again. 
+ */ public static long nextCount(long prevCount) { return (prevCount + 1) & MAX_COUNT; @@ -87,21 +92,35 @@ public void increment() */ public void awaitCount(long totalCount) throws InterruptedException { - totalCount &= MAX_COUNT; - long currentCount; - currentCount = sync.getCount(); + checkTotalCount(totalCount); + long currentCount = sync.getCount(); while (compareCounts(totalCount, currentCount) > 0) { sync.acquireSharedInterruptibly(currentCount); currentCount = sync.getCount(); } } + private static void checkTotalCount(long totalCount) + { + if (totalCount < 0) { + throw new AssertionError( + "Total count must always be >= 0, even in the face of overflow. " + + "The next count should always be obtained by calling ConcurrentAwaitableCounter.nextCount(prevCount), " + + "not just +1" + ); + } + } + + /** + * Await until the {@link #increment} is called on this counter object the specified number of times from the creation + * of this counter object, for not longer than the specified period of time. If by this time the target increment + * count is not reached, {@link TimeoutException} is thrown. + */ public void awaitCount(long totalCount, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + checkTotalCount(totalCount); long nanos = unit.toNanos(timeout); - totalCount &= MAX_COUNT; - long currentCount; - currentCount = sync.getCount(); + long currentCount = sync.getCount(); while (compareCounts(totalCount, currentCount) > 0) { if (!sync.tryAcquireSharedNanos(currentCount, nanos)) { throw new TimeoutException(); @@ -131,7 +150,7 @@ public void awaitNextIncrements(long nextIncrements) throws InterruptedException if (nextIncrements > MAX_COUNT / 4) { throw new UnsupportedOperationException("Couldn't wait for so many increments: " + nextIncrements); } - awaitCount(sync.getCount() + nextIncrements); + awaitCount((sync.getCount() + nextIncrements) & MAX_COUNT); } /**