Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.indexing.kafka;

import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public KafkaIndexTask(
)
{
super(
id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id,
getOrMakeId(id, dataSchema.getDataSource(), TYPE),
taskResource,
dataSchema,
tuningConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
Expand All @@ -49,7 +50,6 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public KinesisIndexTask(
)
{
super(
id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id,
getOrMakeId(id, dataSchema.getDataSource(), TYPE),
taskResource,
dataSchema,
tuningConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
Expand All @@ -54,7 +55,6 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
Expand Down Expand Up @@ -90,8 +91,10 @@ static String getOrMakeId(String id, final String typeName, String dataSource, @
}

final List<Object> objects = new ArrayList<>();
final String suffix = RandomIdUtils.getRandomId();
objects.add(typeName);
objects.add(dataSource);
objects.add(suffix);
if (interval != null) {
objects.add(interval.getStart());
objects.add(interval.getEnd());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
Expand Down Expand Up @@ -88,6 +87,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
private static final Logger log = new Logger(HadoopIndexTask.class);
private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
private static final String TYPE = "index_hadoop";
private TaskConfig taskConfig = null;

private static String getTheDataSource(HadoopIngestionSpec spec)
Expand Down Expand Up @@ -152,7 +152,7 @@ public HadoopIndexTask(
)
{
super(
id != null ? id : StringUtils.format("index_hadoop_%s_%s", getTheDataSource(spec), DateTimes.nowUtc()),
getOrMakeId(id, TYPE, getTheDataSource(spec)),
getTheDataSource(spec),
hadoopDependencyCoordinates == null
? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
* under the License.
*/

package org.apache.druid.indexing.seekablestream.utils;
package org.apache.druid.indexing.common.task.utils;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomIdUtils
{
private static final Random RANDOM = ThreadLocalRandom.current();

public static String getRandomId()
{
final StringBuilder suffix = new StringBuilder(8);
for (int i = 0; i < Integer.BYTES * 2; ++i) {
suffix.append((char) ('a' + ((RANDOM.nextInt() >>> (i * 4)) & 0x0F)));
suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
}
return suffix.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
Expand All @@ -40,7 +39,6 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
Expand Down Expand Up @@ -125,17 +123,6 @@ public SeekableStreamIndexTask(
: LockGranularity.SEGMENT;
}

private static String makeTaskId(String dataSource, String type)
{
final String suffix = RandomIdUtils.getRandomId();
return Joiner.on("_").join(type, dataSource, suffix);
}

protected static String getFormattedId(String dataSource, String type)
{
return makeTaskId(dataSource, type);
}

protected static String getFormattedGroupId(String dataSource, String type)
{
return StringUtils.format("%s_%s", type, dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down