From ad47026ae084105ba91cef2ab4ec571355bcea23 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 30 Sep 2019 13:54:27 -0500 Subject: [PATCH 1/4] Change hadoop task naming --- .../apache/druid/indexing/kafka/KafkaConsumerConfigs.java | 2 +- .../org/apache/druid/indexing/kafka/KafkaIndexTask.java | 2 +- .../druid/indexing/kafka/supervisor/KafkaSupervisor.java | 2 +- .../apache/druid/indexing/kinesis/KinesisIndexTask.java | 2 +- .../indexing/kinesis/supervisor/KinesisSupervisor.java | 2 +- .../apache/druid/indexing/common/task/AbstractTask.java | 6 +++--- .../apache/druid/indexing/common/task/HadoopIndexTask.java | 4 ++-- .../task}/utils/RandomIdUtils.java | 7 ++----- .../indexing/seekablestream/SeekableStreamIndexTask.java | 2 +- .../druid/tests/indexer/AbstractKafkaIndexerTest.java | 2 +- 10 files changed, 14 insertions(+), 17 deletions(-) rename indexing-service/src/main/java/org/apache/druid/indexing/{seekablestream => common/task}/utils/RandomIdUtils.java (82%) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index cdce155e82d9..39174d562a66 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.java.util.common.StringUtils; import java.util.HashMap; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index a1b14a070577..9f7f9977efb2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -69,7 +69,7 @@ public KafkaIndexTask( ) { super( - id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id, + getOrMakeId(id, dataSchema.getDataSource(), TYPE), taskResource, dataSchema, tuningConfig, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 60406397b3c1..3ed89c07e8dc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata; import org.apache.druid.indexing.kafka.KafkaIndexTask; import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; @@ -49,7 +50,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index b2209eaf11c8..e8f49bd34aa3 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -56,7 +56,7 @@ public KinesisIndexTask( ) { super( - id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id, + getOrMakeId(id, dataSchema.getDataSource(), TYPE), taskResource, dataSchema, tuningConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index de68024e5884..5e8a79f43ef6 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; import org.apache.druid.indexing.kinesis.KinesisIndexTask; import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; @@ -51,7 +52,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.StringUtils; import org.joda.time.DateTime; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index c901e685f7fe..8a9e21974157 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -28,7 +28,7 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.joda.time.Interval; @@ -90,14 +90,14 @@ static String getOrMakeId(String id, final String typeName, String dataSource, @ } final List objects = new ArrayList<>(); + final String suffix = RandomIdUtils.getRandomId(); objects.add(typeName); objects.add(dataSource); + objects.add(suffix); if (interval != null) { objects.add(interval.getStart()); objects.add(interval.getEnd()); } - objects.add(DateTimes.nowUtc().toString()); - return joinId(objects); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 6cb02888eaad..2e2009759c35 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; @@ -88,6 +87,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler { private static final Logger log = new Logger(HadoopIndexTask.class); private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json"; + private static final String TYPE = "index_hadoop"; private TaskConfig taskConfig = null; private static String getTheDataSource(HadoopIngestionSpec spec) @@ -152,7 +152,7 @@ public HadoopIndexTask( ) { super( - id != null ? id : StringUtils.format("index_hadoop_%s_%s", getTheDataSource(spec), DateTimes.nowUtc()), + getOrMakeId(id, TYPE, getTheDataSource(spec)), getTheDataSource(spec), hadoopDependencyCoordinates == null ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java similarity index 82% rename from indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/utils/RandomIdUtils.java rename to indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java index 4715db8d3f35..a782b668899e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/utils/RandomIdUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java @@ -17,20 +17,17 @@ * under the License. */ -package org.apache.druid.indexing.seekablestream.utils; +package org.apache.druid.indexing.common.task.utils; -import java.util.Random; import java.util.concurrent.ThreadLocalRandom; public class RandomIdUtils { - private static final Random RANDOM = ThreadLocalRandom.current(); - public static String getRandomId() { final StringBuilder suffix = new StringBuilder(8); for (int i = 0; i < Integer.BYTES * 2; ++i) { - suffix.append((char) ('a' + ((RANDOM.nextInt() >>> (i * 4)) & 0x0F))); + suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F))); } return suffix.toString(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 2da2f967cb9a..f3749f14098a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -39,8 +39,8 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java index 9be8f698f1ad..58a91becc523 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java @@ -28,8 +28,8 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.IOUtils; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; From e2c521e34e5862240c32ba152fa4e17ebf2d1d6d Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 30 Sep 2019 15:41:50 -0500 Subject: [PATCH 2/4] Remove unused --- .../seekablestream/SeekableStreamIndexTask.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index f3749f14098a..2f8118786b31 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -39,7 +38,6 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -125,17 +123,6 @@ public SeekableStreamIndexTask( : LockGranularity.SEGMENT; } - private static String makeTaskId(String dataSource, String type) - { - final String suffix = RandomIdUtils.getRandomId(); - return Joiner.on("_").join(type, dataSource, suffix); - } - - protected static String getFormattedId(String dataSource, String type) - { - return makeTaskId(dataSource, type); - } - protected static String getFormattedGroupId(String dataSource, String type) { return StringUtils.format("%s_%s", type, dataSource); From b15293d46b85da1e7c79f0a739d8152f16b1bcd6 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Fri, 4 Oct 2019 09:39:30 -0500 Subject: [PATCH 3/4] Add timestamp --- .../org/apache/druid/indexing/common/task/AbstractTask.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 8a9e21974157..a2b958fb334a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.utils.RandomIdUtils; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.joda.time.Interval; @@ -98,6 +99,8 @@ static String getOrMakeId(String id, final String typeName, String dataSource, @ objects.add(interval.getStart()); objects.add(interval.getEnd()); } + objects.add(DateTimes.nowUtc().toString()); + return joinId(objects); } From 22f666623ba272d391f55e53977bc5e101ff5e2b Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Fri, 8 Nov 2019 13:39:43 -0600 Subject: [PATCH 4/4] Fix build --- .../org/apache/druid/indexing/common/task/HadoopIndexTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index f4a14e69129c..57081f52afb6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity;