Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions codestyle/druid-forbidden-apis.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ com.google.common.collect.Sets#newHashSet() @ Create java.util.HashSet directly
com.google.common.collect.Sets#newLinkedHashSet() @ Create java.util.LinkedHashSet directly
com.google.common.collect.Sets#newTreeSet() @ Create java.util.TreeSet directly
com.google.common.collect.Sets#newTreeSet(java.util.Comparator) @ Create java.util.TreeSet directly
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#newDirectExecutorService() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#directExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use org.apache.druid.java.util.common.concurrent.ListenableFutures#transformAsync
java.io.File#toURL() @ Use java.io.File#toURI() and java.net.URI#toURL() instead
java.lang.String#matches(java.lang.String) @ Use startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common.concurrent;


import com.google.common.util.concurrent.AbstractListeningExecutorService;

import javax.annotation.concurrent.GuardedBy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Copy of Guava's Apache 2.0 licensed https://github.com/google/guava/blob/a5cafa67da64a12444037bd4f4c30c39a0c184aa/guava/src/com/google/common/util/concurrent/MoreExecutors.java#L240-L339

/**
 * Creates an executor service that runs each task in the thread that invokes {@code
 * execute/submit}, as in {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy}. This applies both to individually submitted
 * tasks and to collections of tasks submitted via {@code invokeAll} or {@code invokeAny}. In the
 * latter case, tasks will run serially on the calling thread. Tasks are run to completion before
 * a {@code Future} is returned to the caller (unless the executor has been shutdown).
 *
 * <p>Although all tasks are immediately executed in the thread that submitted the task, this
 * {@code ExecutorService} imposes a small locking overhead on each task submission in order to
 * implement shutdown and termination behavior.
 *
 * <p>The implementation deviates from the {@code ExecutorService} specification with regards to
 * the {@code shutdownNow} method. First, "best-effort" with regards to canceling running tasks is
 * implemented as "no-effort". No interrupts or other attempts are made to stop threads executing
 * tasks. Second, the returned list will always be empty, as any submitted task is considered to
 * have started execution. This applies also to tasks given to {@code invokeAll} or {@code
 * invokeAny} which are pending serial execution, even the subset of the tasks that have not yet
 * started execution. It is unclear from the {@code ExecutorService} specification if these should
 * be included, and it's much easier to implement the interpretation that they not be. Finally, a
 * call to {@code shutdown} or {@code shutdownNow} may result in concurrent calls to {@code
 * invokeAll/invokeAny} throwing RejectedExecutionException, although a subset of the tasks may
 * already have been executed.
 */
public class DirectExecutorService extends AbstractListeningExecutorService
{

  /**
   * Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor.
   * Also the monitor that {@link #awaitTermination} waits on, and that {@link #shutdown} and
   * {@link #endTask} notify when the executor may have reached the terminated state.
   */
  private final Object lock = new Object();

  /*
   * Conceptually, these two variables describe the executor being in
   * one of three states:
   *   - Active: shutdown == false
   *   - Shutdown: runningTasks > 0 and shutdown == true
   *   - Terminated: runningTasks == 0 and shutdown == true
   */

  // Number of tasks currently executing on caller threads.
  @GuardedBy("lock")
  private int runningTasks = 0;

  // Whether shutdown() or shutdownNow() has been called.
  @GuardedBy("lock")
  private boolean shutdown = false;

  /**
   * Runs {@code command} synchronously on the calling thread, tracking it in the running-task
   * count so termination can be detected once it completes.
   *
   * @throws RejectedExecutionException if the executor has been shut down
   */
  @Override
  public void execute(Runnable command)
  {
    startTask();
    try {
      command.run();
    }
    finally {
      // Always decrement the running-task count, even if the task threw.
      endTask();
    }
  }

  /** Returns whether {@link #shutdown()} or {@link #shutdownNow()} has been called. */
  @Override
  public boolean isShutdown()
  {
    synchronized (lock) {
      return shutdown;
    }
  }

