Merged

68 commits
fb5a787
Native parallel indexing without shuffle
jihoonson Mar 15, 2018
fcaf1bf
fix build
jihoonson Mar 15, 2018
6ce2db8
fix ci
jihoonson Mar 15, 2018
0ace07a
fix ingestion without intervals
jihoonson Mar 19, 2018
2e8dbe0
fix retry
jihoonson Mar 20, 2018
a2b44f0
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Mar 20, 2018
67ca45c
fix retry
jihoonson Mar 20, 2018
cc8bd38
add it test
jihoonson Mar 22, 2018
2d2347b
use chat handler
jihoonson Mar 23, 2018
932f69c
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Mar 27, 2018
a83c76a
fix build
jihoonson Mar 27, 2018
8d68dba
add docs
jihoonson Mar 27, 2018
00be56c
fix ITUnionQueryTest
jihoonson Mar 28, 2018
b9724a5
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Mar 29, 2018
7f21927
working
jihoonson Apr 6, 2018
3a94432
fix failures
jihoonson Apr 9, 2018
c68b21a
disable metrics reporting
jihoonson Apr 10, 2018
fd7dda3
working
jihoonson Apr 13, 2018
b142a30
Fix split of static-s3 firehose
jihoonson Apr 13, 2018
1fe0a05
Add endpoints to supervisor task and a unit test for endpoints
jihoonson Apr 23, 2018
bd5d5d5
increase timeout in test
jihoonson Apr 24, 2018
9d5d7c0
Added doc
jihoonson Apr 24, 2018
702b2a5
Address comments
jihoonson Apr 24, 2018
5dc8fb1
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Apr 24, 2018
6d32f0f
Fix overlapping locks
jihoonson Apr 27, 2018
185549f
address comments
jihoonson Apr 28, 2018
9a8ccc4
Fix static s3 firehose
jihoonson Apr 28, 2018
2447bbf
Fix test
jihoonson Apr 28, 2018
bda4fec
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Apr 30, 2018
ac817c6
fix build
jihoonson Apr 30, 2018
4924fa5
fix test
jihoonson May 1, 2018
dabb8bf
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson May 4, 2018
0998e71
fix typo in docs
jihoonson May 4, 2018
f306338
add missing maxBytesInMemory to doc
jihoonson May 4, 2018
e43adf2
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson May 9, 2018
7c7af69
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson May 17, 2018
b077c92
address comments
jihoonson May 21, 2018
3631e20
fix race in test
jihoonson May 24, 2018
b2b1de0
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson May 29, 2018
acb5305
fix test
jihoonson May 31, 2018
28e310d
Rename to ParallelIndexSupervisorTask
jihoonson Jun 1, 2018
a7fc8e8
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jun 6, 2018
00e2663
fix teamcity
jihoonson Jun 7, 2018
0a6eb30
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jun 21, 2018
c1c6f3f
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jun 22, 2018
86b4582
address comments
jihoonson Jun 28, 2018
6c85f62
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jun 28, 2018
892a0ff
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jun 28, 2018
77f6937
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jul 5, 2018
bf73f7e
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jul 10, 2018
a5254a6
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jul 12, 2018
6cd0dd6
Fix license
jihoonson Jul 12, 2018
509c9b3
addressing comments
jihoonson Jul 12, 2018
6a1bcfd
addressing comments
jihoonson Jul 12, 2018
d700395
indexTaskClient-based segmentAllocator instead of CountingActionBased…
jihoonson Jul 13, 2018
550fd38
Fix race in TaskMonitor and move HTTP endpoints to supervisorTask fro…
jihoonson Jul 17, 2018
77935bd
Add more javadocs
jihoonson Jul 17, 2018
7e6b110
use StringUtils.nonStrictFormat for logging
jihoonson Jul 17, 2018
b7266ce
fix typo and remove unused class
jihoonson Jul 17, 2018
e75baa4
fix tests
jihoonson Jul 17, 2018
b8ce241
change package
jihoonson Jul 18, 2018
3b29f6e
fix strict build
jihoonson Jul 18, 2018
8da2971
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jul 30, 2018
cc111ae
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Jul 31, 2018
9853cfa
tmp
jihoonson Aug 1, 2018
20f7c7b
Fix overlord api according to the recent change in master
jihoonson Aug 1, 2018
826ae54
Fix it test
jihoonson Aug 1, 2018
97a7819
Merge branch 'master' of github.com:druid-io/druid into superbatch
jihoonson Aug 7, 2018
68 changes: 68 additions & 0 deletions api/src/main/java/io/druid/data/input/FiniteFirehoseFactory.java
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.data.input;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.druid.data.input.impl.InputRowParser;

import java.io.IOException;
import java.util.stream.Stream;

/**
* {@link FiniteFirehoseFactory} designed for batch processing. Its implementations assume that the number of inputs is
* limited.
*
* @param <T> parser type
* @param <S> input split type
*/
public interface FiniteFirehoseFactory<T extends InputRowParser, S> extends FirehoseFactory<T>
Contributor:

(minor) suggest using descriptive type names. I always have a hard time keeping track of what T and S represent

Contributor Author:

I would rather stick to the generic parameter naming conventions for Java. See 'Type Parameter Naming Conventions' section in https://docs.oracle.com/javase/tutorial/java/generics/types.html.

{
/**
* Returns true if this {@link FiniteFirehoseFactory} supports parallel batch indexing.
*/
@JsonIgnore
@Override
default boolean isSplittable()
{
return true;
}

/**
* Returns a {@link Stream} for {@link InputSplit}s. In parallel batch indexing, each {@link InputSplit} is processed
* by a sub task.
*
* Listing splits may cause high overhead in some implementations. In this case, {@link InputSplit}s should be listed
* lazily so that the listing overhead can be amortized.
*/
@JsonIgnore
Stream<InputSplit<S>> getSplits() throws IOException;

/**
* Returns the number of splits returned by {@link #getSplits()}.
*/
@JsonIgnore
int getNumSplits() throws IOException;

/**
* Returns the same {@link FiniteFirehoseFactory} but with the given {@link InputSplit}. The returned
* {@link FiniteFirehoseFactory} is used by sub tasks in parallel batch indexing.
*/
FiniteFirehoseFactory<T, S> withSplit(InputSplit<S> split);
Contributor:

(minor) would InputSplit<? extends S> split work here?

Contributor Author:

S is the actual type of a split, and the split is designed to be associated with a certain firehose type. For example, it's File for LocalFirehose. URI is the split type for HttpFirehose.

I'm not sure we need to have several implementations for the same split type. Do you have any concrete example?

Contributor:

the most common scenario where something like this comes into play is if you have

withSplit(ImmutableList.of(testObject1,testObject2).toSplit()))

kind of thing where your test objects inherit the split type. In practice it can sometimes come up, but the method signature can be changed later if needed

Contributor Author:

In this specific case, InputSplit<? extends S> split is needed only when testObject1 and testObject2 have different types (which are inherited from the same class). This means they might need special handling according to their types when subTasks process them.

However, this is not what I intended. InputSplit is designed to abstract the same type of input and all subTasks are expected to process the same split type.

}
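To illustrate how the three interface methods compose, here is a minimal, self-contained sketch. It uses simplified stand-ins for Druid's types: `UriSplitFactory` and everything in it are hypothetical names for illustration, not part of this PR.

```java
import java.util.List;
import java.util.stream.Stream;

// Hypothetical, simplified mirror of the FiniteFirehoseFactory pattern:
// a factory over a fixed list of inputs that the supervisor task can
// enumerate and hand out one split at a time to sub tasks.
class UriSplitFactory
{
  private final List<String> uris; // stand-in for the factory's input list

  UriSplitFactory(List<String> uris)
  {
    this.uris = uris;
  }

  // getSplits(): one element per input; a Stream so listing can be lazy
  Stream<String> getSplits()
  {
    return uris.stream();
  }

  // getNumSplits(): how many splits (and hence sub tasks) there are in total
  int getNumSplits()
  {
    return uris.size();
  }

  // withSplit(): the same factory narrowed to a single split, which is
  // what each sub task receives in parallel batch indexing
  UriSplitFactory withSplit(String split)
  {
    return new UriSplitFactory(List.of(split));
  }
}
```

The key property is that `withSplit` returns the same factory type, so a sub task can run the ordinary single-task ingestion path against its one split without knowing it is part of a parallel job.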
5 changes: 5 additions & 0 deletions api/src/main/java/io/druid/data/input/FirehoseFactory.java
@@ -72,4 +72,9 @@ default Firehose connect(T parser, @Nullable File temporaryDirectory) throws IOE
{
return connect(parser);
}

default boolean isSplittable()
{
return false;
}
}
52 changes: 52 additions & 0 deletions api/src/main/java/io/druid/data/input/InputSplit.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.data.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Input unit for distributed batch ingestion. Used in {@link FiniteFirehoseFactory}.
* An {@link InputSplit} represents the input data processed by a {@code io.druid.indexing.common.task.Task}.
*/
public class InputSplit<T>
{
private final T split;

@JsonCreator
public InputSplit(@JsonProperty("split") T split)
{
this.split = split;
}

@JsonProperty("split")
public T get()
Contributor:

getSplit is more common syntax

Contributor Author:

Yeah, but I think most use cases would be like this:

final InputSplit split = someMethodToGetSplit();
...
final T actualSplit = split.get();

If we rename this method, the code would be like split.getSplit(). Do you think this is better? I have no strong opinion here.

{
return split;
}

@Override
public String toString()
{
return "InputSplit{" +
"split=" + split +
"}";
}
}
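A short usage sketch of the wrapper class above. This is a standalone copy with the Jackson annotations dropped so it runs on its own; `Split` and `SplitDemo` are hypothetical names chosen to avoid clashing with the real class.

```java
import java.util.List;
import java.util.stream.Collectors;

// Standalone copy of the InputSplit shape above, minus the Jackson annotations.
class Split<T>
{
  private final T split;

  Split(T split)
  {
    this.split = split;
  }

  T get()
  {
    return split;
  }

  @Override
  public String toString()
  {
    return "InputSplit{" + "split=" + split + "}";
  }
}

class SplitDemo
{
  // Wrap each raw input; a sub task later unwraps its split with get().
  static List<Split<String>> wrap(List<String> files)
  {
    return files.stream().map(Split::new).collect(Collectors.toList());
  }
}
```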
@@ -22,20 +22,22 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.FiniteFirehoseFactory;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputSplit;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

/**
* This is an abstract class for firehose factories that create firehoses for reading text files.
@@ -44,7 +46,7 @@
* @param <T> object type representing input data
*/
public abstract class AbstractTextFilesFirehoseFactory<T>
implements FirehoseFactory<StringInputRowParser>
implements FiniteFirehoseFactory<StringInputRowParser, T>
{
private static final Logger LOG = new Logger(AbstractTextFilesFirehoseFactory.class);

@@ -53,9 +55,7 @@ public abstract class AbstractTextFilesFirehoseFactory<T>
@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
}
initializeObjectsIfNeeded();
final Iterator<T> iterator = objects.iterator();
return new FileIteratingFirehose(
new Iterator<LineIterator>()
@@ -74,7 +74,7 @@ public LineIterator next()
}
final T object = iterator.next();
try {
return IOUtils.lineIterator(wrapObjectStream(object, openObjectStream(object)), Charsets.UTF_8);
return IOUtils.lineIterator(wrapObjectStream(object, openObjectStream(object)), StandardCharsets.UTF_8);
}
catch (Exception e) {
LOG.error(
@@ -90,6 +90,32 @@ public LineIterator next()
);
}

protected void initializeObjectsIfNeeded() throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
}
}

public List<T> getObjects()
{
return objects;
}

@Override
public Stream<InputSplit<T>> getSplits() throws IOException
{
initializeObjectsIfNeeded();
return getObjects().stream().map(InputSplit::new);
}

@Override
public int getNumSplits() throws IOException
{
initializeObjectsIfNeeded();
return getObjects().size();
}

/**
* Initialize objects to be read by this firehose. Since firehose factories are constructed whenever
* io.druid.indexing.common.task.Task objects are deserialized, actual initialization of objects is deferred
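The `getSplits` implementation above materializes `objects` eagerly before streaming them, while the interface javadoc recommends lazy listing when enumeration is expensive. A sketch of that idea follows; `LazyListing`, `expensiveList`, and the call counter are hypothetical stand-ins for, e.g., a remote object-listing request.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyListing
{
  static final AtomicInteger listCalls = new AtomicInteger();

  // Stand-in for an expensive enumeration, e.g. listing objects in a bucket.
  static List<String> expensiveList()
  {
    listCalls.incrementAndGet();
    return List.of("obj1", "obj2");
  }

  // flatMap is a lazy intermediate operation, so expensiveList() does not run
  // until a terminal operation actually consumes the returned Stream.
  static Stream<String> getSplits()
  {
    return Stream.of("ignored").flatMap(x -> expensiveList().stream());
  }
}
```

Because the stream defers the listing call, a supervisor task that only inspects a few splits never pays the full enumeration cost up front.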
2 changes: 1 addition & 1 deletion api/src/main/java/io/druid/indexer/RunnerTaskState.java
@@ -25,5 +25,5 @@ public enum RunnerTaskState
WAITING,
PENDING,
RUNNING,
NONE; // is used for a completed task
NONE // is used for a completed task
Contributor:

(minor) is **also** used for a completed task ?

Contributor Author:

It looks like it, but I'm not sure. I don't want to change it in this PR.

}
16 changes: 16 additions & 0 deletions api/src/main/java/io/druid/indexer/TaskStatusPlus.java
@@ -172,4 +172,20 @@ public int hashCode()
getErrorMsg()
);
}

@Override
public String toString()
Contributor:

Should this toString have errorMsg as well?

Contributor Author:

Thanks. Added.

{
return "TaskStatusPlus{" +
"id='" + id + '\'' +
", type='" + type + '\'' +
", createdTime=" + createdTime +
", queueInsertionTime=" + queueInsertionTime +
", state=" + state +
", duration=" + duration +
", location=" + location +
", dataSource='" + dataSource + '\'' +
", errorMsg='" + errorMsg + '\'' +
'}';
}
}
@@ -23,7 +23,9 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.io.CountingOutputStream;
import io.druid.data.input.FiniteFirehoseFactory;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputSplit;
import io.druid.data.input.Row;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
@@ -605,6 +607,12 @@ protected InputStream openObjectStream(File object, long start) throws IOExcepti
private int readCount;
private int numConnectionResets;

@Override
public FiniteFirehoseFactory<StringInputRowParser, File> withSplit(InputSplit<File> split)
{
throw new UnsupportedOperationException();
}

private class TestInputStream extends InputStream
{
private static final int NUM_READ_COUNTS_BEFORE_ERROR = 10;
@@ -78,6 +78,9 @@ void insert(
*/
Optional<StatusType> getStatus(String entryId);

@Nullable
TaskInfo<EntryType, StatusType> getTaskInfo(String entryId);

/**
* Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
* created on or later than the given timestamp
3 changes: 3 additions & 0 deletions docs/content/development/extensions-contrib/azure.md
@@ -33,6 +33,9 @@ The storage account is shared with the one used for Azure deep storage functiona

As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz

This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native_tasks.html#parallel-index-task).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.

Sample spec:

```json
3 changes: 3 additions & 0 deletions docs/content/development/extensions-contrib/cloudfiles.md
@@ -32,6 +32,9 @@ The storage account is shared with the one used for Rackspace's Cloud Files deep

As with the Azure blobstore, it is assumed to be gzipped if the extension ends in .gz

This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native_tasks.html#parallel-index-task).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.

Sample spec:

```json
3 changes: 3 additions & 0 deletions docs/content/development/extensions-contrib/google.md
@@ -27,6 +27,9 @@ This firehose ingests events, similar to the StaticS3Firehose, but from a Googl

As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz

This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native_tasks.html#parallel-index-task).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.

Sample spec:

```json
2 changes: 2 additions & 0 deletions docs/content/development/extensions-core/s3.md
@@ -43,6 +43,8 @@ You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/lat
## StaticS3Firehose

This firehose ingests events from a predefined list of S3 objects.
This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native_tasks.html#parallel-index-task).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.

Sample spec:

4 changes: 4 additions & 0 deletions docs/content/ingestion/firehose.md
@@ -20,6 +20,8 @@ For additional firehoses, please see our [extensions list](../development/extens

This Firehose can be used to read the data from files on local disk.
It can be used for POCs to ingest data on disk.
This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
Since each split represents a file in this firehose, each worker task of `index_parallel` will read a file.
A sample local firehose spec is shown below:

```json
@@ -39,6 +41,8 @@ A sample local firehose spec is shown below:
#### HttpFirehose

This Firehose can be used to read the data from remote sites via HTTP.
This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
Since each split represents a file in this firehose, each worker task of `index_parallel` will read a file.
A sample http firehose spec is shown below:

```json