diff --git a/api/src/main/java/io/druid/data/input/FirehoseFactory.java b/api/src/main/java/io/druid/data/input/FirehoseFactory.java
index 43cbf01111f9..2494c13ea716 100644
--- a/api/src/main/java/io/druid/data/input/FirehoseFactory.java
+++ b/api/src/main/java/io/druid/data/input/FirehoseFactory.java
@@ -20,12 +20,18 @@
 package io.druid.data.input;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
 import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory;
 import io.druid.java.util.common.parsers.ParseException;
 
+import java.io.File;
 import java.io.IOException;
 
+/**
+ * FirehoseFactory creates a {@link Firehose} which is an interface holding onto the stream of incoming data.
+ * It currently provides two methods for creating a {@link Firehose}; for backward compatibility their default
+ * implementations call each other. Implementations must override at least one of them; if neither is overridden, the two defaults call each other without end.
+ */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 public interface FirehoseFactory
 {
@@ -36,7 +42,31 @@ public interface FirehoseFactory
    * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
    * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
    * invalid configuration is preferred over returning null.
+   *
+   * @param parser an input row parser
    */
- public Firehose connect(T parser) throws IOException, ParseException;
+  @Deprecated
+  default Firehose connect(T parser) throws IOException, ParseException
+  {
+    return connect(parser, null);
+  }
 
+  /**
+   * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
+   * call hasMore() on the returned Firehose (which might subsequently block).
+   * <p/>
+   * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
+   * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
+   * invalid configuration is preferred over returning null.
+   * <p/>
+   * Some fire hoses like {@link PrefetchableTextFilesFirehoseFactory} may use a temporary
+   * directory to cache data in it.
+   *
+   * @param parser an input row parser
+   * @param temporaryDirectory a directory where temporary files are stored; null when reached through the deprecated single-argument connect
+   */
+  default Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException
+  {
+    return connect(parser);
+  }
 }
diff --git a/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
new file mode 100644
index 000000000000..a8428b8fd299
--- /dev/null
+++ b/api/src/main/java/io/druid/data/input/impl/AbstractTextFilesFirehoseFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package io.druid.data.input.impl; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.java.util.common.logger.Logger; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * This is an abstract class for firehose factory for making firehoses reading text files. + * It provides an unified {@link #connect(StringInputRowParser, File)} implementation for its subclasses. + * + * @param object type representing input data + */ +public abstract class AbstractTextFilesFirehoseFactory + implements FirehoseFactory +{ + private static final Logger LOG = new Logger(AbstractTextFilesFirehoseFactory.class); + + private List objects; + + @Override + public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException + { + if (objects == null) { + objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects")); + } + final Iterator iterator = objects.iterator(); + return new FileIteratingFirehose( + new Iterator() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public LineIterator next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final ObjectType object = iterator.next(); + try { + return IOUtils.lineIterator(wrapObjectStream(object, openObjectStream(object)), Charsets.UTF_8); + } + catch (Exception e) { + LOG.error( + e, + "Exception reading object[%s]", + object + ); + throw Throwables.propagate(e); + } + } + }, + firehoseParser + ); + } + + /** + * Initialize objects to be 
read by this firehose. Since firehose factories are constructed whenever + * io.druid.indexing.common.task.Task objects are deserialized, actual initialization of objects is deferred + * until {@link #connect(StringInputRowParser, File)} is called. + * + * @return a collection of initialized objects. + */ + protected abstract Collection initObjects() throws IOException; + + /** + * Open an input stream from the given object. If the object is compressed, this method should return a byte stream + * as it is compressed. The object compression should be handled in {@link #wrapObjectStream(Object, InputStream)}. + * + * @param object an object to be read + * + * @return an input stream for the object + * + * @throws IOException + */ + protected abstract InputStream openObjectStream(ObjectType object) throws IOException; + + /** + * Wrap the given input stream if needed. The decompression logic should be applied to the given stream if the object + * is compressed. + * + * @param object an input object + * @param stream a stream for the object + * @return + * @throws IOException + */ + protected abstract InputStream wrapObjectStream(ObjectType object, InputStream stream) throws IOException; +} diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index 648831bece31..d1d0606ea49e 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -24,6 +24,7 @@ import io.druid.utils.Runnables; import org.apache.commons.io.LineIterator; +import java.io.Closeable; import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; @@ -37,13 +38,25 @@ public class FileIteratingFirehose implements Firehose private LineIterator lineIterator = null; + private final Closeable closer; + public FileIteratingFirehose( Iterator lineIterators, StringInputRowParser 
parser ) + { + this(lineIterators, parser, null); + } + + public FileIteratingFirehose( + Iterator lineIterators, + StringInputRowParser parser, + Closeable closer + ) { this.lineIterators = lineIterators; this.parser = parser; + this.closer = closer; } @Override @@ -86,8 +99,24 @@ public Runnable commit() @Override public void close() throws IOException { - if (lineIterator != null) { - lineIterator.close(); + try { + if (lineIterator != null) { + lineIterator.close(); + } + } + catch (Throwable t) { + try { + if (closer != null) { + closer.close(); + } + } + catch (Exception e) { + t.addSuppressed(e); + } + throw t; + } + if (closer != null) { + closer.close(); } } } diff --git a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java new file mode 100644 index 000000000000..4baacbcf2462 --- /dev/null +++ b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java @@ -0,0 +1,506 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.data.input.impl; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.io.CountingOutputStream; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.druid.data.input.Firehose; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PrefetchableTextFilesFirehoseFactory is an abstract firehose factory for reading text files. The firehose returned + * by this class provides three key functionalities. + * + *
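<ul>
+ * <li>Caching: objects that have been read are kept as files in the temporary directory until roughly
+ * maxCacheCapacityBytes is reached, and later connect() calls on the same factory read those files first.</li>
+ * <li>Fetching: remaining objects are downloaded to local files, up to roughly maxFetchCapacityBytes. When prefetching
+ * is enabled, a background thread starts a new fetch once the unread fetched bytes drop to prefetchTriggerBytes.</li>
+ * <li>Retry: a failed download is attempted again, up to maxFetchRetry attempts in total.</li>
+ * </ul>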
+ * + * This implementation can be useful when the cost for reading input objects is large as reading from AWS S3 because + * IndexTask can read the whole data twice for determining partition specs and generating segments if the intervals of + * GranularitySpec is not specified. + */ +public abstract class PrefetchableTextFilesFirehoseFactory + extends AbstractTextFilesFirehoseFactory +{ + private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class); + private static final long DEFAULT_MAX_CACHE_CAPACITY_BYTES = 1024 * 1024 * 1024; // 1GB + private static final long DEFAULT_MAX_FETCH_CAPACITY_BYTES = 1024 * 1024 * 1024; // 1GB + private static final long DEFAULT_FETCH_TIMEOUT = 60_000; // 60 secs + private static final int DEFAULT_MAX_FETCH_RETRY = 3; + private static final String FETCH_FILE_PREFIX = "fetch-"; + + // The below two variables are roughly the max size of total cached/fetched objects, but the actual cached/fetched + // size can be larger. The reason is our current client implementations for cloud storages like s3 don't support range + // scan yet, so we must download the whole file at once. It's still possible for the size of cached/fetched data to + // not exceed these variables by estimating the after-fetch size, but it makes us consider the case when any files + // cannot be fetched due to their large size, which makes the implementation complicated. 
+ private final long maxCacheCapacityBytes; + private final long maxFetchCapacityBytes; + + private final long prefetchTriggerBytes; + + // timeout for fetching an object from the remote site + private final long fetchTimeout; + + // maximum retry for fetching an object from the remote site + private final int maxFetchRetry; + + private final List cacheFiles = new ArrayList<>(); + private long totalCachedBytes; + + private List objects; + + private static ExecutorService createFetchExecutor() + { + return Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setNameFormat("firehose_fetch_%d") + .build() + ); + } + + public PrefetchableTextFilesFirehoseFactory( + Long maxCacheCapacityBytes, + Long maxFetchCapacityBytes, + Long prefetchTriggerBytes, + Long fetchTimeout, + Integer maxFetchRetry + ) + { + this.maxCacheCapacityBytes = maxCacheCapacityBytes == null + ? DEFAULT_MAX_CACHE_CAPACITY_BYTES + : maxCacheCapacityBytes; + this.maxFetchCapacityBytes = maxFetchCapacityBytes == null + ? DEFAULT_MAX_FETCH_CAPACITY_BYTES + : maxFetchCapacityBytes; + this.prefetchTriggerBytes = prefetchTriggerBytes == null + ? this.maxFetchCapacityBytes / 2 + : prefetchTriggerBytes; + this.fetchTimeout = fetchTimeout == null ? DEFAULT_FETCH_TIMEOUT : fetchTimeout; + this.maxFetchRetry = maxFetchRetry == null ? 
DEFAULT_MAX_FETCH_RETRY : maxFetchRetry; + } + + @Override + public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException + { + if (maxCacheCapacityBytes == 0 && maxFetchCapacityBytes == 0) { + return super.connect(firehoseParser, temporaryDirectory); + } + + if (objects == null) { + objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects")); + } + + Preconditions.checkState(temporaryDirectory.exists(), "temporaryDirectory[%s] does not exist", temporaryDirectory); + Preconditions.checkState( + temporaryDirectory.isDirectory(), + "temporaryDirectory[%s] is not a directory", + temporaryDirectory + ); + + // fetchExecutor is responsible for background data fetching + final ExecutorService fetchExecutor = createFetchExecutor(); + + return new FileIteratingFirehose( + new Iterator() + { + // When prefetching is enabled, fetchFiles and nextFetchIndex are updated by the fetchExecutor thread, but + // read by both the main thread (in hasNext()) and the fetchExecutor thread (in fetch()). To guarantee that + // fetchFiles and nextFetchIndex are updated atomically, this lock must be held before updating + // them. + private final Object fetchLock = new Object(); + private final LinkedBlockingQueue fetchFiles = new LinkedBlockingQueue<>(); + + // Number of bytes currently fetched files. + // This is updated when a file is successfully fetched or a fetched file is deleted. + private final AtomicLong fetchedBytes = new AtomicLong(0); + private final boolean cacheInitialized; + private final boolean prefetchEnabled; + + private Future fetchFuture; + private int cacheIterateIndex; + // nextFetchIndex indicates which object should be downloaded when fetch is triggered. 
+ private int nextFetchIndex; + + { + cacheInitialized = totalCachedBytes > 0; + prefetchEnabled = maxFetchCapacityBytes > 0; + + if (cacheInitialized) { + nextFetchIndex = cacheFiles.size(); + } + if (prefetchEnabled) { + fetchIfNeeded(totalCachedBytes); + } + } + + private void fetchIfNeeded(long remainingBytes) + { + if ((fetchFuture == null || fetchFuture.isDone()) + && remainingBytes <= prefetchTriggerBytes) { + fetchFuture = fetchExecutor.submit( + () -> { + fetch(); + return null; + } + ); + } + } + + /** + * Fetch objects to a local disk up to {@link PrefetchableTextFilesFirehoseFactory#maxFetchCapacityBytes}. + * This method is not thread safe and must be called by a single thread. Note that even + * {@link PrefetchableTextFilesFirehoseFactory#maxFetchCapacityBytes} is 0, at least 1 file is always fetched. + * This is for simplifying design, and should be improved when our client implementations for cloud storages + * like S3 support range scan. + */ + private void fetch() throws Exception + { + for (int i = nextFetchIndex; i < objects.size() && fetchedBytes.get() <= maxFetchCapacityBytes; i++) { + final ObjectType object = objects.get(i); + LOG.info("Fetching object[%s], fetchedBytes[%d]", object, fetchedBytes.get()); + final File outFile = File.createTempFile(FETCH_FILE_PREFIX, null, temporaryDirectory); + fetchedBytes.addAndGet(download(object, outFile, 0)); + synchronized (fetchLock) { + fetchFiles.put(new FetchedFile(object, outFile)); + nextFetchIndex++; + } + } + } + + /** + * Downloads an object. It retries downloading {@link PrefetchableTextFilesFirehoseFactory#maxFetchRetry} + * times and throws an exception. 
+ * + * @param object an object to be downloaded + * @param outFile a file which the object data is stored + * @param tryCount current retry count + * + * @return number of downloaded bytes + * + * @throws IOException + */ + private long download(ObjectType object, File outFile, int tryCount) throws IOException + { + try (final InputStream is = openObjectStream(object); + final CountingOutputStream cos = new CountingOutputStream(new FileOutputStream(outFile))) { + IOUtils.copy(is, cos); + return cos.getCount(); + } + catch (IOException e) { + final int nextTry = tryCount + 1; + if (!Thread.currentThread().isInterrupted() && nextTry < maxFetchRetry) { + LOG.error(e, "Failed to download object[%s], retrying (%d of %d)", object, nextTry, maxFetchRetry); + outFile.delete(); + return download(object, outFile, nextTry); + } else { + LOG.error(e, "Failed to download object[%s], retries exhausted, aborting", object); + throw e; + } + } + } + + @Override + public boolean hasNext() + { + synchronized (fetchLock) { + return (cacheInitialized && cacheIterateIndex < cacheFiles.size()) + || !fetchFiles.isEmpty() + || nextFetchIndex < objects.size(); + } + } + + @Override + public LineIterator next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // If fetch() fails, hasNext() always returns true because nextFetchIndex must be smaller than the number + // of objects, which means next() is always called. The below method checks that fetch() threw an exception + // and propagates it if exists. 
+ checkFetchException(); + + final OpenedObject openedObject; + + try { + // Check cache first + if (cacheInitialized && cacheIterateIndex < cacheFiles.size()) { + final FetchedFile fetchedFile = cacheFiles.get(cacheIterateIndex++); + openedObject = new OpenedObject(fetchedFile, getNoopCloser()); + } else if (prefetchEnabled) { + openedObject = openObjectFromLocal(); + } else { + openedObject = openObjectFromRemote(); + } + + final InputStream stream = wrapObjectStream( + openedObject.object, + openedObject.objectStream + ); + + return new ResourceCloseableLineIterator( + new InputStreamReader(stream, Charsets.UTF_8), + openedObject.resourceCloser + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + private void checkFetchException() + { + if (fetchFuture != null && fetchFuture.isDone()) { + try { + fetchFuture.get(); + fetchFuture = null; + } + catch (InterruptedException | ExecutionException e) { + throw Throwables.propagate(e); + } + } + } + + private OpenedObject openObjectFromLocal() throws IOException + { + final FetchedFile fetchedFile; + final Closeable resourceCloser; + + if (!fetchFiles.isEmpty()) { + // If there are already fetched files, use them + fetchedFile = fetchFiles.poll(); + resourceCloser = cacheIfPossibleAndGetCloser(fetchedFile, fetchedBytes); + fetchIfNeeded(fetchedBytes.get()); + } else { + // Otherwise, wait for fetching + try { + fetchIfNeeded(fetchedBytes.get()); + fetchedFile = fetchFiles.poll(fetchTimeout, TimeUnit.MILLISECONDS); + if (fetchedFile == null) { + // Check the latest fetch is failed + checkFetchException(); + // Or throw a timeout exception + throw new RuntimeException(new TimeoutException()); + } + resourceCloser = cacheIfPossibleAndGetCloser(fetchedFile, fetchedBytes); + // trigger fetch again for subsequent next() calls + fetchIfNeeded(fetchedBytes.get()); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + return new OpenedObject(fetchedFile, resourceCloser); + } 
+ + private OpenedObject openObjectFromRemote() throws IOException + { + final OpenedObject openedObject; + final Closeable resourceCloser = getNoopCloser(); + + if (totalCachedBytes < maxCacheCapacityBytes) { + LOG.info("Caching object[%s]", objects.get(nextFetchIndex)); + try { + // Since maxFetchCapacityBytes is 0, at most one file is fetched. + fetch(); + FetchedFile fetchedFile = fetchFiles.poll(); + if (fetchedFile == null) { + throw new ISE("Cannot fetch object[%s]", objects.get(nextFetchIndex)); + } + cacheIfPossible(fetchedFile); + fetchedBytes.addAndGet(-fetchedFile.length()); + openedObject = new OpenedObject(fetchedFile, resourceCloser); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } else { + final ObjectType object = objects.get(nextFetchIndex++); + LOG.info("Reading object[%s]", object); + openedObject = new OpenedObject(object, openObjectStream(object), resourceCloser); + } + return openedObject; + } + }, + firehoseParser, + () -> { + fetchExecutor.shutdownNow(); + try { + Preconditions.checkState(fetchExecutor.awaitTermination(fetchTimeout, TimeUnit.MILLISECONDS)); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ISE("Failed to shutdown fetch executor during close"); + } + } + ); + } + + private boolean cacheIfPossible(FetchedFile fetchedFile) + { + // maxCacheCapacityBytes is a rough limit, so if totalCachedBytes is larger than it, no more caching is + // allowed. + if (totalCachedBytes < maxCacheCapacityBytes) { + cacheFiles.add(fetchedFile); + totalCachedBytes += fetchedFile.length(); + return true; + } else { + return false; + } + } + + private Closeable cacheIfPossibleAndGetCloser(FetchedFile fetchedFile, AtomicLong fetchedBytes) + { + final Closeable closeable; + if (cacheIfPossible(fetchedFile)) { + closeable = getNoopCloser(); + // If the fetchedFile is cached, make a room for fetching more data immediately. + // This is because cache space and fetch space are separated. 
+      fetchedBytes.addAndGet(-fetchedFile.length());
+    } else {
+      closeable = getFetchedFileCloser(fetchedFile, fetchedBytes);
+    }
+    return closeable;
+  }
+
+  private Closeable getNoopCloser()
+  {
+    return () -> {};
+  }
+
+  private Closeable getFetchedFileCloser(
+      final FetchedFile fetchedFile,
+      final AtomicLong fetchedBytes
+  )
+  {
+    return () -> {
+      final long fileSize = fetchedFile.length();
+      fetchedFile.delete();
+      fetchedBytes.addAndGet(-fileSize);
+    };
+  }
+
+  /**
+   * A {@link LineIterator} that also closes the given resourceCloser when it is closed, even if closing the reader fails.
+   */
+  static class ResourceCloseableLineIterator extends LineIterator
+  {
+    private final Closeable resourceCloser;
+
+    public ResourceCloseableLineIterator(Reader reader, Closeable resourceCloser) throws IllegalArgumentException
+    {
+      super(reader);
+      this.resourceCloser = resourceCloser;
+    }
+
+    @Override
+    public void close()
+    {
+      // try-with-resources runs resourceCloser even when super.close() throws; a second failure is attached as suppressed.
+      try (Closeable ignored = resourceCloser) {
+        super.close();
+      }
+      catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  private class FetchedFile
+  {
+    private final ObjectType object;
+    private final File file;
+
+    public FetchedFile(ObjectType object, File file)
+    {
+      this.object = object;
+      this.file = file;
+    }
+
+    public long length()
+    {
+      return file.length();
+    }
+
+    public void delete()
+    {
+      file.delete();
+    }
+  }
+
+  private class OpenedObject
+  {
+    private final ObjectType object;
+    private final InputStream objectStream;
+    private final Closeable resourceCloser;
+
+    public OpenedObject(FetchedFile fetchedFile, Closeable resourceCloser) throws IOException
+    {
+      this.object = fetchedFile.object;
+      this.objectStream = FileUtils.openInputStream(fetchedFile.file);
+      this.resourceCloser = resourceCloser;
+    }
+
+    public OpenedObject(ObjectType object, InputStream objectStream, Closeable resourceCloser)
+    {
+      this.object = object;
+      this.objectStream = objectStream;
+      this.resourceCloser = resourceCloser;
+    }
+  }
+}
diff --git
a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java
index 30239233bcbd..f715b78c90b4 100644
--- a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java
+++ b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java
@@ -30,7 +30,9 @@
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,6 +68,8 @@ public static Collection constructorFeeder() throws IOException
     return args;
   }
 
+  private static final char[] LINE_CHARS = "\n".toCharArray();
+
   private final StringInputRowParser parser;
   private final List inputs;
   private final List expectedResults;
@@ -122,4 +126,57 @@ public void testFirehose() throws Exception
       Assert.assertEquals(expectedResults, results);
     }
   }
+
+  /**
+   * Checks that the extra closer still runs when closing the current LineIterator throws.
+   */
+  @Test
+  public void testClose() throws IOException
+  {
+    final LineIterator lineIterator = new LineIterator(new Reader()
+    {
+      @Override
+      public int read(char[] cbuf, int off, int len) throws IOException
+      {
+        System.arraycopy(LINE_CHARS, 0, cbuf, off, LINE_CHARS.length);
+        return LINE_CHARS.length;
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        throw new RuntimeException("close test for FileIteratingFirehose");
+      }
+    });
+
+    final TestCloseable closeable = new TestCloseable();
+    final FileIteratingFirehose firehose = new FileIteratingFirehose(
+        ImmutableList.of(lineIterator).iterator(),
+        parser,
+        closeable
+    );
+    firehose.hasMore(); // initialize lineIterator
+    try {
+      firehose.close();
+      Assert.fail("close() should rethrow the exception thrown while closing the reader");
+    }
+    catch (RuntimeException e) {
+      // Expected: the reader's failure must surface to the caller unchanged.
+      Assert.assertEquals("close test for FileIteratingFirehose", e.getMessage());
+    }
+    // Checked after the catch so the assertion actually runs; with @Test(expected = ...) it was unreachable.
+    Assert.assertTrue(closeable.closed);
+  }
+
+  /** Closeable that only records whether close() was called. */
+  private static final class TestCloseable implements Closeable
+  {
+    private boolean closed;
+
+    @Override
+    public void close() throws IOException
+    {
+      closed = true;
+    }
+  }
 }
diff --git
a/api/src/test/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactoryTest.java b/api/src/test/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactoryTest.java new file mode 100644 index 000000000000..8fefb396ecf9 --- /dev/null +++ b/api/src/test/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactoryTest.java @@ -0,0 +1,420 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.data.input.impl; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import io.druid.data.input.Firehose; +import io.druid.data.input.Row; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.hamcrest.CoreMatchers; +import org.joda.time.DateTime; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +public class PrefetchableTextFilesFirehoseFactoryTest +{ + private static File testDir; + private static File firehoseTempDir; + + private static final StringInputRowParser parser = new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")), + Lists.newArrayList(), + Lists.newArrayList() + ), + ",", + Arrays.asList("timestamp", "a", "b"), + false, + 0 + ), + Charsets.UTF_8.name() + ); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @BeforeClass + public static void setup() throws IOException + { + testDir = File.createTempFile(PrefetchableTextFilesFirehoseFactoryTest.class.getSimpleName(), "testDir"); + FileUtils.forceDelete(testDir); + FileUtils.forceMkdir(testDir); + + firehoseTempDir = File.createTempFile(PrefetchableTextFilesFirehoseFactoryTest.class.getSimpleName(), "baseDir"); + FileUtils.forceDelete(firehoseTempDir); + 
FileUtils.forceMkdir(firehoseTempDir); + + for (int i = 0; i < 10; i++) { + // Each file is 1390 bytes + try (final Writer writer = new BufferedWriter( + new FileWriter(new File(testDir, "test_" + i)) + )) { + for (int j = 0; j < 100; j++) { + final String a = (20171220 + i) + "," + i + "," + j + "\n"; + writer.write(a); + } + } + } + } + + @AfterClass + public static void teardown() throws IOException + { + FileUtils.forceDelete(testDir); + FileUtils.forceDelete(firehoseTempDir); + } + + private static void assertResult(List rows) + { + Assert.assertEquals(1000, rows.size()); + rows.sort((r1, r2) -> { + int c = r1.getTimestamp().compareTo(r2.getTimestamp()); + if (c != 0) { + return c; + } + c = Integer.valueOf(r1.getDimension("a").get(0)).compareTo(Integer.valueOf(r2.getDimension("a").get(0))); + if (c != 0) { + return c; + } + + return Integer.valueOf(r1.getDimension("b").get(0)).compareTo(Integer.valueOf(r2.getDimension("b").get(0))); + }); + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 100; j++) { + final Row row = rows.get(i * 100 + j); + Assert.assertEquals(new DateTime(20171220 + i), row.getTimestamp()); + Assert.assertEquals(String.valueOf(i), row.getDimension("a").get(0)); + Assert.assertEquals(String.valueOf(j), row.getDimension("b").get(0)); + } + } + } + + @Test + public void testWithoutCacheAndFetch() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.with(testDir, 0, 0); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testWithoutCache() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.with(testDir, 0, 2048); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, 
firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testWithZeroFetchCapacity() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.with(testDir, 2048, 0); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testWithCacheAndFetch() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.of(testDir); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testWithLargeCacheAndSmallFetch() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.with(testDir, 2048, 1024); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testWithSmallCacheAndLargeFetch() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.with(testDir, 1024, 2048); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testRetry() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.withOpenExceptions(testDir, 1); + + final List rows = new 
ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + + assertResult(rows); + } + + @Test + public void testMaxRetry() throws IOException + { + expectedException.expect(RuntimeException.class); + expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class)); + expectedException.expectMessage("Exception for retry test"); + + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.withOpenExceptions(testDir, 5); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + } + + @Test + public void testTimeout() throws IOException + { + expectedException.expect(RuntimeException.class); + expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.withSleepMillis(testDir, 1000); + + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + } + + @Test + public void testReconnect() throws IOException + { + final TestPrefetchableTextFilesFirehoseFactory factory = + TestPrefetchableTextFilesFirehoseFactory.of(testDir); + + for (int i = 0; i < 5; i++) { + final List rows = new ArrayList<>(); + try (Firehose firehose = factory.connect(parser, firehoseTempDir)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + assertResult(rows); + } + } + + static class TestPrefetchableTextFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory + { + private static final long defaultTimeout = 1000; + private final long sleepMillis; + private final File baseDir; + private int openExceptionCount; + + static 
TestPrefetchableTextFilesFirehoseFactory with(File baseDir, long cacheCapacity, long fetchCapacity) + { + return new TestPrefetchableTextFilesFirehoseFactory( + baseDir, + 1024, + cacheCapacity, + fetchCapacity, + defaultTimeout, + 3, + 0, + 0 + ); + } + + static TestPrefetchableTextFilesFirehoseFactory of(File baseDir) + { + return new TestPrefetchableTextFilesFirehoseFactory( + baseDir, + 1024, + 2048, + 2048, + defaultTimeout, + 3, + 0, + 0 + ); + } + + static TestPrefetchableTextFilesFirehoseFactory withOpenExceptions(File baseDir, int count) + { + return new TestPrefetchableTextFilesFirehoseFactory( + baseDir, + 1024, + 2048, + 2048, + defaultTimeout, + 3, + count, + 0 + ); + } + + static TestPrefetchableTextFilesFirehoseFactory withSleepMillis(File baseDir, long ms) + { + return new TestPrefetchableTextFilesFirehoseFactory( + baseDir, + 1024, + 2048, + 2048, + 100, + 3, + 0, + ms + ); + } + + public TestPrefetchableTextFilesFirehoseFactory( + File baseDir, + long prefetchTriggerThreshold, + long maxCacheCapacityBytes, + long maxFetchCapacityBytes, + long timeout, + int maxRetry, + int openExceptionCount, + long sleepMillis + ) + { + super( + maxCacheCapacityBytes, + maxFetchCapacityBytes, + prefetchTriggerThreshold, + timeout, + maxRetry + ); + this.openExceptionCount = openExceptionCount; + this.sleepMillis = sleepMillis; + this.baseDir = baseDir; + } + + @Override + protected Collection initObjects() + { + return FileUtils.listFiles( + Preconditions.checkNotNull(baseDir).getAbsoluteFile(), + TrueFileFilter.INSTANCE, + TrueFileFilter.INSTANCE + ); + } + + @Override + protected InputStream openObjectStream(File object) throws IOException + { + if (openExceptionCount > 0) { + openExceptionCount--; + throw new IOException("Exception for retry test"); + } + if (sleepMillis > 0) { + try { + Thread.sleep(sleepMillis); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return FileUtils.openInputStream(object); + } + + @Override + 
protected InputStream wrapObjectStream(File object, InputStream stream) throws IOException + { + return stream; + } + } +} diff --git a/docs/content/development/extensions-contrib/azure.md b/docs/content/development/extensions-contrib/azure.md index 497735c7ddf9..cb4709677655 100644 --- a/docs/content/development/extensions-contrib/azure.md +++ b/docs/content/development/extensions-contrib/azure.md @@ -51,10 +51,18 @@ Sample spec: } ``` +This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or +shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow. + |property|description|default|required?| |--------|-----------|-------|---------| -|type|This should be "static-azure-blobstore".|N/A|yes| +|type|This should be `static-azure-blobstore`.|N/A|yes| |blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 disables the cache.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 disables prefetching.|1073741824|no| +|prefetchTriggerBytes|Threshold to trigger prefetching Azure objects.|maxFetchCapacityBytes / 2|no| +|fetchTimeout|Timeout in milliseconds for fetching an Azure object.|60000|no| +|maxFetchRetry|Maximum number of retries for fetching an Azure object.|3|no| Azure Blobs: diff --git a/docs/content/development/extensions-contrib/cloudfiles.md b/docs/content/development/extensions-contrib/cloudfiles.md index de56c6cc79cc..d11a9a0ec823 100644 --- a/docs/content/development/extensions-contrib/cloudfiles.md +++ b/docs/content/development/extensions-contrib/cloudfiles.md @@ -51,11 +51,18 @@ Sample spec: ] } ``` +This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or +shardSpecs are not specified, and, in this case, caching can be useful. 
Prefetching is preferred when direct scan of objects is slow. |property|description|default|required?| |--------|-----------|-------|---------| -|type|This should be "static-cloudfiles".|N/A|yes| +|type|This should be `static-cloudfiles`.|N/A|yes| |blobs|JSON array of Cloud Files blobs.|N/A|yes| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 disables the cache.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 disables prefetching.|1073741824|no| +|prefetchTriggerBytes|Threshold to trigger prefetching Cloud Files objects.|maxFetchCapacityBytes / 2|no| +|fetchTimeout|Timeout in milliseconds for fetching a Cloud Files object.|60000|no| +|maxFetchRetry|Maximum number of retries for fetching a Cloud Files object.|3|no| Cloud Files Blobs: diff --git a/docs/content/development/extensions-contrib/google.md b/docs/content/development/extensions-contrib/google.md index 1c080c5c3dd4..0852b886cebc 100644 --- a/docs/content/development/extensions-contrib/google.md +++ b/docs/content/development/extensions-contrib/google.md @@ -41,10 +41,18 @@ Sample spec: } ``` +This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or +shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow. + |property|description|default|required?| |--------|-----------|-------|---------| -|type|This should be "static-google-blobstore".|N/A|yes| +|type|This should be `static-google-blobstore`.|N/A|yes| |blobs|JSON array of Google Blobs.|N/A|yes| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 disables the cache.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 
0 disables prefetching.|1073741824|no| +|prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no| +|fetchTimeout|Timeout in milliseconds for fetching a Google Blob.|60000|no| +|maxFetchRetry|Maximum number of retries for fetching a Google Blob.|3|no| Google Blobs: diff --git a/docs/content/development/extensions-core/s3.md b/docs/content/development/extensions-core/s3.md index 59f6a7084b3b..6a804f959975 100644 --- a/docs/content/development/extensions-core/s3.md +++ b/docs/content/development/extensions-core/s3.md @@ -32,7 +32,16 @@ Sample spec: } ``` +This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or +shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow. + |property|description|default|required?| |--------|-----------|-------|---------| -|type|This should be "static-s3"|N/A|yes| -|uris|JSON array of URIs where s3 files to be ingested are located.|N/A|yes| +|type|This should be `static-s3`.|N/A|yes| +|uris|JSON array of URIs where s3 files to be ingested are located.|N/A|`uris` or `prefixes` must be set| +|prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|N/A|`uris` or `prefixes` must be set| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 disables the cache.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 
0 disables prefetching.|1073741824|no| +|prefetchTriggerBytes|Threshold to trigger prefetching s3 objects.|maxFetchCapacityBytes / 2|no| +|fetchTimeout|Timeout in milliseconds for fetching an s3 object.|60000|no| +|maxFetchRetry|Maximum number of retries for fetching an s3 object.|3|no| \ No newline at end of file diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 6e01e82a386c..4e72e993c162 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -104,7 +104,6 @@ See [Ingestion](../ingestion/index.html) |type|The task type, this should always be "index".|none|yes| |firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no| -|skipFirehoseCaching|By default the IndexTask will fully read the supplied firehose to disk before processing the data. This prevents the task from doing multiple remote fetches and enforces determinism if more than one pass through the data is required. It also allows the task to retry fetching the data if the firehose throws an exception during reading. 
This requires sufficient disk space for the temporary cache.|false|no| #### TuningConfig diff --git a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index 9333e58126ee..4ac888fbb243 100644 --- a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -43,6 +43,7 @@ import twitter4j.User; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -113,7 +114,7 @@ public TwitterSpritzerFirehoseFactory( } @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { final ConnectionLifeCycleListener connectionLifeCycleListener = new ConnectionLifeCycleListener() { diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java index 2e41238fd1b1..98cd1a8ec4be 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java @@ -48,4 +48,13 @@ public String getContainer() { public String getPath() { return path; } + + @Override + public String toString() + { + return "AzureBlob{" + + "container=" + container + + ",path=" + path + + "}"; + } } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java index 233974cf22f8..c891c8472b0e 100644 --- 
a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -22,106 +22,71 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.impl.FileIteratingFirehose; -import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; -import io.druid.java.util.common.logger.Logger; import io.druid.storage.azure.AzureByteSource; import io.druid.storage.azure.AzureStorage; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.LineIterator; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.Collection; import java.util.List; /** * This class is heavily inspired by the StaticS3FirehoseFactory class in the io.druid.firehose.s3 package */ -public class StaticAzureBlobStoreFirehoseFactory implements FirehoseFactory { - private static final Logger log = new Logger(StaticAzureBlobStoreFirehoseFactory.class); - +public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFirehoseFactory +{ private final AzureStorage azureStorage; private final List blobs; @JsonCreator public StaticAzureBlobStoreFirehoseFactory( @JacksonInject("azureStorage") AzureStorage azureStorage, - 
@JsonProperty("blobs") AzureBlob[] blobs - ) { + @JsonProperty("blobs") List blobs, + @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, + @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, + @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, + @JsonProperty("fetchTimeout") Long fetchTimeout, + @JsonProperty("maxFetchRetry") Integer maxFetchRetry + ) + { + super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); + this.blobs = blobs; this.azureStorage = azureStorage; - this.blobs = ImmutableList.copyOf(blobs); } @JsonProperty - public List getBlobs() { + public List getBlobs() + { return blobs; } @Override - public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException { - Preconditions.checkNotNull(azureStorage, "null azureStorage"); - - final LinkedList objectQueue = Lists.newLinkedList(blobs); - - return new FileIteratingFirehose( - new Iterator() { - @Override - public boolean hasNext() { - return !objectQueue.isEmpty(); - } - - @Override - public LineIterator next() { - final AzureBlob nextURI = objectQueue.poll(); - - final String container = nextURI.getContainer(); - final String path = nextURI.getPath().startsWith("/") - ? nextURI.getPath().substring(1) - : nextURI.getPath(); - - try { - final InputStream innerInputStream = new AzureByteSource(azureStorage, container, path).openStream(); - + protected Collection initObjects() + { + return blobs; + } - final InputStream outerInputStream = path.endsWith(".gz") - ? 
CompressionUtils.gzipInputStream(innerInputStream) - : innerInputStream; + @Override + protected InputStream openObjectStream(AzureBlob object) throws IOException + { + return makeByteSource(azureStorage, object).openStream(); + } - return IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8) - ) - ); - } catch (Exception e) { - log.error(e, - "Exception opening container[%s] blob[%s]", - container, - path - ); + @Override + protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException + { + return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + } - throw Throwables.propagate(e); - } - } + private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object) + { + final String container = object.getContainer(); + final String path = object.getPath().startsWith("/") + ? object.getPath().substring(1) + : object.getPath(); - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }, - stringInputRowParser - ); + return new AzureByteSource(azureStorage, container, path); } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java index fc25ba9b6767..fb013ed55a74 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java @@ -62,4 +62,14 @@ public String getRegion() { return region; } + + @Override + public String toString() + { + return "CloudFilesBlob{" + + "container=" + container + + ",path=" + path + + ",region=" + region + + "}"; + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java 
b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java index c3f5d23ab999..29ecfb3bbc8a 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -22,34 +22,19 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.impl.FileIteratingFirehose; -import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.logger.Logger; -import io.druid.java.util.common.parsers.ParseException; import io.druid.storage.cloudfiles.CloudFilesByteSource; import io.druid.storage.cloudfiles.CloudFilesObjectApiProxy; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.LineIterator; import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.Collection; import java.util.List; -public class StaticCloudFilesFirehoseFactory implements FirehoseFactory +public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory { private static final Logger log = new Logger(StaticCloudFilesFirehoseFactory.class); @@ -59,11 
+44,17 @@ public class StaticCloudFilesFirehoseFactory implements FirehoseFactory blobs, + @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, + @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, + @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, + @JsonProperty("fetchTimeout") Long fetchTimeout, + @JsonProperty("maxFetchRetry") Integer maxFetchRetry ) { + super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); this.cloudFilesApi = cloudFilesApi; - this.blobs = ImmutableList.copyOf(blobs); + this.blobs = blobs; } @JsonProperty @@ -73,67 +64,31 @@ public List getBlobs() } @Override - public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException, ParseException + protected Collection initObjects() { - Preconditions.checkNotNull(cloudFilesApi, "null cloudFilesApi"); - - final LinkedList objectQueue = Lists.newLinkedList(blobs); - - return new FileIteratingFirehose( - new Iterator() - { - - @Override - public boolean hasNext() - { - return !objectQueue.isEmpty(); - } - - @Override - public LineIterator next() - { - final CloudFilesBlob nextURI = objectQueue.poll(); - - final String region = nextURI.getRegion(); - final String container = nextURI.getContainer(); - final String path = nextURI.getPath(); - - log.info("Retrieving file from region[%s], container[%s] and path [%s]", - region, container, path - ); - CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy( - cloudFilesApi, region, container); - final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); - - try { - final InputStream innerInputStream = byteSource.openStream(); - final InputStream outerInputStream = path.endsWith(".gz") - ? 
CompressionUtils.gzipInputStream(innerInputStream) - : innerInputStream; - - return IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8))); - } - catch (IOException e) { - log.error(e, - "Exception opening container[%s] blob[%s] from region[%s]", - container, path, region - ); - - throw Throwables.propagate(e); - } - } + return blobs; + } - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } + @Override + protected InputStream openObjectStream(CloudFilesBlob object) throws IOException + { + final String region = object.getRegion(); + final String container = object.getContainer(); + final String path = object.getPath(); - }, - stringInputRowParser + log.info("Retrieving file from region[%s], container[%s] and path [%s]", + region, container, path ); + CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy( + cloudFilesApi, region, container); + final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); + + return byteSource.openStream(); } + @Override + protected InputStream wrapObjectStream(CloudFilesBlob object, InputStream stream) throws IOException + { + return object.getPath().endsWith(".gz") ? 
CompressionUtils.gzipInputStream(stream) : stream; + } } diff --git a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index ccc0442ca07b..82af3a159c97 100644 --- a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -42,6 +42,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -137,7 +138,10 @@ private boolean hasMessagesPending() } @Override - public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throws IOException, ParseException + public Firehose connect( + ByteBufferInputRowParser byteBufferInputRowParser, + File temporaryDirectory + ) throws IOException, ParseException { Set newDimExclus = Sets.union( diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java index 0b6b6cc6eba0..d0109973d87b 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java @@ -41,5 +41,14 @@ public String getBucket() { public String getPath() { return path; } + + @Override + public String toString() + { + return "GoogleBlob {" + + "bucket=" + bucket + + ",path=" + path + + "}"; + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java 
b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java index 907571d58ffa..4f8afca76ca3 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java @@ -22,43 +22,35 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.impl.FileIteratingFirehose; -import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; -import io.druid.java.util.common.logger.Logger; import io.druid.storage.google.GoogleByteSource; import io.druid.storage.google.GoogleStorage; -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.LineIterator; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.Collection; import java.util.List; -public class StaticGoogleBlobStoreFirehoseFactory implements FirehoseFactory { - private static final Logger LOG = new Logger(StaticGoogleBlobStoreFirehoseFactory.class); - +public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesFirehoseFactory +{ private final GoogleStorage storage; private final List blobs; @JsonCreator public StaticGoogleBlobStoreFirehoseFactory( @JacksonInject 
GoogleStorage storage, - @JsonProperty("blobs") GoogleBlob[] blobs - ) { + @JsonProperty("blobs") List blobs, + @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, + @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, + @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, + @JsonProperty("fetchTimeout") Long fetchTimeout, + @JsonProperty("maxFetchRetry") Integer maxFetchRetry + ) + { + super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); this.storage = storage; - this.blobs = ImmutableList.copyOf(blobs); + this.blobs = blobs; } @JsonProperty @@ -67,57 +59,26 @@ public List getBlobs() { } @Override - public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException { - Preconditions.checkNotNull(storage, "null storage"); - - final LinkedList objectQueue = Lists.newLinkedList(blobs); - - return new FileIteratingFirehose( - new Iterator() { - @Override - public boolean hasNext() { - return !objectQueue.isEmpty(); - } - - @Override - public LineIterator next() { - final GoogleBlob nextURI = objectQueue.poll(); - - final String bucket = nextURI.getBucket(); - final String path = nextURI.getPath().startsWith("/") - ? nextURI.getPath().substring(1) - : nextURI.getPath(); - - try { - final InputStream innerInputStream = new GoogleByteSource(storage, bucket, path).openStream(); - - final InputStream outerInputStream = path.endsWith(".gz") - ? 
CompressionUtils.gzipInputStream(innerInputStream) - : innerInputStream; - - return IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8) - ) - ); - } catch (Exception e) { - LOG.error(e, - "Exception opening bucket[%s] blob[%s]", - bucket, - path - ); + protected Collection initObjects() + { + return blobs; + } - throw Throwables.propagate(e); - } - } + @Override + protected InputStream openObjectStream(GoogleBlob object) throws IOException + { + final String bucket = object.getBucket(); + final String path = object.getPath().startsWith("/") + ? object.getPath().substring(1) + : object.getPath(); + + return new GoogleByteSource(storage, bucket, path).openStream(); + } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }, - stringInputRowParser - ); + @Override + protected InputStream wrapObjectStream(GoogleBlob object, InputStream stream) throws IOException + { + return object.getPath().endsWith(".gz") ? 
CompressionUtils.gzipInputStream(stream) : stream; } } diff --git a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index b48083fe2991..66c91770a2b7 100644 --- a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -41,6 +41,7 @@ import net.jodah.lyra.retry.RetryPolicy; import net.jodah.lyra.util.Duration; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; @@ -134,7 +135,7 @@ public JacksonifiedConnectionFactory getConnectionFactory() } @Override - public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException + public Firehose connect(final ByteBufferInputRowParser firehoseParser, File temporaryDirectory) throws IOException { ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory); Config lyraConfig = new Config() diff --git a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 3a87e15b2adc..dd63153ce92c 100644 --- a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -37,6 +37,7 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.InvalidMessageException; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -68,7 +69,7 @@ public KafkaEightFirehoseFactory( } @Override - public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException + public Firehose 
connect(final ByteBufferInputRowParser firehoseParser, File temporaryDirectory) throws IOException { Set newDimExclus = Sets.union( firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index a172530fbf08..5ca2131f5bb4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -90,7 +90,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; @@ -825,7 +824,7 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox ioConfig.getStartPartitions().getPartitionOffsetMap().size()); return Appenderators.createRealtime( dataSchema, - tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) + tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()) .withMaxRowsInMemory(maxRowsInMemoryPerPartition), metrics, toolbox.getSegmentPusher(), diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index b2ccd54d73f1..95ae9394a70a 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -22,54 +22,67 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Charsets; 
import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.impl.FileIteratingFirehose; -import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.LineIterator; +import org.jets3t.service.ServiceException; +import org.jets3t.service.StorageObjectsChunk; import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.net.URI; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; /** * Builds firehoses that read from a predefined list of S3 objects and then dry up. 
*/ -public class StaticS3FirehoseFactory implements FirehoseFactory +public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactory { private static final Logger log = new Logger(StaticS3FirehoseFactory.class); + private static final long MAX_LISTING_LENGTH = 1024; private final RestS3Service s3Client; private final List uris; + private final List prefixes; @JsonCreator public StaticS3FirehoseFactory( @JacksonInject("s3Client") RestS3Service s3Client, - @JsonProperty("uris") List uris + @JsonProperty("uris") List uris, + @JsonProperty("prefixes") List prefixes, + @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, + @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, + @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, + @JsonProperty("fetchTimeout") Long fetchTimeout, + @JsonProperty("maxFetchRetry") Integer maxFetchRetry ) { - this.s3Client = s3Client; - this.uris = ImmutableList.copyOf(uris); + super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); + this.s3Client = Preconditions.checkNotNull(s3Client, "null s3Client"); + this.uris = uris == null ? new ArrayList<>() : uris; + this.prefixes = prefixes == null ? 
new ArrayList<>() : prefixes; + + if (!this.uris.isEmpty() && !this.prefixes.isEmpty()) { + throw new IAE("uris and directories cannot be used together"); + } - for (final URI inputURI : uris) { + if (this.uris.isEmpty() && this.prefixes.isEmpty()) { + throw new IAE("uris or directories must be specified"); + } + + for (final URI inputURI : this.uris) { + Preconditions.checkArgument(inputURI.getScheme().equals("s3"), "input uri scheme == s3 (%s)", inputURI); + } + + for (final URI inputURI : this.prefixes) { Preconditions.checkArgument(inputURI.getScheme().equals("s3"), "input uri scheme == s3 (%s)", inputURI); } } @@ -80,72 +93,81 @@ public List getUris() return uris; } + @JsonProperty("prefixes") + public List getPrefixes() + { + return prefixes; + } + @Override - public Firehose connect(StringInputRowParser firehoseParser) throws IOException + protected Collection initObjects() throws IOException { - Preconditions.checkNotNull(s3Client, "null s3Client"); - - final LinkedList objectQueue = Lists.newLinkedList(uris); - - return new FileIteratingFirehose( - new Iterator() - { - @Override - public boolean hasNext() - { - return !objectQueue.isEmpty(); - } - - @Override - public LineIterator next() - { - final URI nextURI = objectQueue.poll(); - - final String s3Bucket = nextURI.getAuthority(); - final S3Object s3Object = new S3Object( - nextURI.getPath().startsWith("/") - ? nextURI.getPath().substring(1) - : nextURI.getPath() + // Here, the returned s3 objects contain minimal information without data. + // Getting data is deferred until openObjectStream() is called for each object. 
+ if (!uris.isEmpty()) { + return uris.stream() + .map( + uri -> { + final String s3Bucket = uri.getAuthority(); + final S3Object s3Object = new S3Object(extractS3Key(uri)); + s3Object.setBucketName(s3Bucket); + return s3Object; + } + ) + .collect(Collectors.toList()); + } else { + final List objects = new ArrayList<>(); + for (URI uri : prefixes) { + final String bucket = uri.getAuthority(); + final String prefix = extractS3Key(uri); + try { + String lastKey = null; + StorageObjectsChunk objectsChunk; + do { + objectsChunk = s3Client.listObjectsChunked( + bucket, + prefix, + null, + MAX_LISTING_LENGTH, + lastKey ); + Arrays.stream(objectsChunk.getObjects()).forEach(storageObject -> objects.add((S3Object) storageObject)); + lastKey = objectsChunk.getPriorLastKey(); + } while (!objectsChunk.isListingComplete()); + } + catch (ServiceException e) { + throw new IOException(e); + } + } + return objects; + } + } - log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); - - try { - final InputStream innerInputStream = s3Client.getObject( - new S3Bucket(s3Bucket), s3Object.getKey() - ) - .getDataInputStream(); - - final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") - ? CompressionUtils.gzipInputStream(innerInputStream) - : innerInputStream; - - return IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8) - ) - ); - } - catch (Exception e) { - log.error( - e, - "Exception reading from bucket[%s] object[%s]", - s3Bucket, - s3Object.getKey() - ); - - throw Throwables.propagate(e); - } - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }, - firehoseParser - ); + private static String extractS3Key(URI uri) + { + return uri.getPath().startsWith("/") + ? 
uri.getPath().substring(1) + : uri.getPath(); + } + + @Override + protected InputStream openObjectStream(S3Object object) throws IOException + { + log.info("Reading from bucket[%s] object[%s] (%s)", object.getBucketName(), object.getKey(), object); + + try { + // Get data of the given object and open an input stream + return s3Client.getObject(object.getBucketName(), object.getKey()).getDataInputStream(); + } + catch (ServiceException e) { + throw new IOException(e); + } + } + + @Override + protected InputStream wrapObjectStream(S3Object object, InputStream stream) throws IOException + { + return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; } @Override @@ -160,13 +182,13 @@ public boolean equals(Object o) StaticS3FirehoseFactory factory = (StaticS3FirehoseFactory) o; - return !(uris != null ? !uris.equals(factory.uris) : factory.uris != null); + return getUris().equals(factory.getUris()); } @Override public int hashCode() { - return uris != null ? 
uris.hashCode() : 0; + return getUris().hashCode(); } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java index 25950e334037..79a03559e8b6 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.junit.Assert; import org.junit.Test; @@ -54,6 +56,7 @@ public void testSerde() throws Exception ); Assert.assertEquals(factory, outputFact); + Assert.assertEquals(uris, outputFact.getUris()); } // This class is a workaround for the injectable value that StaticS3FirehoseFactory requires @@ -64,7 +67,7 @@ public TestStaticS3FirehoseFactory( @JsonProperty("uris") List uris ) { - super(null, uris); + super(EasyMock.niceMock(RestS3Service.class), uris, null, null, null, null, null, null); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index d9a690eb65ba..dc1175a22053 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -253,4 +253,19 @@ public CacheConfig getCacheConfig() public IndexMergerV9 getIndexMergerV9() { return indexMergerV9; } + + public File getFirehoseTemporaryDir() + { + return new File(taskWorkDir, "firehose"); + } + + public File getMergeDir() + { + return new File(taskWorkDir, "merge"); + } + + public File getPersistDir() + { + return new 
File(taskWorkDir, "persist"); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index fd36470b111d..a39a2238c79b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -80,7 +80,6 @@ import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; -import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory; import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; @@ -89,6 +88,7 @@ import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpecLookup; +import org.codehaus.plexus.util.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -168,28 +168,18 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception .bucketIntervals() .isPresent(); - final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); + final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - if (delegateFirehoseFactory instanceof IngestSegmentFirehoseFactory) { + if (firehoseFactory instanceof IngestSegmentFirehoseFactory) { // pass toolbox to Firehose - ((IngestSegmentFirehoseFactory) delegateFirehoseFactory).setTaskToolbox(toolbox); + ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox); } - final FirehoseFactory firehoseFactory; - if (ingestionSchema.getIOConfig().isSkipFirehoseCaching() - || delegateFirehoseFactory instanceof ReplayableFirehoseFactory) { - 
firehoseFactory = delegateFirehoseFactory; - } else { - firehoseFactory = new ReplayableFirehoseFactory( - delegateFirehoseFactory, - ingestionSchema.getTuningConfig().isReportParseExceptions(), - null, - null, - smileMapper - ); - } + final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); + // Firehose temporary directory is automatically removed when this IndexTask completes. + FileUtils.forceMkdir(firehoseTempDir); - final Map> shardSpecs = determineShardSpecs(toolbox, firehoseFactory); + final Map> shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir); final String version; final DataSchema dataSchema; @@ -211,7 +201,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception dataSchema = ingestionSchema.getDataSchema(); } - if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, version, firehoseFactory)) { + if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, version, firehoseFactory, firehoseTempDir)) { return TaskStatus.success(getId()); } else { return TaskStatus.failure(getId()); @@ -224,7 +214,8 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception */ private Map> determineShardSpecs( final TaskToolbox toolbox, - final FirehoseFactory firehoseFactory + final FirehoseFactory firehoseFactory, + final File firehoseTempDir ) throws IOException { final ObjectMapper jsonMapper = toolbox.getObjectMapper(); @@ -268,7 +259,10 @@ private Map> determineShardSpecs( log.info("Determining intervals and shardSpecs"); long determineShardSpecsStartMillis = System.currentTimeMillis(); - try (final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { + try (final Firehose firehose = firehoseFactory.connect( + ingestionSchema.getDataSchema().getParser(), + firehoseTempDir) + ) { while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); @@ -353,7 +347,8 @@ private boolean generateAndPublishSegments( final DataSchema dataSchema, final Map> 
shardSpecs, final String version, - final FirehoseFactory firehoseFactory + final FirehoseFactory firehoseFactory, + final File firehoseTempDir ) throws IOException, InterruptedException { @@ -406,7 +401,7 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin segmentAllocator, fireDepartmentMetrics ); - final Firehose firehose = firehoseFactory.connect(dataSchema.getParser()) + final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); final Map shardSpecLookups = Maps.newHashMap(); @@ -420,6 +415,10 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin try { final InputRow inputRow = firehose.nextRow(); + if (inputRow == null) { + continue; + } + final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); if (!optInterval.isPresent()) { fireDepartmentMetrics.incrementThrownAway(); @@ -512,7 +511,7 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox { return Appenderators.createOffline( dataSchema, - ingestionSchema.getTuningConfig().withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")), + ingestionSchema.getTuningConfig().withBasePersistDirectory(toolbox.getPersistDir()), metrics, toolbox.getSegmentPusher(), toolbox.getObjectMapper(), @@ -589,22 +588,18 @@ public IndexTuningConfig getTuningConfig() public static class IndexIOConfig implements IOConfig { private static final boolean DEFAULT_APPEND_TO_EXISTING = false; - private static final boolean DEFAULT_SKIP_FIREHOSE_CACHING = false; private final FirehoseFactory firehoseFactory; private final boolean appendToExisting; - private final boolean skipFirehoseCaching; @JsonCreator public IndexIOConfig( @JsonProperty("firehose") FirehoseFactory firehoseFactory, - @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting, - @JsonProperty("skipFirehoseCaching") 
@Nullable Boolean skipFirehoseCaching + @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting ) { this.firehoseFactory = firehoseFactory; this.appendToExisting = appendToExisting == null ? DEFAULT_APPEND_TO_EXISTING : appendToExisting; - this.skipFirehoseCaching = skipFirehoseCaching == null ? DEFAULT_SKIP_FIREHOSE_CACHING : skipFirehoseCaching; } @JsonProperty("firehose") @@ -618,12 +613,6 @@ public boolean isAppendToExisting() { return appendToExisting; } - - @JsonProperty("skipFirehoseCaching") - public boolean isSkipFirehoseCaching() - { - return skipFirehoseCaching; - } } @JsonTypeName("index") diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index fdec7e63c1b3..de49180bf08b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -130,7 +130,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); - final File taskDir = toolbox.getTaskWorkDir(); + final File mergeDir = toolbox.getMergeDir(); try { final long startTime = System.currentTimeMillis(); @@ -155,7 +155,7 @@ public String apply(DataSegment input) final Map gettedSegments = toolbox.fetchSegments(segments); // merge files together - final File fileToUpload = merge(toolbox, gettedSegments, new File(taskDir, "merged")); + final File fileToUpload = merge(toolbox, gettedSegments, mergeDir); emitter.emit(builder.build("merger/numMerged", segments.size())); emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime)); diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 0eac7c88ec2a..aa041f09e335 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; - import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; @@ -30,7 +29,6 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; - import org.joda.time.DateTime; import java.util.Map; @@ -139,7 +137,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception if (firehoseFactory != null) { log.info("Connecting firehose"); } - try (Firehose firehose = firehoseFactory != null ? firehoseFactory.connect(null) : null) { + try (Firehose firehose = firehoseFactory != null ? 
firehoseFactory.connect(null, null) : null) { log.info("Running noop task[%s]", getId()); log.info("Sleeping for %,d millis.", runTime); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 60bf62a9d3af..807781a6fd32 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -63,6 +63,7 @@ import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -283,7 +284,7 @@ public String getVersion(final Interval interval) DataSchema dataSchema = spec.getDataSchema(); RealtimeIOConfig realtimeIOConfig = spec.getIOConfig(); RealtimeTuningConfig tuningConfig = spec.getTuningConfig() - .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) + .withBasePersistDirectory(toolbox.getPersistDir()) .withVersioningPolicy(versioningPolicy); final FireDepartment fireDepartment = new FireDepartment( @@ -324,6 +325,7 @@ public String getVersion(final Interval interval) this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics); Supplier committerSupplier = null; + final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); try { plumber.startJob(); @@ -331,6 +333,9 @@ public String getVersion(final Interval interval) // Set up metrics emission toolbox.getMonitorScheduler().addMonitor(metricsMonitor); + // Firehose temporary directory is automatically removed when this RealtimeIndexTask completes. + FileUtils.forceMkdir(firehoseTempDir); + // Delay firehose connection to avoid claiming input resources while the plumber is starting up. 
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory(); final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory); @@ -338,7 +343,7 @@ public String getVersion(final Interval interval) // Skip connecting firehose if we've been stopped before we got started. synchronized (this) { if (!gracefullyStopped) { - firehose = firehoseFactory.connect(spec.getDataSchema().getParser()); + firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir); committerSupplier = Committers.supplierFromFirehose(firehose); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 25028f0f5c0f..11bc9bdfa26c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -127,7 +127,7 @@ public void setTaskToolbox(TaskToolbox taskToolbox) } @Override - public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException + public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws IOException, ParseException { log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 3ed1942af863..ebb6e4f8c1bc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -595,7 +595,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( baseDir, "druid*", null - ), appendToExisting, null + ), appendToExisting ), new IndexTask.IndexTuningConfig( 
targetPartitionSize, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 7f4464f7d540..15e6bc8a7989 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -219,7 +219,7 @@ public TestFirehoseFactory() } @Override - public Firehose connect(InputRowParser parser) throws IOException, ParseException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException, ParseException { return new TestFirehose(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index c7ec7b849d31..515b3913f2f5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -88,7 +88,6 @@ public void testIndexTaskIOConfigDefaults() throws Exception ); Assert.assertEquals(false, ioConfig.isAppendToExisting()); - Assert.assertEquals(false, ioConfig.isSkipFirehoseCaching()); } @Test @@ -186,7 +185,7 @@ public void testIndexTaskSerde() throws Exception ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true, true), + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true) ), null, @@ -210,7 +209,6 @@ public void testIndexTaskSerde() throws Exception Assert.assertTrue(taskIoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); Assert.assertTrue(task2IoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); Assert.assertEquals(taskIoConfig.isAppendToExisting(), 
task2IoConfig.isAppendToExisting()); - Assert.assertEquals(taskIoConfig.isSkipFirehoseCaching(), task2IoConfig.isSkipFirehoseCaching()); IndexTask.IndexTuningConfig taskTuningConfig = task.getIngestionSchema().getTuningConfig(); IndexTask.IndexTuningConfig task2TuningConfig = task2.getIngestionSchema().getTuningConfig(); @@ -251,7 +249,7 @@ public void testIndexTaskwithResourceSerde() throws Exception ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true, null), + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) ), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index b7431f6f3abd..81ee7aa2a9ed 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -518,7 +518,7 @@ public void simpleFirehoseReadingTest() throws IOException Integer rowcount = 0; try (final IngestSegmentFirehose firehose = (IngestSegmentFirehose) - factory.connect(rowParser)) { + factory.connect(rowParser, null)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 165b29fc7fee..7d29d73a39bf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -144,7 +144,7 @@ public void testSimple() throws Exception int count = 0; long sum = 0; - try (final Firehose firehose = factory.connect(ROW_PARSER)) { + try (final Firehose firehose = factory.connect(ROW_PARSER, null)) { while (firehose.hasMore()) { final InputRow row = firehose.nextRow(); count++; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index a4373d5be9a2..16eeef86799b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -238,7 +238,7 @@ private static InputRow IR(String dt, String dim1, String dim2, float met) private static class MockExceptionalFirehoseFactory implements FirehoseFactory { @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { return new Firehose() { @@ -288,7 +288,7 @@ public MockFirehoseFactory(@JsonProperty("usedByRealtimeIdxTask") boolean usedBy } @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { final Iterator inputRowIterator = usedByRealtimeIdxTask ? 
realtimeIdxTaskInputRows.iterator() @@ -652,7 +652,7 @@ public void testIndexTask() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false, null), + new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) ), null, @@ -710,7 +710,7 @@ public void testIndexTaskFailure() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false, null), + new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) ), null, @@ -1069,7 +1069,7 @@ public void testResumeTasks() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false, null), + new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null) ), null, diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index 2c9fb462d1fa..dd8a605af969 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -30,7 +30,6 @@ import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; -import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import java.util.Arrays; @@ -57,7 +56,6 @@ public List getJacksonModules() new NamedType(LocalFirehoseFactory.class, "local"), new NamedType(EventReceiverFirehoseFactory.class, "receiver"), new NamedType(CombiningFirehoseFactory.class, "combining"), - new NamedType(ReplayableFirehoseFactory.class, "replayable"), new 
NamedType(FixedCountFirehoseFactory.class, "fixedCount") ) ); diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index 8114d903256b..eca78928eccf 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -101,7 +101,7 @@ public boolean checkFirehoseV2() public Firehose connect() throws IOException { - return ioConfig.getFirehoseFactory().connect(dataSchema.getParser()); + return ioConfig.getFirehoseFactory().connect(dataSchema.getParser(), null); } public FirehoseV2 connect(Object metaData) throws IOException diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index f13752b6006c..d9fbe539ab20 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -303,14 +303,15 @@ public void run() catch (RuntimeException e) { log.makeAlert( e, - "RuntimeException aborted realtime processing[%s]", + "[%s] aborted realtime processing[%s]", + e.getClass().getSimpleName(), fireDepartment.getDataSchema().getDataSource() ).emit(); normalExit = false; - throw e; + throw Throwables.propagate(e); } catch (Error e) { - log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource()) + log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource()) .emit(); normalExit = false; throw e; diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java index ef9a8ad03fee..9bcd7761152d 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java +++ 
b/server/src/main/java/io/druid/segment/realtime/firehose/ClippedFirehoseFactory.java @@ -28,6 +28,7 @@ import io.druid.data.input.impl.InputRowParser; import org.joda.time.Interval; +import java.io.File; import java.io.IOException; /** @@ -61,10 +62,10 @@ public Interval getInterval() } @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { return new PredicateFirehose( - delegate.connect(parser), + delegate.connect(parser, temporaryDirectory), new Predicate() { @Override diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java index 536c1edf88c7..4dc3c9627528 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -29,6 +29,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -52,9 +53,9 @@ public CombiningFirehoseFactory( } @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { - return new CombiningFirehose(parser); + return new CombiningFirehose(parser, temporaryDirectory); } @JsonProperty("delegates") @@ -63,16 +64,18 @@ public List getDelegateFactoryList() return delegateFactoryList; } - public class CombiningFirehose implements Firehose + class CombiningFirehose implements Firehose { private final InputRowParser parser; + private final File temporaryDirectory; private final Iterator firehoseFactoryIterator; private volatile Firehose currentFirehose; - public CombiningFirehose(InputRowParser parser) throws IOException + 
CombiningFirehose(InputRowParser parser, File temporaryDirectory) throws IOException { this.firehoseFactoryIterator = delegateFactoryList.iterator(); this.parser = parser; + this.temporaryDirectory = temporaryDirectory; nextFirehose(); } @@ -84,7 +87,7 @@ private void nextFirehose() currentFirehose.close(); } - currentFirehose = firehoseFactoryIterator.next().connect(parser); + currentFirehose = firehoseFactoryIterator.next().connect(parser, temporaryDirectory); } catch (IOException e) { if (currentFirehose != null) { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index 3f20139fd8ec..1cf9911c7c26 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -54,6 +54,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Collection; @@ -103,7 +104,7 @@ public EventReceiverFirehoseFactory( } @Override - public Firehose connect(MapInputRowParser firehoseParser) throws IOException + public Firehose connect(MapInputRowParser firehoseParser, File temporaryDirectory) throws IOException { log.info("Connecting firehose: %s", serviceName); final EventReceiverFirehose firehose = new EventReceiverFirehose(firehoseParser); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java index 300b6d139229..fbf498ce5bac 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java @@ -27,6 +27,7 @@ 
import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; +import java.io.File; import java.io.IOException; /** @@ -60,12 +61,12 @@ public int getCount() } @Override - public Firehose connect(final InputRowParser parser) throws IOException + public Firehose connect(final InputRowParser parser, File temporaryDirectory) throws IOException { return new Firehose() { private int i = 0; - private Firehose delegateFirehose = delegate.connect(parser); + private Firehose delegateFirehose = delegate.connect(parser, temporaryDirectory); @Override public boolean hasMore() diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java index 3da59a8f5fda..099b7c12a3e1 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java @@ -30,15 +30,14 @@ import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter; import com.ircclouds.irc.api.state.IIRCState; - import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.java.util.common.Pair; import io.druid.java.util.common.logger.Logger; - import org.joda.time.DateTime; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.UUID; @@ -101,7 +100,7 @@ public List getChannels() } @Override - public Firehose connect(final IrcInputRowParser firehoseParser) throws IOException + public Firehose connect(final IrcInputRowParser firehoseParser, File temporaryDirectory) throws IOException { final IRCApi irc = new IRCApiImpl(false); final LinkedBlockingQueue> queue = new LinkedBlockingQueue>(); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java 
b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 381fefeed14f..6b80f2e96fbc 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -21,30 +21,23 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; import com.metamx.emitter.EmittingLogger; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.impl.FileIteratingFirehose; +import io.druid.data.input.impl.AbstractTextFilesFirehoseFactory; import io.druid.data.input.impl.StringInputRowParser; -import io.druid.java.util.common.IAE; -import io.druid.java.util.common.ISE; - +import io.druid.java.util.common.CompressionUtils; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.LineIterator; import org.apache.commons.io.filefilter.TrueFileFilter; import org.apache.commons.io.filefilter.WildcardFileFilter; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; /** */ -public class LocalFirehoseFactory implements FirehoseFactory +public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory { private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class); @@ -84,55 +77,26 @@ public StringInputRowParser getParser() } @Override - public Firehose connect(StringInputRowParser firehoseParser) throws IOException + protected Collection initObjects() { - if (baseDir == null) { - throw new IAE("baseDir is null"); - } - log.info("Searching for all [%s] in and beneath [%s]", filter, baseDir.getAbsoluteFile()); - - Collection foundFiles = FileUtils.listFiles( - 
baseDir.getAbsoluteFile(), + final Collection files = FileUtils.listFiles( + Preconditions.checkNotNull(baseDir).getAbsoluteFile(), new WildcardFileFilter(filter), TrueFileFilter.INSTANCE ); + log.info("Initialized with " + files + " files"); + return files; + } - if (foundFiles == null || foundFiles.isEmpty()) { - throw new ISE("Found no files to ingest! Check your schema."); - } - log.info ("Found files: " + foundFiles); - - final LinkedList files = Lists.newLinkedList( - foundFiles - ); - - return new FileIteratingFirehose( - new Iterator() - { - @Override - public boolean hasNext() - { - return !files.isEmpty(); - } - - @Override - public LineIterator next() - { - try { - return FileUtils.lineIterator(files.poll()); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } + @Override + protected InputStream openObjectStream(File object) throws IOException + { + return FileUtils.openInputStream(object); + } - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }, - firehoseParser - ); + @Override + protected InputStream wrapObjectStream(File object, InputStream stream) throws IOException + { + return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; } } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java deleted file mode 100644 index 08cd87ac0893..000000000000 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment.realtime.firehose; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.fasterxml.jackson.dataformat.smile.SmileGenerator; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Ordering; -import com.google.common.io.CountingOutputStream; -import com.google.common.io.Files; -import com.metamx.emitter.EmittingLogger; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.Row; -import io.druid.data.input.Rows; -import io.druid.data.input.impl.InputRowParser; -import io.druid.guice.annotations.Smile; -import io.druid.java.util.common.parsers.ParseException; -import io.druid.utils.Runnables; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import 
java.util.List; -import java.util.Set; - -/** - * Creates a wrapper firehose that writes from another firehose to disk and then serves nextRow() from disk. Useful for - * tasks that require multiple passes through the data to prevent multiple remote fetches. Also has support for - * retrying fetches if the underlying firehose throws an exception while the local cache is being generated. - */ -public class ReplayableFirehoseFactory implements FirehoseFactory -{ - private static final EmittingLogger log = new EmittingLogger(ReplayableFirehoseFactory.class); - private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; - private static final int DEFAULT_MAX_TEMP_FILE_SIZE = 250000000; - private static final int DEFAULT_READ_FIREHOSE_RETRIES = 3; - - private final FirehoseFactory delegateFactory; - private final boolean reportParseExceptions; - - // This is *roughly* the max size of the temp files that will be generated, but they may be slightly larger. The - // reason for the approximation is that we're not forcing flushes after writing to the generator so the number of - // bytes written to the stream won't be updated until the flush occurs. It's probably more important to optimize for - // I/O speed rather than maintaining a strict max on the size of the temp file before it's split. 
- private final int maxTempFileSize; - - private final int readFirehoseRetries; - private final ObjectMapper smileMapper; - - private ReplayableFirehose firehose; - - @JsonCreator - public ReplayableFirehoseFactory( - @JsonProperty("delegate") FirehoseFactory delegateFactory, - @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, - @JsonProperty("maxTempFileSize") Integer maxTempFileSize, - @JsonProperty("readFirehoseRetries") Integer readFirehoseRetries, - @Smile @JacksonInject ObjectMapper smileMapper - ) - { - Preconditions.checkNotNull(delegateFactory, "delegate cannot be null"); - Preconditions.checkArgument( - !(delegateFactory instanceof ReplayableFirehoseFactory), - "Refusing to wrap another ReplayableFirehoseFactory" - ); - - this.delegateFactory = delegateFactory; - this.reportParseExceptions = reportParseExceptions == null - ? DEFAULT_REPORT_PARSE_EXCEPTIONS - : reportParseExceptions; - this.maxTempFileSize = maxTempFileSize == null ? DEFAULT_MAX_TEMP_FILE_SIZE : maxTempFileSize; - this.readFirehoseRetries = readFirehoseRetries == null ? 
DEFAULT_READ_FIREHOSE_RETRIES : readFirehoseRetries; - - this.smileMapper = smileMapper; - - log.info(this.toString()); - } - - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - if (firehose == null) { - firehose = new ReplayableFirehose(parser); - } else { - log.info("Rewinding and returning existing firehose"); - firehose.rewind(); - } - - return firehose; - } - - public class ReplayableFirehose implements Firehose - { - private final List files = new ArrayList<>(); - private final List dimensions; - - private int fileIndex = 0; - private JsonFactory jsonFactory; - private JsonParser jsonParser; - private Iterator it; - - public ReplayableFirehose(InputRowParser parser) throws IOException - { - jsonFactory = smileMapper.getFactory(); - - if (jsonFactory instanceof SmileFactory) { - jsonFactory = ((SmileFactory) jsonFactory).enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES); - } - - long counter = 0, totalBytes = 0, unparseable = 0, retryCount = 0; - Set dimensionScratch = new HashSet<>(); - - File tmpDir = Files.createTempDir(); - tmpDir.deleteOnExit(); - - long startTime = System.nanoTime(); - boolean isDone = false; - do { - deleteTempFiles(); - try (Firehose delegateFirehose = delegateFactory.connect(parser)) { - while (delegateFirehose.hasMore()) { - File tmpFile = File.createTempFile("replayable-", null, tmpDir); - tmpFile.deleteOnExit(); - - files.add(tmpFile); - log.debug("Created file [%s]", tmpFile.getAbsolutePath()); - - try (CountingOutputStream cos = new CountingOutputStream(new FileOutputStream(tmpFile)); - JsonGenerator generator = jsonFactory.createGenerator(cos)) { - - while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) { - try { - InputRow row = delegateFirehose.nextRow(); - - // delegateFirehose may return a null row if the underlying parser returns null. 
- // This should be written, but deserialization of null values causes an error of - // 'com.fasterxml.jackson.databind.RuntimeJsonMappingException: Can not deserialize instance of io.druid.data.input.MapBasedRow out of VALUE_NULL token' - // Since ReplayableFirehoseFactory is planed to be removed in https://github.com/druid-io/druid/pull/4193, - // we simply skip null rows for now. - if (row == null) { - continue; - } - - generator.writeObject(row); - dimensionScratch.addAll(row.getDimensions()); - counter++; - } - catch (ParseException e) { - if (reportParseExceptions) { - throw e; - } - unparseable++; - } - } - - totalBytes += cos.getCount(); - } - } - isDone = true; - } - catch (Exception e) { - if (++retryCount <= readFirehoseRetries && !(e instanceof ParseException)) { - log.error(e, "Delegate firehose threw an exception, retrying (%d of %d)", retryCount, readFirehoseRetries); - } else { - log.error(e, "Delegate firehose threw an exception, retries exhausted, aborting"); - Throwables.propagate(e); - } - } - } while (!isDone); - - log.info( - "Finished reading from firehose in [%,dms], [%,d] events parsed, [%,d] bytes written, [%,d] events unparseable", - (System.nanoTime() - startTime) / 1000000, - counter, - totalBytes, - unparseable - ); - - dimensions = Ordering.natural().immutableSortedCopy(dimensionScratch); - - if (counter == 0) { - log.warn("Firehose contains no events!"); - deleteTempFiles(); - it = Iterators.emptyIterator(); - } else { - jsonParser = jsonFactory.createParser(files.get(fileIndex)); - it = jsonParser.readValuesAs(Row.class); - } - } - - @Override - public boolean hasMore() - { - if (it.hasNext()) { - return true; - } - - try { - if (jsonParser != null) { - jsonParser.close(); - } - - if (++fileIndex >= files.size() || files.get(fileIndex).length() == 0) { - return false; - } - - jsonParser = jsonFactory.createParser(files.get(fileIndex)); - it = jsonParser.readValuesAs(Row.class); - return true; - } - catch (IOException e) { - throw 
new RuntimeException(e); - } - } - - @Override - public InputRow nextRow() - { - return Rows.toCaseInsensitiveInputRow(it.next(), dimensions); - } - - @Override - public Runnable commit() - { - return Runnables.getNoopRunnable(); - } - - /** - * Closes the firehose by closing the input stream and setting an empty iterator. The underlying cache files - * backing the firehose are retained for when the firehose is "replayed" by calling rewind(). The cache files are - * deleted by File.deleteOnExit() when the process exits. - */ - @Override - public void close() throws IOException - { - if (jsonParser != null) { - jsonParser.close(); - } - it = Iterators.emptyIterator(); - } - - private void rewind() throws IOException - { - close(); - - if (!files.isEmpty()) { - fileIndex = 0; - jsonParser = jsonFactory.createParser(files.get(fileIndex)); - it = jsonParser.readValuesAs(Row.class); - } - } - - private void deleteTempFiles() - { - for (File file : files) { - log.debug("Deleting temp file: %s", file.getAbsolutePath()); - file.delete(); - } - - files.clear(); - } - } - - @JsonProperty("delegate") - public FirehoseFactory getDelegateFactory() - { - return delegateFactory; - } - - @JsonProperty("reportParseExceptions") - public boolean isReportParseExceptions() - { - return reportParseExceptions; - } - - @JsonProperty("maxTempFileSize") - public int getMaxTempFileSize() - { - return maxTempFileSize; - } - - @JsonProperty("readFirehoseRetries") - public int getReadFirehoseRetries() - { - return readFirehoseRetries; - } - - @Override - public String toString() - { - return "ReplayableFirehoseFactory{" + - "delegateFactory=" + delegateFactory + - ", reportParseExceptions=" + reportParseExceptions + - ", maxTempFileSize=" + maxTempFileSize + - ", readFirehoseRetries=" + readFirehoseRetries + - '}'; - } -} diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java 
b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 8b64ae1d3381..5ac8a57cbb93 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -29,6 +29,7 @@ import io.druid.data.input.impl.InputRowParser; import org.joda.time.DateTime; +import java.io.File; import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -53,21 +54,21 @@ public TimedShutoffFirehoseFactory( } @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { - return new TimedShutoffFirehose(parser); + return new TimedShutoffFirehose(parser, temporaryDirectory); } - public class TimedShutoffFirehose implements Firehose + class TimedShutoffFirehose implements Firehose { private final Firehose firehose; private final ScheduledExecutorService exec; private final Object shutdownLock = new Object(); private volatile boolean shutdown = false; - public TimedShutoffFirehose(InputRowParser parser) throws IOException + TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory) throws IOException { - firehose = delegateFactory.connect(parser); + firehose = delegateFactory.connect(parser, temporaryDirectory); exec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d"); diff --git a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java index 79d213186ccc..3904eca6bff4 100644 --- a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java +++ b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Test; +import 
java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; @@ -52,7 +53,7 @@ public void testCombiningfirehose() throws IOException new ListFirehoseFactory(list2) ) ); - final Firehose firehose = combiningFactory.connect(null); + final Firehose firehose = combiningFactory.connect(null, null); for (int i = 1; i < 6; i++) { Assert.assertTrue(firehose.hasMore()); final InputRow inputRow = firehose.nextRow(); @@ -126,7 +127,7 @@ public static class ListFirehoseFactory implements FirehoseFactory iterator = rows.iterator(); return new Firehose() diff --git a/server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java deleted file mode 100644 index 5af4dac0f06b..000000000000 --- a/server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.realtime.firehose; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; -import io.druid.data.input.impl.InputRowParser; -import io.druid.data.input.impl.MapInputRowParser; -import io.druid.data.input.impl.TimeAndDimsParseSpec; -import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.parsers.ParseException; -import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory; -import org.easymock.EasyMockSupport; -import org.easymock.IAnswer; -import org.joda.time.DateTime; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.List; - -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; - -public class ReplayableFirehoseFactoryTest extends EasyMockSupport -{ - private FirehoseFactory delegateFactory = createMock(FirehoseFactory.class); - private Firehose delegateFirehose = createMock(Firehose.class); - private InputRowParser parser = new MapInputRowParser(new TimeAndDimsParseSpec(null, null)); - private ObjectMapper mapper = new DefaultObjectMapper(); - - - private List testRows = Lists.newArrayList( - new MapBasedInputRow( - DateTime.now(), Lists.newArrayList("dim1", "dim2"), - ImmutableMap.of("dim1", "val1", "dim2", "val2", "met1", 1) - ), - new MapBasedInputRow( - DateTime.now(), Lists.newArrayList("dim1", "dim2"), - ImmutableMap.of("dim1", "val5", "dim2", "val2", "met1", 2) - ), - new MapBasedInputRow( - DateTime.now(), Lists.newArrayList("dim2", "dim3"), - ImmutableMap.of("dim2", "val1", "dim3", "val2", "met1", 3) - ) - ); - - private ReplayableFirehoseFactory replayableFirehoseFactory; - - @Before - public void setup() - { - 
replayableFirehoseFactory = new ReplayableFirehoseFactory( - delegateFactory, - true, - 10000, - 3, - mapper - ); - } - - @Test - public void testConstructor() throws Exception - { - Assert.assertEquals(delegateFactory, replayableFirehoseFactory.getDelegateFactory()); - Assert.assertEquals(10000, replayableFirehoseFactory.getMaxTempFileSize()); - Assert.assertEquals(3, replayableFirehoseFactory.getReadFirehoseRetries()); - Assert.assertEquals(true, replayableFirehoseFactory.isReportParseExceptions()); - } - - @Test - public void testReplayableFirehoseNoEvents() throws Exception - { - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andReturn(false); - delegateFirehose.close(); - replayAll(); - - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - Assert.assertFalse(firehose.hasMore()); - } - verifyAll(); - } - - @Test - public void testReplayableFirehoseWithEvents() throws Exception - { - final boolean hasMore[] = {true}; - - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andAnswer( - new IAnswer() - { - @Override - public Boolean answer() throws Throwable - { - return hasMore[0]; - } - } - ).anyTimes(); - expect(delegateFirehose.nextRow()) - .andReturn(testRows.get(0)) - .andReturn(testRows.get(1)) - .andAnswer( - new IAnswer() - { - @Override - public InputRow answer() throws Throwable - { - hasMore[0] = false; - return testRows.get(2); - } - } - ); - - delegateFirehose.close(); - replayAll(); - - List rows = Lists.newArrayList(); - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - } - Assert.assertEquals(testRows, rows); - - // now replay! 
- rows.clear(); - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - } - Assert.assertEquals(testRows, rows); - - verifyAll(); - } - - @Test - public void testReplayableFirehoseWithoutReportParseExceptions() throws Exception - { - final boolean hasMore[] = {true}; - replayableFirehoseFactory = new ReplayableFirehoseFactory( - delegateFactory, - false, - 10000, - 3, - mapper - ); - - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andAnswer( - new IAnswer() - { - @Override - public Boolean answer() throws Throwable - { - return hasMore[0]; - } - } - ).anyTimes(); - expect(delegateFirehose.nextRow()) - .andReturn(testRows.get(0)) - .andReturn(testRows.get(1)) - .andThrow(new ParseException("unparseable!")) - .andAnswer( - new IAnswer() - { - @Override - public InputRow answer() throws Throwable - { - hasMore[0] = false; - return testRows.get(2); - } - } - ); - - delegateFirehose.close(); - replayAll(); - - List rows = Lists.newArrayList(); - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - } - Assert.assertEquals(testRows, rows); - - verifyAll(); - } - - @Test(expected = ParseException.class) - public void testReplayableFirehoseWithReportParseExceptions() throws Exception - { - final boolean hasMore[] = {true}; - - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andAnswer( - new IAnswer() - { - @Override - public Boolean answer() throws Throwable - { - return hasMore[0]; - } - } - ).anyTimes(); - expect(delegateFirehose.nextRow()) - .andReturn(testRows.get(0)) - .andReturn(testRows.get(1)) - .andThrow(new ParseException("unparseable!")) - .andAnswer( - new IAnswer() - { - @Override - public InputRow answer() throws Throwable - { - hasMore[0] = false; - return testRows.get(2); - 
} - } - ); - - delegateFirehose.close(); - replayAll(); - - replayableFirehoseFactory.connect(parser); - verifyAll(); - } - - @Test - public void testReplayableFirehoseWithConnectRetries() throws Exception - { - final boolean hasMore[] = {true}; - - expect(delegateFactory.connect(parser)).andThrow(new IOException()) - .andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andAnswer( - new IAnswer() - { - @Override - public Boolean answer() throws Throwable - { - return hasMore[0]; - } - } - ).anyTimes(); - expect(delegateFirehose.nextRow()) - .andReturn(testRows.get(0)) - .andReturn(testRows.get(1)) - .andAnswer( - new IAnswer() - { - @Override - public InputRow answer() throws Throwable - { - hasMore[0] = false; - return testRows.get(2); - } - } - ); - delegateFirehose.close(); - replayAll(); - - List rows = Lists.newArrayList(); - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - } - Assert.assertEquals(testRows, rows); - - verifyAll(); - } - - @Test - public void testReplayableFirehoseWithNextRowRetries() throws Exception - { - final boolean hasMore[] = {true}; - - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose).times(2); - expect(delegateFirehose.hasMore()).andAnswer( - new IAnswer() - { - @Override - public Boolean answer() throws Throwable - { - return hasMore[0]; - } - } - ).anyTimes(); - expect(delegateFirehose.nextRow()) - .andReturn(testRows.get(0)) - .andThrow(new RuntimeException()) - .andReturn(testRows.get(0)) - .andReturn(testRows.get(1)) - .andAnswer( - new IAnswer() - { - @Override - public InputRow answer() throws Throwable - { - hasMore[0] = false; - return testRows.get(2); - } - } - ); - delegateFirehose.close(); - expectLastCall().times(2); - replayAll(); - - List rows = Lists.newArrayList(); - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); 
- } - } - Assert.assertEquals(testRows, rows); - - verifyAll(); - } - - @Test(expected = TestReadingException.class) - public void testReplayableFirehoseWithNoRetries() throws Exception - { - replayableFirehoseFactory = new ReplayableFirehoseFactory( - delegateFactory, - false, - 10000, - 0, - mapper - ); - - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andReturn(true).times(2); - expect(delegateFirehose.nextRow()).andThrow(new TestReadingException()); - - delegateFirehose.close(); - expectLastCall(); - replayAll(); - - replayableFirehoseFactory.connect(parser); - verifyAll(); - } - - @Test - public void testReplayableFirehoseWithMultipleFiles() throws Exception - { - replayableFirehoseFactory = new ReplayableFirehoseFactory(delegateFactory, false, 1, 3, mapper); - - final boolean hasMore[] = {true}; - final int multiplicationFactor = 500; - - final InputRow finalRow = new MapBasedInputRow( - DateTime.now(), Lists.newArrayList("dim4", "dim5"), - ImmutableMap.of("dim4", "val12", "dim5", "val20", "met1", 30) - ); - - expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); - expect(delegateFirehose.hasMore()).andAnswer( - new IAnswer() - { - @Override - public Boolean answer() throws Throwable - { - return hasMore[0]; - } - } - ).anyTimes(); - - expect(delegateFirehose.nextRow()) - .andReturn(testRows.get(0)).times(multiplicationFactor) - .andReturn(testRows.get(1)).times(multiplicationFactor) - .andReturn(testRows.get(2)).times(multiplicationFactor) - .andAnswer( - new IAnswer() - { - @Override - public InputRow answer() throws Throwable - { - hasMore[0] = false; - return finalRow; - } - } - ); - - delegateFirehose.close(); - replayAll(); - - List testRowsMultiplied = Lists.newArrayList(); - for (InputRow row : testRows) { - for (int i = 0; i < multiplicationFactor; i++) { - testRowsMultiplied.add(row); - } - } - testRowsMultiplied.add(finalRow); - - List rows = Lists.newArrayList(); - try 
(Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - } - Assert.assertEquals(testRowsMultiplied, rows); - - // now replay! - rows.clear(); - try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { - while (firehose.hasMore()) { - rows.add(firehose.nextRow()); - } - } - Assert.assertEquals(testRowsMultiplied, rows); - - verifyAll(); - } - - private static class TestReadingException extends RuntimeException - { - } -} - - - diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 2484030ae135..ae760e5effc8 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -81,6 +81,7 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; @@ -143,7 +144,7 @@ public void setUp() throws Exception new FirehoseFactory() { @Override - public Firehose connect(InputRowParser parser) throws IOException + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException { return new TestFirehose(rows.iterator()); } diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java index 3517e7eb3e18..2e9a7d3716d6 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; - import io.druid.concurrent.Execs; import io.druid.data.input.impl.DimensionsSpec; import 
io.druid.data.input.impl.JSONParseSpec; @@ -88,7 +87,8 @@ public void setUp() throws Exception null, null ) - ) + ), + null ); } @@ -220,7 +220,8 @@ public void testDuplicateRegistering() throws IOException null, null ) - ) + ), + null ); } diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryTest.java new file mode 100644 index 000000000000..99d711fdecb3 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.firehose; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import io.druid.data.input.Firehose; +import io.druid.data.input.Row; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +public class LocalFirehoseFactoryTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private LocalFirehoseFactory factory; + + @Before + public void setup() throws IOException + { + for (int i = 0; i < 5; i++) { + try (final Writer writer = new BufferedWriter( + new FileWriter(temporaryFolder.newFile("test_" + i)) + )) { + writer.write((20171225 + i) + "," + i + "th test file\n"); + } + } + + for (int i = 0; i < 5; i++) { + try (final Writer writer = new BufferedWriter( + new FileWriter(temporaryFolder.newFile("filtered_" + i)) + )) { + writer.write((20171225 + i) + "," + i + "th filtered file\n"); + } + } + + factory = new LocalFirehoseFactory(temporaryFolder.getRoot(), "test_*", null); + } + + @Test + public void testConnect() throws IOException + { + try (final Firehose firehose = factory.connect(new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a")), + Lists.newArrayList(), + Lists.newArrayList() + ), + ",", + Arrays.asList("timestamp", "a"), + false, + 0 + ), + Charsets.UTF_8.name() + ), null)) { + final List rows = new 
ArrayList<>(); + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + + Assert.assertEquals(5, rows.size()); + rows.sort(Comparator.comparing(Row::getTimestamp)); + for (int i = 0; i < 5; i++) { + final List dimVals = rows.get(i).getDimension("a"); + Assert.assertEquals(1, dimVals.size()); + Assert.assertEquals(i + "th test file", dimVals.get(0)); + } + } + } +}