Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion codestyle/druid-forbidden-apis.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use io.druid.java.util.common.concurrent.Execs#sameThreadExecutor
com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync
com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator()
com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator()
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void onFailure(Throwable t)
}
}
},
MoreExecutors.sameThreadExecutor()
Execs.sameThreadExecutor()
);
this.future = future;
final Stopwatch stopwatch = Stopwatch.createStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,8 @@ public void statusChanged(String taskId, TaskStatus status)
{
notices.add(new RunNotice());
}
}, MoreExecutors.sameThreadExecutor()
},
Execs.sameThreadExecutor()
);

listenerRegistered = true;
Expand Down Expand Up @@ -1770,13 +1771,21 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
final String checkpoints = sortingMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}).writeValueAsString(taskGroups.get(groupId).sequenceOffsets);
final Map<String, Object> context = spec.getContext() == null
? ImmutableMap.of("checkpoints", checkpoints, IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
: ImmutableMap.<String, Object>builder()
.put("checkpoints", checkpoints)
.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
.putAll(spec.getContext())
.build();
final Map<String, Object> context;
if (spec.getContext() == null) {
context = ImmutableMap.of(
"checkpoints",
checkpoints,
IS_INCREMENTAL_HANDOFF_SUPPORTED,
true
);
} else {
context = ImmutableMap.<String, Object>builder()
.put("checkpoints", checkpoints)
.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
.putAll(spec.getContext())
.build();
}
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(sequenceName, getRandomId());
KafkaIndexTask indexTask = new KafkaIndexTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2019,7 +2019,7 @@ public List<StorageLocationConfig> getLocations()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.Row;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
Expand Down Expand Up @@ -67,12 +67,17 @@ public static Collection<?> constructorFeeder()
return GroupByQueryRunnerTest.constructorFeeder();
}

