2 changes: 1 addition & 1 deletion docs/content/ingestion/update-existing-data.md
@@ -76,7 +76,7 @@ For example

#### `multi`

This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. Please note that you can have only one `dataSource` as child of `multi` inputSpec.
This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. You can also use a `multi` inputSpec to combine data from multiple dataSources. However, each particular dataSource can only be specified one time.
Contributor

> each particular dataSource can only be specified one time

Out of curiosity, do we have this restriction before this PR? There should be no issue for production environments, but I wonder why this restriction is needed.

Contributor Author

Before this PR, you could only do one dataSource, so yeah we still had this restriction.

It's not really strictly necessary to have this restriction - I guess we could support adding multiple input specs referring to the same dataSource, but it would make the code more complicated and I didn't see a clear use case.

Contributor

Thanks, sounds good.

Note that "useNewAggs" must be left at its default value (false) to support delta ingestion.

|Field|Type|Description|Required|
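The `multi` inputSpec change above is easier to see with a concrete spec. Below is a minimal sketch, not code from this PR, written in the same nested-Map style the updated tests use; it builds a `multi` inputSpec with two different `dataSource` children plus one `static` child. The `children` field name follows the ingestion docs, and the dataSource names, interval, and path are illustrative placeholders.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class MultiInputSpecSketch
{
  // Builds a hypothetical "multi" inputSpec combining two different dataSources
  // and one static child. Names, interval, and path are placeholders only.
  public static Map<String, Object> multiInputSpec()
  {
    return ImmutableMap.of(
        "type", "multi",
        "children", ImmutableList.of(
            ImmutableMap.of(
                "type", "dataSource",
                "ingestionSpec", ImmutableMap.of(
                    "dataSource", "wikipedia_edits",
                    "interval", "2014-10-20T00:00:00Z/P2W"
                )
            ),
            ImmutableMap.of(
                "type", "dataSource",
                "ingestionSpec", ImmutableMap.of(
                    "dataSource", "wikipedia_pageviews",
                    "interval", "2014-10-20T00:00:00Z/P2W"
                )
            ),
            ImmutableMap.of(
                "type", "static",
                "paths", "/path/to/additionalData"
            )
        )
    );
  }
}

As the updated doc line notes, repeating the same dataSource in a second `dataSource` child is rejected.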
DatasourceInputFormat.java
@@ -21,14 +21,14 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,82 +56,74 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
private static final Logger logger = new Logger(DatasourceInputFormat.class);

public static final String CONF_INPUT_SEGMENTS = "druid.segments";
public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema";
public static final String CONF_TRANSFORM_SPEC = "druid.datasource.transformSpec";
public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size";
private static final String CONF_DATASOURCES = "druid.datasource.input.datasources";
private static final String CONF_SCHEMA = "druid.datasource.input.schema";
private static final String CONF_SEGMENTS = "druid.datasource.input.segments";
private static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.input.split.max.size";

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException
{
JobConf conf = new JobConf(context.getConfiguration());

String segmentsStr = Preconditions.checkNotNull(
conf.get(CONF_INPUT_SEGMENTS),
"No segments found to read"
);
List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
segmentsStr,
new TypeReference<List<WindowedDataSegment>>()
{
}
);
if (segments == null || segments.size() == 0) {
throw new ISE("No segments found to read");
}
List<String> dataSources = getDataSources(conf);
List<InputSplit> splits = new ArrayList<>();

// Note: log is splitted into two lines so that a new String is not generated to print it.
// segmentsStr could be quite large when re-indexing multiple months of data.
logger.info("Segment to read are...");
logger.info(segmentsStr);
for (String dataSource : dataSources) {
List<WindowedDataSegment> segments = getSegments(conf, dataSource);
if (segments == null || segments.size() == 0) {
throw new ISE("No segments found to read for dataSource[%s]", dataSource);
}

long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0);
if (maxSize < 0) {
long totalSize = 0;
for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize();
// Note: Each segment is logged separately to avoid creating a huge String if we are loading lots of segments.
for (int i = 0; i < segments.size(); i++) {
final WindowedDataSegment segment = segments.get(i);
logger.info(
"Segment %,d/%,d for dataSource[%s] has identifier[%s], interval[%s]",
Contributor

how about debug?

Contributor Author

This is replacing an older log message that was also at info level so I thought it made sense to keep it that way.

Contributor

I haven't found it very useful to log all the segments, especially when dealing with more than a dozen, but it is okay.

i,
segments.size(),
dataSource,
segment.getSegment().getIdentifier(),
segment.getInterval()
);
}
int mapTask = conf.getNumMapTasks();
if (mapTask > 0) {
maxSize = totalSize / mapTask;

long maxSize = getMaxSplitSize(conf, dataSource);
if (maxSize < 0) {
long totalSize = 0;
for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize();
}
int mapTask = conf.getNumMapTasks();
if (mapTask > 0) {
maxSize = totalSize / mapTask;
}
}
}

if (maxSize > 0) {
//combining is to happen, let us sort the segments list by size so that they
//are combined appropriately
Collections.sort(
segments,
new Comparator<WindowedDataSegment>()
{
@Override
public int compare(WindowedDataSegment s1, WindowedDataSegment s2)
{
return Long.compare(s1.getSegment().getSize(), s2.getSegment().getSize());
}
}
);
}
if (maxSize > 0) {
//combining is to happen, let us sort the segments list by size so that they
//are combined appropriately
segments.sort(Comparator.comparingLong(s -> s.getSegment().getSize()));
}

List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;

List<InputSplit> splits = Lists.newArrayList();
org.apache.hadoop.mapred.InputFormat fio = supplier.get();
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(toDataSourceSplit(list, fio, conf));
list = new ArrayList<>();
size = 0;
}

List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;
list.add(segment);
size += segment.getSegment().getSize();
}

org.apache.hadoop.mapred.InputFormat fio = supplier.get();
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
if (list.size() > 0) {
splits.add(toDataSourceSplit(list, fio, conf));
list = Lists.newArrayList();
size = 0;
}

list.add(segment);
size += segment.getSegment().getSize();
}

if (list.size() > 0) {
splits.add(toDataSourceSplit(list, fio, conf));
}

logger.info("Number of splits [%d]", splits.size());
@@ -167,7 +159,7 @@ protected boolean isSplitable(FileSystem fs, Path file)
protected FileStatus[] listStatus(JobConf job) throws IOException
{
// to avoid globbing which needs input path should be hadoop-compatible (':' is not acceptable in path, etc.)
List<FileStatus> statusList = Lists.newArrayList();
List<FileStatus> statusList = new ArrayList<>();
for (Path path : FileInputFormat.getInputPaths(job)) {
// load spec in segment points specifically zip file itself
statusList.add(path.getFileSystem(job).getFileStatus(path));
@@ -250,4 +242,90 @@ static String[] getFrequentLocations(final Stream<String> locations)
.map(Map.Entry::getKey)
.toArray(String[]::new);
}

public static List<String> getDataSources(final Configuration conf) throws IOException
{
final String currentDatasources = conf.get(CONF_DATASOURCES);

if (currentDatasources == null) {
return Collections.emptyList();
}

return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
currentDatasources,
new TypeReference<List<String>>() {}
);
}

public static DatasourceIngestionSpec getIngestionSpec(final Configuration conf, final String dataSource)
throws IOException
{
final String specString = conf.get(StringUtils.format("%s.%s", CONF_SCHEMA, dataSource));
if (specString == null) {
throw new NullPointerException(StringUtils.format("null spec for dataSource[%s]", dataSource));
}

final DatasourceIngestionSpec spec = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
specString,
DatasourceIngestionSpec.class
);

if (spec.getDimensions() == null || spec.getDimensions().size() == 0) {
throw new ISE("load schema does not have dimensions");
}

if (spec.getMetrics() == null || spec.getMetrics().size() == 0) {
throw new ISE("load schema does not have metrics");
}

return spec;
}

public static List<WindowedDataSegment> getSegments(final Configuration conf, final String dataSource)
throws IOException
{
return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
conf.get(StringUtils.format("%s.%s", CONF_SEGMENTS, dataSource)),
new TypeReference<List<WindowedDataSegment>>() {}
);
}

public static long getMaxSplitSize(final Configuration conf, final String dataSource)
{
return conf.getLong(StringUtils.format("%s.%s", CONF_MAX_SPLIT_SIZE, dataSource), 0L);
}

public static void addDataSource(
final Configuration conf,
final DatasourceIngestionSpec spec,
final List<WindowedDataSegment> segments,
final long maxSplitSize
) throws IOException
{
final List<String> dataSources = getDataSources(conf);

if (dataSources.contains(spec.getDataSource())) {
throw new ISE("Oops, cannot load the same dataSource twice!");
}

final List<String> newDataSources = new ArrayList<>(dataSources);
newDataSources.add(spec.getDataSource());

conf.set(CONF_DATASOURCES, HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(newDataSources));

conf.set(
StringUtils.format("%s.%s", CONF_SCHEMA, spec.getDataSource()),
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(spec)
);

conf.set(
StringUtils.format("%s.%s", CONF_SEGMENTS, spec.getDataSource()),
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(segments)
);

conf.set(
StringUtils.format("%s.%s", CONF_MAX_SPLIT_SIZE, spec.getDataSource()),
String.valueOf(maxSplitSize)
);
}
}
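For reviewers skimming the new per-dataSource configuration keys, here is a hedged usage sketch, not code from this PR, of how a caller might register two dataSources on a Hadoop `Configuration` with `addDataSource` and read the settings back with the new getters. The class name, its placement alongside `DatasourceInputFormat` in `io.druid.indexer.hadoop`, and the spec/segment arguments are assumptions for illustration.

import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexer.hadoop.WindowedDataSegment;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.util.List;

public class AddDataSourceSketch
{
  // Registers two dataSources on the job Configuration and reads the
  // per-dataSource settings back, the same round trip getSplits() performs.
  // The spec and segment arguments are placeholders built by the caller.
  public static void configure(
      Configuration conf,
      DatasourceIngestionSpec specA,
      List<WindowedDataSegment> segmentsA,
      DatasourceIngestionSpec specB,
      List<WindowedDataSegment> segmentsB
  ) throws IOException
  {
    // Per-dataSource max split size; getSplits() derives it from
    // totalSize / numMapTasks when the value is negative.
    DatasourceInputFormat.addDataSource(conf, specA, segmentsA, 0L);
    DatasourceInputFormat.addDataSource(conf, specB, segmentsB, 50_000_000L);
    // Registering specA's dataSource a second time would throw ISE: each
    // dataSource may only be specified once, as noted in the doc change above.

    for (String dataSource : DatasourceInputFormat.getDataSources(conf)) {
      DatasourceIngestionSpec spec = DatasourceInputFormat.getIngestionSpec(conf, dataSource);
      List<WindowedDataSegment> segments = DatasourceInputFormat.getSegments(conf, dataSource);
      long maxSplitSize = DatasourceInputFormat.getMaxSplitSize(conf, dataSource);
      // ... splits for this dataSource are then built from segments and maxSplitSize
    }
  }
}

The record reader side then resolves the right spec per split by looking at the single dataSource shared by the segments in its DatasourceInputSplit, as shown in the next file.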
DatasourceRecordReader.java
@@ -19,25 +19,22 @@

package io.druid.indexer.hadoop;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,11 +60,18 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
private int numRows;

@Override
public void initialize(InputSplit split, final TaskAttemptContext context)
public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException
{
spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.JSON_MAPPER);

List<WindowedDataSegment> segments = ((DatasourceInputSplit) split).getSegments();
String dataSource = Iterators.getOnlyElement(
segments.stream()
.map(s -> s.getSegment().getDataSource())
.distinct()
.iterator()
);

spec = DatasourceInputFormat.getIngestionSpec(context.getConfiguration(), dataSource);
logger.info("load schema [%s]", spec);

List<WindowedStorageAdapter> adapters = Lists.transform(
segments,
@@ -160,27 +164,4 @@ public void close() throws IOException
FileUtils.deleteDirectory(dir);
}
}

private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper)
{
try {
String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema");
logger.info("load schema [%s]", schema);

DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class);

if (spec.getDimensions() == null || spec.getDimensions().size() == 0) {
throw new ISE("load schema does not have dimensions");
}

if (spec.getMetrics() == null || spec.getMetrics().size() == 0) {
throw new ISE("load schema does not have metrics");
}

return spec;
}
catch (IOException ex) {
throw new RuntimeException("couldn't load segment load spec", ex);
}
}
}
@@ -190,11 +190,8 @@ public Iterable<String> apply(WindowedDataSegment dataSegment)
config.getSchema().getDataSchema().getTransformSpec()
);

job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
DatasourceInputFormat.addDataSource(job.getConfiguration(), updatedIngestionSpec, segments, maxSplitSize);
MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class);

return job;
}

@@ -119,7 +119,7 @@ public void testReindexing() throws Exception
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"testds",
"interval",
INTERVAL_FULL
),
@@ -180,7 +180,7 @@ public void testReindexingWithNewAggregators() throws Exception
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"testds",
"interval",
INTERVAL_FULL
),
@@ -239,7 +239,7 @@ public void testReindexingWithPartialWindow() throws Exception
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"testds",
"interval",
INTERVAL_FULL
),
@@ -314,7 +314,7 @@ public void testDeltaIngestion() throws Exception
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"testds",
"interval",
INTERVAL_FULL
),