  /**
   * Initiates shutdown: no new tasks will be accepted. If no tasks are currently running, the
   * executor is now terminated, so any threads blocked in {@link #awaitTermination} are woken.
   */
  @Override
  public void shutdown()
  {
    synchronized (lock) {
      shutdown = true;
      if (runningTasks == 0) {
        lock.notifyAll();
      }
    }
  }

  // See the class javadoc for the unusual behavior of this method: running tasks are NOT
  // interrupted, and the returned list is always empty.
  @Override
  public List<Runnable> shutdownNow()
  {
    shutdown();
    return Collections.emptyList();
  }

  /** Returns whether the executor is shut down AND no task is still executing. */
  @Override
  public boolean isTerminated()
  {
    synchronized (lock) {
      return shutdown && runningTasks == 0;
    }
  }

  /**
   * Blocks until the executor is terminated (shut down with all running tasks finished), the
   * timeout elapses, or the calling thread is interrupted.
   *
   * @return true if the executor terminated within the timeout, false if the timeout elapsed
   *
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
  {
    long nanos = unit.toNanos(timeout);
    synchronized (lock) {
      while (true) {
        if (shutdown && runningTasks == 0) {
          return true;
        } else if (nanos <= 0) {
          return false;
        } else {
          long now = System.nanoTime();
          // May wake spuriously or via notifyAll; the loop re-checks the condition and the
          // remaining time before deciding to wait again.
          TimeUnit.NANOSECONDS.timedWait(lock, nanos);
          nanos -= System.nanoTime() - now; // subtract the actual time we waited
        }
      }
    }
  }

  /**
   * Checks if the executor has been shut down and increments the running task count.
   *
   * @throws RejectedExecutionException if the executor has been previously shutdown
   */
  private void startTask()
  {
    synchronized (lock) {
      if (shutdown) {
        throw new RejectedExecutionException("Executor already shutdown");
      }
      runningTasks++;
    }
  }

  /**
   * Decrements the running task count. If this was the last running task, waiters in
   * {@link #awaitTermination} are notified so they can re-check for termination.
   */
  private void endTask()
  {
    synchronized (lock) {
      int numRunning = --runningTasks;
      if (numRunning == 0) {
        lock.notifyAll();
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import javax.annotation.Nullable;
Expand All @@ -38,9 +39,11 @@
import java.util.concurrent.TimeUnit;

/**
*
*/
public class Execs
{

/**
* Returns an ExecutorService which is terminated and shutdown from the beginning and not able to accept any tasks.
*/
Expand Down Expand Up @@ -152,4 +155,9 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
}
);
}

/**
 * Returns a new {@link ListeningExecutorService} that runs each submitted task directly on the
 * calling thread (see {@link DirectExecutorService}). This is the project-sanctioned replacement
 * for Guava's forbidden {@code MoreExecutors.sameThreadExecutor()} / {@code directExecutor()}.
 */
public static ListeningExecutorService directExecutor()
{
  return new DirectExecutorService();
}
}
6 changes: 6 additions & 0 deletions core/src/test/java/org/apache/druid/concurrent/ExecsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,10 @@ public void run()
blockingExecutor.shutdown();
producer.shutdown();
}

/**
 * Verifies that {@link Execs#directExecutor()} returns a usable executor whose tasks run
 * synchronously on the submitting thread (the defining property of a direct executor), rather
 * than merely checking that the factory result is non-null.
 */
