Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5a98275
Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks
jihoonson Aug 3, 2019
77e2128
kill runner when it's ready
jihoonson Aug 3, 2019
31faec2
add comment
jihoonson Aug 3, 2019
13fd909
kill run thread
jihoonson Aug 3, 2019
befedf6
fix test
jihoonson Aug 3, 2019
20a502d
Take closeable out of Appenderator
jihoonson Aug 5, 2019
6dbbf71
add javadoc
jihoonson Aug 6, 2019
4404c2e
fix test
jihoonson Aug 6, 2019
c13ebeb
fix test
jihoonson Aug 6, 2019
a89b3fc
update javadoc
jihoonson Aug 7, 2019
76f75fa
add javadoc about killed task
jihoonson Aug 7, 2019
e430747
address comment
jihoonson Aug 7, 2019
01f6679
Add support for parallel native indexing with shuffle for perfect rol…
jihoonson Aug 8, 2019
1f314fb
Add comment about volatiles
jihoonson Aug 8, 2019
895c158
fix test
jihoonson Aug 8, 2019
ba0a4e5
fix test
jihoonson Aug 8, 2019
cff9465
handling missing exceptions
jihoonson Aug 8, 2019
2e19f5b
more clear javadoc for stopGracefully
jihoonson Aug 8, 2019
a1e27d4
unused import
jihoonson Aug 9, 2019
6752994
update javadoc
jihoonson Aug 12, 2019
97c0f12
Add missing statement in javadoc
jihoonson Aug 12, 2019
52fe75f
Merge branch 'task-resource-cleaner' of github.com:jihoonson/druid in…
jihoonson Aug 12, 2019
d531a88
Merge branch 'master' of github.com:apache/incubator-druid into super…
jihoonson Aug 13, 2019
8ddf3cc
address comments; fix doc
jihoonson Aug 14, 2019
2024fb2
add javadoc for isGuaranteedRollup
jihoonson Aug 14, 2019
68e118d
Rename confusing variable name and fix typos
jihoonson Aug 14, 2019
64c5837
fix typos; move fetch() to a better home; fix the expiration time
jihoonson Aug 15, 2019
62528b0
add support https
jihoonson Aug 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@
package org.apache.druid.data.input.impl.prefetch;

import com.google.common.base.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;

Expand All @@ -39,7 +35,6 @@
* See the javadoc of {@link PrefetchableTextFilesFirehoseFactory} for more details.
*/
public class FileFetcher<T> extends Fetcher<T>

