@@ -57,20 +57,51 @@
* by this class provides three key functionalities.
*
* <ul>
* <li>
* Caching: for the first call of {@link #connect(StringInputRowParser, File)}, it caches objects in a local disk
* up to {@link #maxCacheCapacityBytes}. These caches are NOT deleted until the process terminates,
* and thus can be used for future reads.
* </li>
* <li>
* Fetching: after it has read all cached data, it fetches remaining objects into a local disk and reads data from
* them. For performance, prefetching is used: when the size of remaining cached or fetched data falls below
* {@link #prefetchTriggerBytes}, a background prefetch thread automatically starts to fetch remaining objects.
* </li>
* <li>
* Retry: if an exception occurs while downloading an object, the download is retried up to {@link #maxFetchRetry} times.
* </li>
* </ul>
*
* This implementation can be useful when the cost of reading input objects is high, as when reading from AWS S3,
* because IndexTask may read the whole data twice if the intervals of GranularitySpec are not specified: once to
* determine partition specs and once to generate segments.
*
* Prefetching can be turned on/off by setting {@link #maxFetchCapacityBytes}. The firehose behaves differently
* depending on whether prefetching is enabled, as described below.
*
* <ol>
* <li>
* If prefetch is enabled, PrefetchableTextFilesFirehose can fetch input objects in the background.
* </li>
* <li> When next() is called, it first checks whether there are already fetched files in local storage.
* <ol>
* <li>
* If one exists, it simply chooses a fetched file and returns a {@link LineIterator} reading that file.
* </li>
* <li>
* If there are no fetched files in local storage but some objects remain to be read, the firehose
* fetches one of the input objects in background immediately. If an IOException occurs while downloading the
* object, it retries up to the maximum retry count. The firehose returns a {@link LineIterator} only when the
* download operation finishes successfully.
* </li>
* </ol>
* </li>
* <li>
* If prefetch is disabled, the firehose returns a {@link LineIterator} which directly reads the stream opened by
* {@link #openObjectStream}. If there is an IOException, it will throw it and the read will fail.
* </li>
* </ol>
*/
public abstract class PrefetchableTextFilesFirehoseFactory<ObjectType>
extends AbstractTextFilesFirehoseFactory<ObjectType>
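The retry behavior described in the Javadoc above can be sketched roughly as follows. This is a simplified, hypothetical illustration (the class and method names are not part of Druid), not the actual `PrefetchableTextFilesFirehoseFactory` implementation:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch of the "retry up to maxFetchRetry" behavior described above.
// Note that each retry restarts the download from the beginning of the object.
public class RetryingFetch
{
  public static <T> T fetchWithRetry(Callable<T> download, int maxFetchRetry) throws Exception
  {
    IOException last = null;
    // one initial attempt plus up to maxFetchRetry retries
    for (int attempt = 0; attempt <= maxFetchRetry; attempt++) {
      try {
        return download.call();
      } catch (IOException e) {
        last = e; // transient failure; loop and try again
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception
  {
    final int[] calls = {0};
    // simulate a download that fails twice before succeeding
    String result = fetchWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new IOException("temporary network glitch");
      }
      return "object-contents";
    }, 3);
    System.out.println(result + " after " + calls[0] + " attempts"); // prints "object-contents after 3 attempts"
  }
}
```

Because a failed attempt restarts the download from the beginning, partial reads are discarded rather than resumed, which is what avoids the duplication concern raised in the review thread below.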
4 changes: 2 additions & 2 deletions docs/content/development/extensions-contrib/azure.md
@@ -58,8 +58,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|--------|-----------|-------|---------|
|type|This should be `static-azure-blobstore`.|N/A|yes|
|blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Azure objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching an Azure object.|60000|no|
|maxFetchRetry|Maximum retry for fetching an Azure object.|3|no|
4 changes: 2 additions & 2 deletions docs/content/development/extensions-contrib/cloudfiles.md
@@ -59,8 +59,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|type|This should be `static-cloudfiles`.|N/A|yes|
|blobs|JSON array of Cloud Files blobs.|N/A|yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Cloud Files objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching a Cloud Files object.|60000|no|
|maxFetchRetry|Maximum retry for fetching a Cloud Files object.|3|no|

4 changes: 2 additions & 2 deletions docs/content/development/extensions-contrib/google.md
@@ -48,8 +48,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|--------|-----------|-------|---------|
|type|This should be `static-google-blobstore`.|N/A|yes|
|blobs|JSON array of Google Blobs.|N/A|yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching a Google Blob.|60000|no|
|maxFetchRetry|Maximum retry for fetching a Google Blob.|3|no|
4 changes: 2 additions & 2 deletions docs/content/development/extensions-core/s3.md
@@ -40,8 +40,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|type|This should be `static-s3`.|N/A|yes|
|uris|JSON array of URIs where s3 files to be ingested are located.|N/A|`uris` or `prefixes` must be set|
|prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|N/A|`uris` or `prefixes` must be set|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching s3 objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching an s3 object.|60000|no|
|maxFetchRetry|Maximum retry for fetching an s3 object.|3|no|
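As a hedged illustration of these properties, a static-s3 firehose spec that disables prefetching by setting `maxFetchCapacityBytes` to 0 might look like the following (the bucket and key names are placeholders, not real locations):

```json
{
  "type" : "static-s3",
  "uris" : ["s3://example-bucket/path/to/data.gz"],
  "maxFetchCapacityBytes" : 0
}
```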
22 changes: 22 additions & 0 deletions docs/content/ingestion/firehose.md
@@ -36,6 +36,28 @@ A sample local firehose spec is shown below:
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes|
|baseDir|directory to search recursively for files to be ingested. |yes|

#### HttpFirehose

This firehose can be used to read data from remote sites via HTTP.
A sample HTTP firehose spec is shown below:

```json
{
"type" : "http",
"uris" : ["http://example.com/uri1", "http://example2.com/uri2"]
}
```

The following properties can optionally be used to tune the firehose performance.

|property|description|default|
|--------|-----------|-------|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
|prefetchTriggerBytes|Threshold to trigger prefetching HTTP objects.|maxFetchCapacityBytes / 2|
|fetchTimeout|Timeout for fetching an HTTP object.|60000|
|maxFetchRetry|Maximum retry for fetching an HTTP object.|3|
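For example, an HTTP firehose spec that sets all of these tuning properties might look like the following (the URIs and values are illustrative, not recommendations):

```json
{
  "type" : "http",
  "uris" : ["http://example.com/uri1", "http://example2.com/uri2"],
  "maxCacheCapacityBytes" : 1073741824,
  "maxFetchCapacityBytes" : 1073741824,
  "prefetchTriggerBytes" : 536870912,
  "fetchTimeout" : 60000,
  "maxFetchRetry" : 3
}
```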

#### IngestSegmentFirehose

This firehose can be used to read data from existing Druid segments.
2 changes: 2 additions & 0 deletions server/src/main/java/io/druid/guice/FirehoseModule.java
@@ -28,6 +28,7 @@
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
import io.druid.segment.realtime.firehose.HttpFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@@ -54,6 +55,7 @@ public List<? extends Module> getJacksonModules()
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(HttpFirehoseFactory.class, "http"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining"),
new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
@@ -0,0 +1,74 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.List;

public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
{
private final List<URI> uris;

@JsonCreator
public HttpFirehoseFactory(
@JsonProperty("uris") List<URI> uris,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("maxFetchRetry") Integer maxFetchRetry
)
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
this.uris = uris;
}

@JsonProperty
public List<URI> getUris()
{
return uris;
}

@Override
protected Collection<URI> initObjects()
{
return uris;
}

@Override
protected InputStream openObjectStream(URI object) throws IOException
{
return object.toURL().openConnection().getInputStream();
Contributor:
Will there be duplication in the case when this returns an InputStream which is partially read, and then there is a temporary network glitch and consequently an IOException? The parent class does the retry (which starts reading data from the beginning again) and ends up introducing duplicates?

Contributor Author:
I think it should be fine.

1. If prefetch is enabled,
   1.1) PrefetchableTextFilesFirehose can fetch http objects in background. When next() is called, it first checks whether there are already fetched files in local storage. If one exists, it simply chooses a fetched file and returns a LineIterator for that file.
   1.2) If there are no fetched files in local storage but some objects remain to be read, PrefetchableTextFilesFirehose fetches one of the objects in background immediately. If an IOException occurs while downloading the object, it retries up to the maximum retry count. Finally, PrefetchableTextFilesFirehose returns a LineIterator only when the download operation finishes successfully.
2. If prefetch is disabled, PrefetchableTextFilesFirehose returns a LineIterator which directly reads the stream opened by openObjectStream(). If there is an IOException, it will throw it and the read will fail.

Does that make sense?

Contributor:
Yeah, I see from PrefetchTextFilesFirehoseFactory.download(..) that the whole contents are downloaded irrespective of cache/prefetch capacity bytes, which is fair to keep the implementation simpler. It would be nice to document it in some way, though.

Contributor Author:
Thanks. I'll add a document about the behavior of PrefetchableTextFilesFirehose somewhere.

Contributor Author:
I added the behavior of PrefetchableTextFilesFirehose to its javadoc.

}

@Override
protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
}
}