public VarianceGroupByQueryTest(String testName, GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner)
public VarianceGroupByQueryTest(
String testName,
GroupByQueryConfig config,
GroupByQueryRunnerFactory factory,
QueryRunner runner
)
{
this.testName = testName;
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
this.runner = factory.mergeRunners(Execs.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ protected void innerMap(
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}

final long truncatedTimestamp = granularitySpec.getQueryGranularity().bucketStart(inputRow.getTimestamp()).getMillis();
final long truncatedTimestamp = granularitySpec.getQueryGranularity()
.bucketStart(inputRow.getTimestamp())
.getMillis();
final byte[] hashedDimensions = hashFunction.hashBytes(
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(
Rows.toGroupKey(
Expand Down Expand Up @@ -475,7 +477,8 @@ public int getPartition(BytesWritable bytesWritable, Writable value, int numPart
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
int shardNum = bytes.getInt();
if ("local".equals(config.get("mapreduce.jobtracker.address")) || "local".equals(config.get("mapred.job.tracker"))) {
if ("local".equals(config.get("mapreduce.jobtracker.address"))
|| "local".equals(config.get("mapred.job.tracker"))) {
return shardNum % numPartitions;
} else {
if (shardNum >= numPartitions) {
Expand Down Expand Up @@ -622,7 +625,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
);
persistExecutor = MoreExecutors.listeningDecorator(executorService);
} else {
persistExecutor = MoreExecutors.sameThreadExecutor();
persistExecutor = Execs.sameThreadExecutor();
}

for (final BytesWritable bw : values) {
Expand Down Expand Up @@ -732,7 +735,10 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
// ShardSpec to be published.
final ShardSpec shardSpecForPublishing;
if (config.isForceExtendableShardSpecs()) {
shardSpecForPublishing = new NumberedShardSpec(shardSpecForPartitioning.getPartitionNum(), config.getShardSpecCount(bucket));
shardSpecForPublishing = new NumberedShardSpec(
shardSpecForPartitioning.getPartitionNum(),
config.getShardSpecCount(bucket)
);
} else {
shardSpecForPublishing = shardSpecForPartitioning;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.curator.CuratorUtils;
import io.druid.curator.announcement.Announcer;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;
Expand Down Expand Up @@ -77,7 +77,7 @@ public WorkerCuratorCoordinator(
this.curatorFramework = curatorFramework;
this.worker = worker;

this.announcer = new Announcer(curatorFramework, MoreExecutors.sameThreadExecutor());
this.announcer = new Announcer(curatorFramework, Execs.sameThreadExecutor());

this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import io.druid.client.indexing.IndexingService;
Expand Down Expand Up @@ -69,7 +68,7 @@

/**
* This class manages the list of tasks assigned to this worker.
*
* <p>
* It persists the list of assigned and completed tasks on disk. assigned task from disk is deleted as soon as it
* starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for
* active tasks to see which completed tasks are safe to delete.
Expand Down Expand Up @@ -226,7 +225,7 @@ public void statusChanged(final String taskId, final TaskStatus status)
// do nothing
}
},
MoreExecutors.sameThreadExecutor()
Execs.sameThreadExecutor()
);
}

Expand Down Expand Up @@ -456,9 +455,12 @@ private void scheduleCompletedTasksCleanup()
);
if (fullResponseHolder.getStatus().getCode() == 200) {
String responseContent = fullResponseHolder.getContent();
taskStatusesFromOverlord = jsonMapper.readValue(responseContent, new TypeReference<Map<String, TaskStatus>>()
{
});
taskStatusesFromOverlord = jsonMapper.readValue(
responseContent,
new TypeReference<Map<String, TaskStatus>>()
{
}
);
log.debug("Received completed task status response [%s].", responseContent);
} else if (fullResponseHolder.getStatus().getCode() == 404) {
// NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
Expand Down Expand Up @@ -516,7 +518,7 @@ private void scheduleCompletedTasksCleanup()
TimeUnit.MINUTES
);
}

public void workerEnabled()
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");
Expand Down Expand Up @@ -717,5 +719,6 @@ public void handle()
//in Overlord as well as MiddleManagers then WorkerTaskMonitor should be deleted, this class should no longer be abstract
//and the methods below should be removed.
protected abstract void taskStarted(String taskId);

protected abstract void taskAnnouncementChanged(TaskAnnouncement announcement);
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.utils.Runnables;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
Expand Down Expand Up @@ -215,7 +216,7 @@ public InputRow nextRow()
@Override
public Runnable commit()
{
return () -> {};
return Runnables.getNoopRunnable();
}

@Override
Expand Down Expand Up @@ -1217,7 +1218,6 @@ public List<StorageLocationConfig> getLocations()
return Lists.newArrayList();
}
};

taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
Expand All @@ -1230,7 +1230,7 @@ public List<StorageLocationConfig> getLocations()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
Expand All @@ -1250,16 +1250,17 @@ public List<StorageLocationConfig> getLocations()
public long sumMetric(final Task task, final DimFilter filter, final String metric)
{
// Do a query.
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test_ds")
.filters(filter)
.aggregators(
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory(metric, metric)
)
).granularity(Granularities.ALL)
.intervals("2000/3000")
.build();
TimeseriesQuery query = Druids
.newTimeseriesQueryBuilder()
.dataSource("test_ds")
.filters(filter)
.aggregators(
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory(metric, metric)
)
).granularity(Granularities.ALL)
.intervals("2000/3000")
.build();

List<Result<TimeseriesResultValue>> results =
task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.utils.Runnables;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -206,7 +207,7 @@ public InputRow nextRow()
@Override
public Runnable commit()
{
return () -> {};
return Runnables.getNoopRunnable();
}

@Override
Expand Down Expand Up @@ -1071,7 +1072,7 @@ public List<StorageLocationConfig> getLocations()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
Expand Down Expand Up @@ -76,6 +75,7 @@
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.emitter.EmittingLogger;
Expand Down Expand Up @@ -522,8 +522,15 @@ private TaskToolboxFactory setUpTaskToolboxFactory(
Preconditions.checkNotNull(emitter);

taskLockbox = new TaskLockbox(taskStorage);
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock(
SupervisorManager.class)));
tac = new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(
taskLockbox,
mdc,
emitter,
EasyMock.createMock(SupervisorManager.class)
)
);
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);

Expand Down Expand Up @@ -592,7 +599,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
MoreExecutors.sameThreadExecutor(), // query executor service
Execs.sameThreadExecutor(), // query executor service
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, new DefaultObjectMapper())
Expand Down Expand Up @@ -1088,7 +1095,22 @@ public void testResumeTasks() throws Exception
mapper
),
new IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null, null, null)
new IndexTuningConfig(
10000,
10,
null,
null,
null,
indexSpec,
null,
false,
null,
null,
null,
null,
null,
null
)
),
null
);
Expand Down
Loading