Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.concurrent.ConcurrentAwaitableCounter;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.guice.LazySingleton;
Expand Down Expand Up @@ -51,11 +52,11 @@
* // cacheState could be either NoCache or VersionedCache.
* if (cacheState instanceof NoCache) {
* // the cache is not yet created, or already closed
* } else if (cacheState instanceof VersionedCache) {
* } else {
* Map<String, String> cache = ((VersionedCache) cacheState).getCache(); // use the cache
* // Although VersionedCache implements AutoCloseable, versionedCache shouldn't be manually closed
* // when obtained from entry.getCacheState(). If the namespace updates should be ceased completely,
* // entry.close() (see below) should be called, it will close the last VersionedCache itself.
* // entry.close() (see below) should be called, it will close the last VersionedCache as well.
* // On scheduled updates, outdated VersionedCaches are also closed automatically.
* }
* ...
Expand Down Expand Up @@ -105,14 +106,16 @@ Future<?> getUpdaterFuture()
return impl.updaterFuture;
}

@VisibleForTesting
public void awaitTotalUpdates(int totalUpdates) throws InterruptedException
{
impl.updateCounter.awaitTotalUpdates(totalUpdates);
impl.updateCounter.awaitCount(totalUpdates);
}

@VisibleForTesting
void awaitNextUpdates(int nextUpdates) throws InterruptedException
{
impl.updateCounter.awaitNextUpdates(nextUpdates);
impl.updateCounter.awaitNextIncrements(nextUpdates);
}

/**
Expand Down Expand Up @@ -145,7 +148,7 @@ public class EntryImpl<T extends ExtractionNamespace> implements AutoCloseable
private final Future<?> updaterFuture;
private final Cleaner entryCleaner;
private final CacheGenerator<T> cacheGenerator;
private final UpdateCounter updateCounter = new UpdateCounter();
private final ConcurrentAwaitableCounter updateCounter = new ConcurrentAwaitableCounter();
private final CountDownLatch startLatch = new CountDownLatch(1);

private EntryImpl(final T namespace, final Entry<T> entry, final CacheGenerator<T> cacheGenerator)
Expand Down Expand Up @@ -276,7 +279,7 @@ private CacheState swapCacheState(VersionedCache newVersionedCache)
return lastCacheState;
}
} while (!cacheStateHolder.compareAndSet(lastCacheState, newVersionedCache));
updateCounter.update();
updateCounter.increment();
return lastCacheState;
}

Expand Down Expand Up @@ -485,7 +488,7 @@ public Entry scheduleAndWait(ExtractionNamespace namespace, long waitForFirstRun
log.debug("Scheduled new %s", entry);
boolean success = false;
try {
success = entry.impl.updateCounter.awaitFirstUpdate(waitForFirstRunMs, TimeUnit.MILLISECONDS);
success = entry.impl.updateCounter.awaitFirstIncrement(waitForFirstRunMs, TimeUnit.MILLISECONDS);
if (success) {
return entry;
} else {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;

/**
* This synchronization object allows to {@link #increment} a counter without blocking, potentially from multiple
* threads (although in some use cases there is just one incrementer thread), and block in other thread(s), awaiting
* when the count reaches the provided value: see {@link #awaitCount}, or the specified number of events since the
* call: see {@link #awaitNextIncrements}.
*
* This counter wraps around {@link Long#MAX_VALUE} and starts from 0 again, so "next" count should be generally
* obtained by calling {@link #nextCount nextCount(currentCount)} rather than {@code currentCount + 1}.
*
* Memory consistency effects: actions in threads prior to calling {@link #increment} while the count was less than the
* awaited value happen-before actions following count awaiting methods such as {@link #awaitCount}.
*/
public final class ConcurrentAwaitableCounter
{
private static final long MAX_COUNT = Long.MAX_VALUE;

/**
* This method should be called to obtain the next total increment count to be passed to {@link #awaitCount} methods,
* instead of just adding 1 to the previous count, because the count must wrap around {@link Long#MAX_VALUE} and start
* from 0 again.
*/
public static long nextCount(long prevCount)
{
return (prevCount + 1) & MAX_COUNT;
}

private static class Sync extends AbstractQueuedLongSynchronizer
{
@Override
protected long tryAcquireShared(long countWhenWaitStarted)
{
long currentCount = getState();
return compareCounts(currentCount, countWhenWaitStarted) > 0 ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(long increment)
{
long count;
long nextCount;
do {
count = getState();
nextCount = (count + increment) & MAX_COUNT;
} while (!compareAndSetState(count, nextCount));
return true;
}

long getCount()
{
return getState();
}
}

private final Sync sync = new Sync();

/**
* Increment the count. This method could be safely called from concurrent threads.
*/
public void increment()
{
sync.releaseShared(1);
}

/**
* Await until the {@link #increment} is called on this counter object the specified number of times from the creation
* of this counter object.
*/
public void awaitCount(long totalCount) throws InterruptedException
{
checkTotalCount(totalCount);
long currentCount = sync.getCount();
while (compareCounts(totalCount, currentCount) > 0) {
sync.acquireSharedInterruptibly(currentCount);
currentCount = sync.getCount();
}
}

private static void checkTotalCount(long totalCount)
{
if (totalCount < 0) {
throw new AssertionError(
"Total count must always be >= 0, even in the face of overflow. "
+ "The next count should always be obtained by calling ConcurrentAwaitableCounter.nextCount(prevCount), "
+ "not just +1"
);
}
}

/**
* Await until the {@link #increment} is called on this counter object the specified number of times from the creation
* of this counter object, for not longer than the specified period of time. If by this time the target increment
* count is not reached, {@link TimeoutException} is thrown.
*/
public void awaitCount(long totalCount, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
checkTotalCount(totalCount);
long nanos = unit.toNanos(timeout);
long currentCount = sync.getCount();
while (compareCounts(totalCount, currentCount) > 0) {
if (!sync.tryAcquireSharedNanos(currentCount, nanos)) {
throw new TimeoutException();
}
currentCount = sync.getCount();
}
}

private static int compareCounts(long count1, long count2)
{
long diff = (count1 - count2) & MAX_COUNT;
if (diff == 0) {
return 0;
}
return diff < MAX_COUNT / 2 ? 1 : -1;
}

/**
* Somewhat loosely defined wait for "next N increments", because the starting point is not defined from the Java
* Memory Model perspective.
*/
public void awaitNextIncrements(long nextIncrements) throws InterruptedException
{
if (nextIncrements <= 0) {
throw new IllegalArgumentException("nextIncrements is not positive: " + nextIncrements);
}
if (nextIncrements > MAX_COUNT / 4) {
throw new UnsupportedOperationException("Couldn't wait for so many increments: " + nextIncrements);
}
awaitCount((sync.getCount() + nextIncrements) & MAX_COUNT);
}

/**
* The difference between this method and {@link #awaitCount(long, long, TimeUnit)} with argument 1 is that {@code
* awaitFirstIncrement()} returns boolean designating whether the count was await (while waiting for no longer than
* for the specified period of time), while {@code awaitCount()} throws {@link TimeoutException} if the count was not
* awaited.
*/
public boolean awaitFirstIncrement(long timeout, TimeUnit unit) throws InterruptedException
{
return sync.tryAcquireSharedNanos(0, unit.toNanos(timeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ private static boolean isEmittingAllowed(long state)

/**
* Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0.
* It's a boxed Integer rather than int, because we want to minimize the number of allocations done in
* It's a boxed Long rather than primitive long, because we want to minimize the number of allocations done in
* {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}.
* @see HttpPostEmitter#onSealExclusive
* @see HttpPostEmitter#concurrentBatch
*/
final Integer batchNumber;
final Long batchNumber;

/**
* The number of events in this batch, needed for event count-based batch emitting.
Expand All @@ -107,7 +107,7 @@ private static boolean isEmittingAllowed(long state)
*/
private long firstEventTimestamp = -1;

Batch(HttpPostEmitter emitter, byte[] buffer, int batchNumber)
Batch(HttpPostEmitter emitter, byte[] buffer, long batchNumber)
{
this.emitter = emitter;
this.buffer = buffer;
Expand Down
Loading