diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index cdce155e82d9..39174d562a66 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.java.util.common.StringUtils; import java.util.HashMap; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index a1b14a070577..9f7f9977efb2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -69,7 +69,7 @@ public KafkaIndexTask( ) { super( - id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id, + getOrMakeId(id, dataSchema.getDataSource(), TYPE), taskResource, dataSchema, tuningConfig, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e47ef2be7a4c..77c5d680f486 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata; import org.apache.druid.indexing.kafka.KafkaIndexTask; import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; @@ -49,7 +50,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index b2209eaf11c8..e8f49bd34aa3 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -56,7 +56,7 @@ public KinesisIndexTask( ) { super( - id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id, + getOrMakeId(id, dataSchema.getDataSource(), TYPE), taskResource, dataSchema, tuningConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 8e2b6084b636..74640df82f31 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; import org.apache.druid.indexing.kinesis.KinesisIndexTask; import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; @@ -54,7 +55,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.joda.time.DateTime; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index c901e685f7fe..a2b958fb334a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -90,8 +91,10 @@ static String getOrMakeId(String id, final String typeName, String dataSource, @ } final List objects = new ArrayList<>(); + final String suffix = RandomIdUtils.getRandomId(); objects.add(typeName); objects.add(dataSource); + objects.add(suffix); if (interval != null) { objects.add(interval.getStart()); objects.add(interval.getEnd()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 136a57a1220e..57081f52afb6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; @@ -88,6 +87,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler { private static final Logger log = new Logger(HadoopIndexTask.class); private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json"; + private static final String TYPE = "index_hadoop"; private TaskConfig taskConfig = null; private static String getTheDataSource(HadoopIngestionSpec spec) @@ -152,7 +152,7 @@ public HadoopIndexTask( ) { super( - id != null ? id : StringUtils.format("index_hadoop_%s_%s", getTheDataSource(spec), DateTimes.nowUtc()), + getOrMakeId(id, TYPE, getTheDataSource(spec)), getTheDataSource(spec), hadoopDependencyCoordinates == null ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java similarity index 82% rename from indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/utils/RandomIdUtils.java rename to indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java index 4715db8d3f35..a782b668899e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/utils/RandomIdUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java @@ -17,20 +17,17 @@ * under the License. */ -package org.apache.druid.indexing.seekablestream.utils; +package org.apache.druid.indexing.common.task.utils; -import java.util.Random; import java.util.concurrent.ThreadLocalRandom; public class RandomIdUtils { - private static final Random RANDOM = ThreadLocalRandom.current(); - public static String getRandomId() { final StringBuilder suffix = new StringBuilder(8); for (int i = 0; i < Integer.BYTES * 2; ++i) { - suffix.append((char) ('a' + ((RANDOM.nextInt() >>> (i * 4)) & 0x0F))); + suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F))); } return suffix.toString(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 2da2f967cb9a..2f8118786b31 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -40,7 +39,6 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; @@ -125,17 +123,6 @@ public SeekableStreamIndexTask( : LockGranularity.SEGMENT; } - private static String makeTaskId(String dataSource, String type) - { - final String suffix = RandomIdUtils.getRandomId(); - return Joiner.on("_").join(type, dataSource, suffix); - } - - protected static String getFormattedId(String dataSource, String type) - { - return makeTaskId(dataSource, type); - } - protected static String getFormattedGroupId(String dataSource, String type) { return StringUtils.format("%s_%s", type, dataSource); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java index 9be8f698f1ad..58a91becc523 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java @@ -28,8 +28,8 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.IOUtils; +import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils;