InputFileAttribute.java (new file)
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

/**
 * A class storing some attributes of an input file.
 * This information is used to make splits in the parallel indexing.
 *
 * @see SplitHintSpec
 * @see org.apache.druid.data.input.impl.SplittableInputSource
 */
public class InputFileAttribute
{
  /**
   * The size of the input file.
   */
  private final long size;

  public InputFileAttribute(long size)
  {
    this.size = size;
  }

  public long getSize()
  {
    return size;
  }
}
InputSplit.java
@@ -23,7 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

/**
- * Input unit for distributed batch ingestion. Used in {@link FiniteFirehoseFactory}.
+ * Input unit for distributed batch ingestion. Used in {@link org.apache.druid.data.input.impl.SplittableInputSource}.
 * An {@link InputSplit} represents the input data processed by a {@code org.apache.druid.indexing.common.task.Task}.
 */
public class InputSplit<T>
MaxSizeSplitHintSpec.java (new file)
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

/**
 * A SplitHintSpec that can create splits of multiple files.
 * A split created by this class can have one or more input files.
 * If there is only one file in the split, its size can be larger than {@link #maxSplitSize}.
 * If there are two or more files in the split, their total size cannot be larger than {@link #maxSplitSize}.
 */
public class MaxSizeSplitHintSpec implements SplitHintSpec

Contributor: I think you should make spec classes pure data objects (or beans). Adding methods like split() to them makes them complicated and adds logic that makes it hard to version them in the future. We should think of data objects as literals, not as objects with business logic.

Author: Good point. I agree it is a better structure, but the problem is that too many classes do this kind of thing, especially on the ingestion side. I don't think it's possible to apply the suggested design to all of them anytime soon. Also, I think it's better to promote SQL for ingestion as well, so that Druid users don't have to worry about API changes.

{
  public static final String TYPE = "maxSize";

  @VisibleForTesting
  static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;

  private final long maxSplitSize;

  @JsonCreator
  public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
  {
    this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
  }

  @JsonProperty
  public long getMaxSplitSize()
  {
    return maxSplitSize;
  }

  @Override
  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
  {
    return new Iterator<List<T>>()
    {
      private T peeking;

Contributor: I think you can simplify the logic of the next() method below if you initialize peeking to inputIterator.next(), and only set peeking to null when inputIterator.hasNext() is false. In your next() below, you would just keep shifting values from inputIterator into current after each iteration as long as there are more inputs.

Author: I don't understand how that would work. peeking keeps the last fetched input from the underlying iterator because it may or may not be returned, depending on the total size of the inputs already in the current list. If the last fetched input was not added, it should be returned in the following next() call.

      @Override
      public boolean hasNext()
      {
        return peeking != null || inputIterator.hasNext();
      }

      @Override
      public List<T> next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        final List<T> current = new ArrayList<>();
        long splitSize = 0;
        while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
          if (peeking == null) {
            peeking = inputIterator.next();
          }
          final long size = inputAttributeExtractor.apply(peeking).getSize();
          if (current.isEmpty() || splitSize + size < maxSplitSize) {
            current.add(peeking);
            splitSize += size;
            peeking = null;
          } else {
            break;
          }
        }
        assert !current.isEmpty();
        return current;
      }
    };
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    MaxSizeSplitHintSpec that = (MaxSizeSplitHintSpec) o;
    return maxSplitSize == that.maxSplitSize;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(maxSplitSize);
  }
}

Contributor: equals and hashCode need unit tests.

Author: Added.

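As a standalone illustration (not Druid code; plain long sizes stand in for InputFileAttribute), the greedy grouping performed by split() above can be sketched as:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of MaxSizeSplitHintSpec's grouping: pack sizes greedily into splits
// whose total stays under maxSplitSize; a single file larger than the limit
// still gets its own split, because an empty split accepts any next file.
public class MaxSizeGroupingSketch
{
  public static List<List<Long>> group(List<Long> sizes, long maxSplitSize)
  {
    final List<List<Long>> splits = new ArrayList<>();
    final Iterator<Long> it = sizes.iterator();
    Long peeking = null; // last fetched size not yet assigned to a split
    while (peeking != null || it.hasNext()) {
      final List<Long> current = new ArrayList<>();
      long splitSize = 0;
      while (splitSize < maxSplitSize && (peeking != null || it.hasNext())) {
        if (peeking == null) {
          peeking = it.next();
        }
        if (current.isEmpty() || splitSize + peeking < maxSplitSize) {
          current.add(peeking);
          splitSize += peeking;
          peeking = null; // consumed; fetch a new one next iteration
        } else {
          break; // defer the peeked file to the next split
        }
      }
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args)
  {
    // With maxSplitSize = 100: 10 and 20 fit together, 500 is alone, 5 starts a new split.
    System.out.println(group(Arrays.asList(10L, 20L, 500L, 5L), 100L));
    // prints [[10, 20], [500], [5]]
  }
}
```

Note the asymmetry inherited from the real implementation: the running total must stay strictly below maxSplitSize, except that the first file of each split is always accepted.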
SegmentsSplitHintSpec.java
@@ -23,10 +23,19 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
import java.util.Objects;
+import java.util.function.Function;

/**
- * {@link SplitHintSpec} for IngestSegmentFirehoseFactory.
+ * {@link SplitHintSpec} for IngestSegmentFirehoseFactory and DruidInputSource.
+ *
+ * In DruidInputSource, this spec is converted into {@link MaxSizeSplitHintSpec}. As a result, its {@link #split}
+ * method is never called (IngestSegmentFirehoseFactory creates splits on its own instead of calling the
+ * {@code split()} method). This doesn't necessarily mean this class is deprecated in favor of MaxSizeSplitHintSpec.
+ * We may want to create more optimized splits in the future. For example, segments can be split to maximize the
+ * rollup ratio if the segments have different sets of columns or even different value ranges in those columns.
 */
public class SegmentsSplitHintSpec implements SplitHintSpec
{
@@ -41,9 +50,7 @@ public class SegmentsSplitHintSpec implements SplitHintSpec
  private final long maxInputSegmentBytesPerTask;

  @JsonCreator
-  public SegmentsSplitHintSpec(
-      @JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask
-  )
+  public SegmentsSplitHintSpec(@JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask)
  {
    this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
        ? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
@@ -56,6 +63,13 @@ public long getMaxInputSegmentBytesPerTask()
    return maxInputSegmentBytesPerTask;
  }

+  @Override
+  public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)

Contributor: Seems like this method really doesn't belong here if not all implementations need it? Or should this class be abstract instead?

Author: Added a comment about it.


+  {
+    // This method is not supported currently, but we may want to implement it in the future to create optimized splits.
+    throw new UnsupportedOperationException();
+  }

  @Override
  public boolean equals(Object o)
  {
SplitHintSpec.java
@@ -24,6 +24,10 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.SplittableInputSource;

+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;

/**
* In native parallel indexing, the supervisor task partitions input data into splits and assigns each of them
* to a single sub task. How to create splits could mainly depend on the input file format, but sometimes druid users
@@ -37,8 +41,16 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
-    @Type(name = SegmentsSplitHintSpec.TYPE, value = SegmentsSplitHintSpec.class)
+    @Type(name = SegmentsSplitHintSpec.TYPE, value = SegmentsSplitHintSpec.class),
+    @Type(name = MaxSizeSplitHintSpec.TYPE, value = MaxSizeSplitHintSpec.class)
})
public interface SplitHintSpec
{
+  /**
+   * Returns an iterator of splits. A split has a list of files of the type {@link T}.
+   *
+   * @param inputIterator           that returns input files
+   * @param inputAttributeExtractor to create {@link InputFileAttribute} for each input file
+   */
+  <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor);
}
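To illustrate the contract, here is a hypothetical implementation sketched against simplified stand-in types (FileAttr replaces Druid's InputFileAttribute, and HintSpec mirrors the interface above); it simply emits one split per input file, regardless of size:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Illustrative only: a trivial split-hint implementation that wraps each
// input file in its own singleton split, ignoring the size attribute.
public class OneFilePerSplitSketch
{
  // Stand-in for org.apache.druid.data.input.InputFileAttribute.
  static class FileAttr
  {
    final long size;

    FileAttr(long size)
    {
      this.size = size;
    }
  }

  // Simplified mirror of the SplitHintSpec contract.
  interface HintSpec
  {
    <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, FileAttr> extractor);
  }

  static final HintSpec ONE_PER_SPLIT = new HintSpec()
  {
    @Override
    public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, FileAttr> extractor)
    {
      return new Iterator<List<T>>()
      {
        @Override
        public boolean hasNext()
        {
          return inputIterator.hasNext();
        }

        @Override
        public List<T> next()
        {
          return Collections.singletonList(inputIterator.next());
        }
      };
    }
  };

  public static void main(String[] args)
  {
    Iterator<List<String>> splits = ONE_PER_SPLIT.split(
        java.util.Arrays.asList("a.json", "b.json").iterator(),
        name -> new FileAttr(0)
    );
    while (splits.hasNext()) {
      System.out.println(splits.next()); // prints [a.json] then [b.json]
    }
  }
}
```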
CloudObjectInputSource.java
@@ -33,12 +33,13 @@
import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

-public abstract class CloudObjectInputSource<T extends InputEntity> extends AbstractInputSource
-    implements SplittableInputSource<CloudObjectLocation>
+public abstract class CloudObjectInputSource extends AbstractInputSource
+    implements SplittableInputSource<List<CloudObjectLocation>>
{
private final List<URI> uris;
private final List<URI> prefixes;
@@ -90,30 +91,32 @@ public List<CloudObjectLocation> getObjects()
   * Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. This
   * is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}.
   */
-  protected abstract T createEntity(InputSplit<CloudObjectLocation> split);
+  protected abstract InputEntity createEntity(CloudObjectLocation location);

  /**
   * Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using
   * this input source's backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits},
   * only if {@link #prefixes} is set; otherwise the splits are created directly from {@link #uris} or {@link #objects}.
   * Calling this when {@link #prefixes} is not set is likely to lead to either an empty iterator or a null pointer
   * exception.
   */
-  protected abstract Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream();
+  protected abstract Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(SplitHintSpec splitHintSpec);

  @Override
-  public Stream<InputSplit<CloudObjectLocation>> createSplits(
+  public Stream<InputSplit<List<CloudObjectLocation>>> createSplits(
      InputFormat inputFormat,
      @Nullable SplitHintSpec splitHintSpec
  )
  {
    if (!CollectionUtils.isNullOrEmpty(objects)) {
-      return objects.stream().map(InputSplit::new);
+      return objects.stream().map(object -> new InputSplit<>(Collections.singletonList(object)));
    }
    if (!CollectionUtils.isNullOrEmpty(uris)) {
-      return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new);
+      return uris.stream()
+                 .map(CloudObjectLocation::new)
+                 .map(object -> new InputSplit<>(Collections.singletonList(object)));
    }

-    return getPrefixesSplitStream();
+    return getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec));
  }

  @Override
@@ -127,7 +130,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp
      return uris.size();
    }

-    return Ints.checkedCast(getPrefixesSplitStream().count());
+    return Ints.checkedCast(getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec)).count());
  }

  @Override
@@ -146,7 +149,7 @@ protected InputSourceReader formattableReader(
    return new InputEntityIteratingReader(
        inputRowSchema,
        inputFormat,
-        createSplits(inputFormat, null).map(this::createEntity),
+        createSplits(inputFormat, null).flatMap(split -> split.get().stream()).map(this::createEntity).iterator(),
        temporaryDirectory
    );
  }
HttpInputSource.java
@@ -115,7 +115,7 @@ protected InputSourceReader formattableReader(
            split.get(),
            httpAuthenticationUsername,
            httpAuthenticationPasswordProvider
-        )),
+        )).iterator(),
        temporaryDirectory
    );
  }
}
InlineInputSource.java
@@ -69,7 +69,7 @@ protected InputSourceReader formattableReader(
    return new InputEntityIteratingReader(
        inputRowSchema,
        inputFormat,
-        Stream.of(new ByteEntity(StringUtils.toUtf8(data))),
+        Stream.of(new ByteEntity(StringUtils.toUtf8(data))).iterator(),
        temporaryDirectory
    );
  }
}
InputEntityIteratingReader.java
@@ -31,8 +31,8 @@

import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
import java.util.function.Function;
-import java.util.stream.Stream;

/**
* InputSourceReader iterating multiple {@link InputEntity}s. This class could be used for
@@ -48,23 +48,23 @@ public class InputEntityIteratingReader implements InputSourceReader
  public InputEntityIteratingReader(
      InputRowSchema inputRowSchema,
      InputFormat inputFormat,
-      Stream<InputEntity> sourceStream,
+      Iterator<? extends InputEntity> sourceIterator,
      File temporaryDirectory
  )
  {
-    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceStream.iterator()), temporaryDirectory);
+    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceIterator), temporaryDirectory);
  }

  public InputEntityIteratingReader(
      InputRowSchema inputRowSchema,
      InputFormat inputFormat,
-      CloseableIterator<InputEntity> sourceIterator,
+      CloseableIterator<? extends InputEntity> sourceCloseableIterator,
      File temporaryDirectory
  )
  {
    this.inputRowSchema = inputRowSchema;
    this.inputFormat = inputFormat;
-    this.sourceIterator = sourceIterator;
+    this.sourceIterator = (CloseableIterator<InputEntity>) sourceCloseableIterator;
    this.temporaryDirectory = temporaryDirectory;
  }
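The unchecked cast in the constructor above relies on iterators being producers only. A minimal standalone sketch (hypothetical helper, not Druid code) of why widening `Iterator<? extends T>` to `Iterator<T>` is safe:

```java
import java.util.Arrays;
import java.util.Iterator;

// An Iterator<? extends T> only produces T values; no Iterator method accepts
// a T argument, so viewing it as Iterator<T> can never insert a wrong-typed
// element. The cast is therefore unchecked but sound for read-only use.
public class CovariantIteratorSketch
{
  @SuppressWarnings("unchecked")
  static <T> Iterator<T> widen(Iterator<? extends T> it)
  {
    return (Iterator<T>) it; // safe: values only flow out of the iterator
  }

  public static void main(String[] args)
  {
    Iterator<? extends Number> ints = Arrays.asList(1, 2, 3).iterator();
    Iterator<Number> numbers = widen(ints);
    long sum = 0;
    while (numbers.hasNext()) {
      sum += numbers.next().longValue();
    }
    System.out.println(sum); // prints 6
  }
}
```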