@Test
public void testDirectExecutorFactory()
{
  final Thread[] taskThread = new Thread[1];
  Execs.directExecutor().execute(() -> taskThread[0] = Thread.currentThread());
  // The task must have run already (synchronously) and on this very thread.
  Assert.assertSame(Thread.currentThread(), taskThread[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.java.util.common.guava;

import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -40,11 +40,11 @@ public void testConsistentEffectApplicationOrder()
.simple(Arrays.asList(1, 2, 3))
.withEffect(
() -> effect1.set(counter.incrementAndGet()),
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
)
.withEffect(
() -> effect2.set(counter.incrementAndGet()),
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
// Run sequence via accumulate
sequence.toList();
Expand All @@ -70,7 +70,7 @@ public void testEffectExecutedIfWrappedSequenceThrowsExceptionFromClose()
});
final AtomicBoolean effectExecuted = new AtomicBoolean();
Sequence<Integer> seqWithEffect =
throwingSeq.withEffect(() -> effectExecuted.set(true), MoreExecutors.sameThreadExecutor());
throwingSeq.withEffect(() -> effectExecuted.set(true), Execs.directExecutor());
try {
seqWithEffect.toList();
Assert.fail("expected RuntimeException");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void onFailure(Throwable t)
}
}
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
this.future = future;
final Stopwatch stopwatch = Stopwatch.createStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2452,7 +2452,7 @@ public List<StorageLocationConfig> getLocations()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2765,7 +2765,7 @@ public List<StorageLocationConfig> getLocations()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.apache.druid.query.aggregation.variance;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
Expand All @@ -47,6 +47,7 @@
import java.util.List;

/**
*
*/
@RunWith(Parameterized.class)
public class VarianceGroupByQueryTest
Expand All @@ -72,7 +73,7 @@ public VarianceGroupByQueryTest(
this.testName = testName;
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner));
this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.concurrent.TimeoutException;

/**
*
*/
public class IndexGeneratorJob implements Jobby
{
Expand Down Expand Up @@ -240,7 +241,8 @@ public Map<String, Object> getStats()

Map<String, Object> metrics = TaskMetricsUtils.makeIngestionRowMetrics(
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER).getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER)
.getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_UNPARSEABLE_COUNTER).getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).getValue()
);
Expand Down Expand Up @@ -345,7 +347,9 @@ protected void innerMap(
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}

final long truncatedTimestamp = granularitySpec.getQueryGranularity().bucketStart(inputRow.getTimestamp()).getMillis();
final long truncatedTimestamp = granularitySpec.getQueryGranularity()
.bucketStart(inputRow.getTimestamp())
.getMillis();
final byte[] hashedDimensions = hashFunction.hashBytes(
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(
Rows.toGroupKey(
Expand All @@ -359,17 +363,17 @@ protected void innerMap(
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
InputRowSerde.SerializeResult serializeResult = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(
typeHelperMap,
inputRow,
aggsForSerializingSegmentInputRow
)
:
InputRowSerde.toBytes(
typeHelperMap,
inputRow,
aggregators
);
InputRowSerde.toBytes(
typeHelperMap,
inputRow,
aggsForSerializingSegmentInputRow
)
:
InputRowSerde.toBytes(
typeHelperMap,
inputRow,
aggregators
);

context.write(
new SortableBytes(
Expand Down Expand Up @@ -678,7 +682,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
);
persistExecutor = MoreExecutors.listeningDecorator(executorService);
} else {
persistExecutor = MoreExecutors.sameThreadExecutor();
persistExecutor = Execs.directExecutor();
}

for (final BytesWritable bw : values) {
Expand Down Expand Up @@ -786,7 +790,10 @@ public void doRun()
// ShardSpec to be published.
final ShardSpec shardSpecForPublishing;
if (config.isForceExtendableShardSpecs()) {
shardSpecForPublishing = new NumberedShardSpec(shardSpecForPartitioning.getPartitionNum(), config.getShardSpecCount(bucket));
shardSpecForPublishing = new NumberedShardSpec(
shardSpecForPartitioning.getPartitionNum(),
config.getShardSpecCount(bucket)
);
} else {
shardSpecForPublishing = shardSpecForPartitioning;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ public void statusChanged(String taskId, TaskStatus status)
{
notices.add(new RunNotice());
}
}, MoreExecutors.sameThreadExecutor()
}, Execs.directExecutor()
);
listenerRegistered = true;
}
Expand Down
Loading