diff --git a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java
index 4baacbcf2462..b5bc768870fd 100644
--- a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java
+++ b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java
@@ -57,20 +57,51 @@
* by this class provides three key functionalities.
*
*
- * - Caching: for the first call of {@link #connect(StringInputRowParser, File)}, it caches objects in a local disk
- * up to {@link #maxCacheCapacityBytes}. These caches are NOT deleted until the process terminates,
- * and thus can be used for future reads.
- * - Fetching: when it reads all cached data, it fetches remaining objects into a local disk and reads data from
- * them. For the performance reason, prefetch technique is used, that is, when the size of remaining cached or
- * fetched data is smaller than {@link #prefetchTriggerBytes}, a background prefetch thread automatically starts to
- * fetch remaining objects.
- * - Retry: if an exception occurs while downloading an object, it retries again up to
- * {@link #maxFetchRetry}.
+ * -
+ * Caching: for the first call of {@link #connect(StringInputRowParser, File)}, it caches objects in a local disk
+ * up to {@link #maxCacheCapacityBytes}. These caches are NOT deleted until the process terminates,
+ * and thus can be used for future reads.
+ *
+ * -
+ * Fetching: when it reads all cached data, it fetches remaining objects into a local disk and reads data from
+ * them. For the performance reason, prefetch technique is used, that is, when the size of remaining cached or
+ * fetched data is smaller than {@link #prefetchTriggerBytes}, a background prefetch thread automatically starts to
+ * fetch remaining objects.
+ *
+ * -
+ * Retry: if an exception occurs while downloading an object, it retries again up to {@link #maxFetchRetry}.
+ *
*
*
* This implementation can be useful when the cost for reading input objects is large as reading from AWS S3 because
* IndexTask can read the whole data twice for determining partition specs and generating segments if the intervals of
* GranularitySpec is not specified.
+ *
+ * Prefetching can be turned on/off by setting {@link #maxFetchCapacityBytes}. Depending on whether prefetching is
+ * enabled or disabled, the firehose behaves differently as described below.
+ *
+ *
+ * -
+ * If prefetch is enabled, PrefetchableTextFilesFirehose can fetch input objects in background.
+ *
+ * - When next() is called, it first checks that there are already fetched files in local storage.
+ *
+ * -
+ * If any exist, it simply chooses a fetched file and returns a {@link LineIterator} reading that file.
+ *
+ * -
+ * If there are no fetched files in local storage but some objects still remain to be read, the firehose
+ * fetches one of input objects in background immediately. If an IOException occurs while downloading the object,
+ * it retries up to the maximum retry count. Finally, the firehose returns a {@link LineIterator} only when the
+ * download operation is successfully finished.
+ *
+ *
+ *
+ * -
+ * If prefetch is disabled, the firehose returns a {@link LineIterator} which directly reads the stream opened by
+ * {@link #openObjectStream}. If there is an IOException, it will throw it and the read will fail.
+ *
+ *
*/
public abstract class PrefetchableTextFilesFirehoseFactory
extends AbstractTextFilesFirehoseFactory
diff --git a/docs/content/development/extensions-contrib/azure.md b/docs/content/development/extensions-contrib/azure.md
index cb4709677655..3a3946accf48 100644
--- a/docs/content/development/extensions-contrib/azure.md
+++ b/docs/content/development/extensions-contrib/azure.md
@@ -58,8 +58,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|--------|-----------|-------|---------|
|type|This should be `static-azure-blobstore`.|N/A|yes|
|blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no|
+|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
+|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Azure objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching an Azure object.|60000|no|
|maxFetchRetry|Maximum retry for fetching an Azure object.|3|no|
diff --git a/docs/content/development/extensions-contrib/cloudfiles.md b/docs/content/development/extensions-contrib/cloudfiles.md
index d11a9a0ec823..954179d5fbdb 100644
--- a/docs/content/development/extensions-contrib/cloudfiles.md
+++ b/docs/content/development/extensions-contrib/cloudfiles.md
@@ -59,8 +59,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|type|This should be `static-cloudfiles`.|N/A|yes|
|blobs|JSON array of Cloud Files blobs.|N/A|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no|
+|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
+|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Cloud Files objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching a Cloud Files object.|60000|no|
|maxFetchRetry|Maximum retry for fetching a Cloud Files object.|3|no|
diff --git a/docs/content/development/extensions-contrib/google.md b/docs/content/development/extensions-contrib/google.md
index 0852b886cebc..7aa764728828 100644
--- a/docs/content/development/extensions-contrib/google.md
+++ b/docs/content/development/extensions-contrib/google.md
@@ -48,8 +48,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|--------|-----------|-------|---------|
|type|This should be `static-google-blobstore`.|N/A|yes|
|blobs|JSON array of Google Blobs.|N/A|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no|
+|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
+|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching a Google Blob.|60000|no|
|maxFetchRetry|Maximum retry for fetching a Google Blob.|3|no|
diff --git a/docs/content/development/extensions-core/s3.md b/docs/content/development/extensions-core/s3.md
index 6a804f959975..f24a406946b6 100644
--- a/docs/content/development/extensions-core/s3.md
+++ b/docs/content/development/extensions-core/s3.md
@@ -40,8 +40,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch
|type|This should be `static-s3`.|N/A|yes|
|uris|JSON array of URIs where s3 files to be ingested are located.|N/A|`uris` or `prefixes` must be set|
|prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|N/A|`uris` or `prefixes` must be set|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no|
+|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
+|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching s3 objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching an s3 object.|60000|no|
|maxFetchRetry|Maximum retry for fetching an s3 object.|3|no|
\ No newline at end of file
diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index 91d594ec8c10..aec3ea15f62d 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -36,6 +36,28 @@ A sample local firehose spec is shown below:
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes|
|baseDir|directory to search recursively for files to be ingested. |yes|
+#### HttpFirehose
+
+This Firehose can be used to read data from remote sites via HTTP.
+A sample http firehose spec is shown below:
+
+```json
+{
+ "type" : "http",
+ "uris" : ["http://example.com/uri1", "http://example2.com/uri2"]
+}
+```
+
+The below configurations can be optionally used for tuning the firehose performance.
+
+|property|description|default|
+|--------|-----------|-------|
+|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
+|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
+|prefetchTriggerBytes|Threshold to trigger prefetching http objects.|maxFetchCapacityBytes / 2|
+|fetchTimeout|Timeout for fetching a http object.|60000|
+|maxFetchRetry|Maximum retry for fetching a http object.|3|
+
#### IngestSegmentFirehose
This Firehose can be used to read the data from existing druid segments.
diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java
index dd8a605af969..4678edbfe4d1 100644
--- a/server/src/main/java/io/druid/guice/FirehoseModule.java
+++ b/server/src/main/java/io/druid/guice/FirehoseModule.java
@@ -28,6 +28,7 @@
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
+import io.druid.segment.realtime.firehose.HttpFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@@ -54,6 +55,7 @@ public List<? extends Module> getJacksonModules()
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"),
+ new NamedType(HttpFirehoseFactory.class, "http"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining"),
new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java
new file mode 100644
index 000000000000..4a604ae7f93b
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory;
+import io.druid.java.util.common.CompressionUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+
+public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory
+{
+ private final List<URI> uris;
+
+ @JsonCreator
+ public HttpFirehoseFactory(
+ @JsonProperty("uris") List<URI> uris,
+ @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
+ @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
+ @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
+ @JsonProperty("fetchTimeout") Long fetchTimeout,
+ @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+ )
+ {
+ super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
+ this.uris = uris;
+ }
+
+ @JsonProperty
+ public List<URI> getUris()
+ {
+ return uris;
+ }
+
+ @Override
+ protected Collection<URI> initObjects()
+ {
+ return uris;
+ }
+
+ @Override
+ protected InputStream openObjectStream(URI object) throws IOException
+ {
+ return object.toURL().openConnection().getInputStream();
+ }
+
+ @Override
+ protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException
+ {
+ return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
+ }
+}