{
private static final int BUFFER_SIZE = 1024 * 4;
private final ObjectOpenFunction<T> openObjectFunction;
Expand All @@ -48,7 +43,7 @@ public class FileFetcher<T> extends Fetcher<T>
// maximum retry for fetching an object from the remote site
private final int maxFetchRetry;

public int getMaxFetchRetry()
private int getMaxFetchRetry()
{
return maxFetchRetry;
}
Expand All @@ -61,7 +56,7 @@ public int getMaxFetchRetry()
PrefetchConfig prefetchConfig,
ObjectOpenFunction<T> openObjectFunction,
Predicate<Throwable> retryCondition,
Integer maxFetchRetries
int maxFetchRetries
)
{

Expand Down Expand Up @@ -91,23 +86,15 @@ public int getMaxFetchRetry()
@Override
protected long download(T object, File outFile) throws IOException
{
try {
return RetryUtils.retry(
() -> {
try (final InputStream is = openObjectFunction.open(object);
final OutputStream os = new FileOutputStream(outFile)) {
return IOUtils.copyLarge(is, os, buffer);
}
},
retryCondition,
outFile::delete,
maxFetchRetry + 1,
StringUtils.format("Failed to download object[%s]", object)
);
}
catch (Exception e) {
throw new IOException(e);
}
return FileUtils.copyLarge(
object,
openObjectFunction,
outFile,
buffer,
retryCondition,
maxFetchRetry + 1,
StringUtils.format("Failed to download object[%s]", object)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
Expand Down Expand Up @@ -258,6 +262,50 @@ public void close()
};
}

/**
 * Copies data from the InputStream opened with objectOpenFunction to the given file.
 * This method is supposed to be used for copying large files.
 * The output file is deleted automatically if copy fails.
 *
 * @param object             object to open
 * @param objectOpenFunction function to open the given object
 * @param outFile            file to write data
 * @param fetchBuffer        a buffer to copy data from the input stream to the file
 * @param retryCondition     condition which should be satisfied for retry
 * @param numRetries         appears to be the TOTAL number of tries, not the retry count:
 *                           the known caller passes maxFetchRetry + 1 — confirm against
 *                           RetryUtils.retry before renaming
 * @param messageOnRetry     log message emitted on each retry
 *
 * @return the number of bytes copied
 *
 * @throws IOException if the copy still fails after exhausting all tries
 */
public static <T> long copyLarge(
    T object,
    ObjectOpenFunction<T> objectOpenFunction,
    File outFile,
    byte[] fetchBuffer,
    Predicate<Throwable> retryCondition,
    int numRetries,
    String messageOnRetry
) throws IOException
{
  try {
    return RetryUtils.retry(
        () -> {
          try (InputStream inputStream = objectOpenFunction.open(object);
               OutputStream out = new FileOutputStream(outFile)) {
            return IOUtils.copyLarge(inputStream, out, fetchBuffer);
          }
        },
        retryCondition,
        outFile::delete,
        numRetries,
        messageOnRetry
    );
  }
  catch (IOException e) {
    // Rethrow as-is instead of double-wrapping: preserves the concrete subtype
    // (e.g. FileNotFoundException) for callers that catch it specifically.
    throw e;
  }
  catch (Exception e) {
    throw new IOException(e);
  }
}

public interface OutputStreamConsumer<T>
{
T apply(OutputStream outputStream) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public static String toUpperCase(String s)
* @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20".
*/
@Nullable
public static String urlEncode(String s)
public static String urlEncode(@Nullable String s)
{
if (s == null) {
return null;
Expand Down
8 changes: 4 additions & 4 deletions docs/content/ingestion/hadoop-vs-native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ ingestion method.

| |Hadoop-based ingestion|Native parallel ingestion|Native local ingestion|
|---|----------------------|-------------------------|----------------------|
| Parallel indexing | Always parallel | Parallel if firehose is splittable | Always sequential |
| Supported indexing modes | Replacing mode | Both appending and replacing modes | Both appending and replacing modes |
| Parallel indexing | Always parallel | Parallel if firehose is splittable <br/> & maxNumSubTasks > 1 in tuningConfig | Always sequential |
| Supported indexing modes | Overwriting mode | Both appending and overwriting modes | Both appending and overwriting modes |
| External dependency | Hadoop (it internally submits Hadoop jobs) | No dependency | No dependency |
| Supported [rollup modes](./index.html#roll-up-modes) | Perfect rollup | Best-effort rollup | Both perfect and best-effort rollup |
| Supported partitioning methods | [Both Hash-based and range partitioning](./hadoop.html#partitioning-specification) | N/A | Hash-based partitioning (when `forceGuaranteedRollup` = true) |
| Supported [rollup modes](./index.html#roll-up-modes) | Perfect rollup | Both perfect and best-effort rollup | Both perfect and best-effort rollup |
| Supported partitioning methods | [Both Hash-based and range partitioning](./hadoop.html#partitioning-specification) | Hash-based partitioning (when `forceGuaranteedRollup` = true) | Hash-based partitioning (when `forceGuaranteedRollup` = true) |
| Supported input locations | All locations accessible via HDFS client or Druid dataSource | All implemented [firehoses](./firehose.html) | All implemented [firehoses](./firehose.html) |
| Supported file formats | All implemented Hadoop InputFormats | Currently text file formats (CSV, TSV, JSON) by default. Additional formats can be added through a [custom extension](../development/modules.html) implementing [`FiniteFirehoseFactory`](https://github.com/apache/incubator-druid/blob/master/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java) | Currently text file formats (CSV, TSV, JSON) by default. Additional formats can be added through a [custom extension](../development/modules.html) implementing [`FiniteFirehoseFactory`](https://github.com/apache/incubator-druid/blob/master/core/src/main/java/org/apache/druid/data/input/FiniteFirehoseFactory.java) |
| Saving parse exceptions in ingestion report | Currently not supported | Currently not supported | Supported |
Expand Down
Loading