Add PrefetchableTextFilesFirehoseFactory for cloud storage types #4193
gianm merged 28 commits into apache:master from
Conversation
@JsonProperty("blobs") AzureBlob[] blobs
) {
@JsonProperty("blobs") List<AzureBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
These @JsonProperty annotations need to be fixed

Oh, a copy-paste mistake. Fixed them. Thanks!
@JacksonInject("objectApi") CloudFilesApi cloudFilesApi,
@JsonProperty("blobs") CloudFilesBlob[] blobs
@JsonProperty("blobs") List<CloudFilesBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
These @JsonProperty annotations need to be fixed
@JsonProperty("blobs") GoogleBlob[] blobs
) {
@JsonProperty("blobs") List<GoogleBlob> blobs,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
These @JsonProperty annotations need to be fixed
@JacksonInject("s3Client") RestS3Service s3Client,
@JsonProperty("uris") List<URI> uris
@JsonProperty("uris") List<URI> uris,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
These @JsonProperty annotations need to be fixed
gianm left a comment

Didn't do a full review yet, but had some initial comments from the parts I did look at. Let me know what you think of those and I will also continue reviewing the rest.

I think this patch also won't totally solve the original problem, since running out of disk isn't just due to the replayable firehose caching everything. That's what you hit first, but once that's fixed, there are still the segments we write on disk. They'll eventually fill up the disk because we don't publish them until the very end of the task. I think it's ok to address that in a separate patch though -- it's a different problem than the one you're solving in this patch.
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory;
This was the only use of ReplayableFirehoseFactory (it wasn't exposed to users… just used internally). So if it's not needed anymore, you should just remove it from the code base.
public IndexIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting,
@JsonProperty("skipFirehoseCaching") @Nullable Boolean skipFirehoseCaching
You should remove skipFirehoseCaching from the docs too (tasks.md I think).
smileMapper
);
}
final FirehoseFactory firehoseFactory = delegateFirehoseFactory;
Might as well call delegateFirehoseFactory firehoseFactory from the start. The renaming isn't doing much useful anymore.

I forgot to rename it. Thanks.
final File baseDir = new File("lol");
baseDir.mkdir();
baseDir.deleteOnExit();
What's this for? Could it be a @Rule TemporaryFolder instead? That's preferred for any temporary directories or files needed by tests.

It was for checking that the directory exists in the constructor of LocalFirehoseFactory. Removed now.
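JUnit 4's TemporaryFolder rule automates exactly the lifecycle the hard-coded `new File("lol")` was faking: create a fresh directory before each test, delete it (recursively) afterwards. A stdlib-only sketch of that lifecycle, without the JUnit dependency:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirLifecycle
{
  public static void main(String[] args) throws IOException
  {
    // What @Rule TemporaryFolder automates: a fresh directory per test,
    // cleaned up afterwards, instead of deleteOnExit() on a fixed path.
    final Path baseDir = Files.createTempDirectory("firehose-test");
    try {
      System.out.println(Files.isDirectory(baseDir)); // the test body uses baseDir here
    }
    finally {
      // TemporaryFolder also deletes directory contents recursively
      Files.deleteIfExists(baseDir);
      System.out.println(Files.exists(baseDir));
    }
  }
}
```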
)
{
super(
FileUtils.listFiles(
This shouldn't happen in construction. Firehose factories get constructed semi-frequently (anytime Task objects are deserialized) and that doesn't necessarily mean they will really be used. Anything that hits the disk or network should be deferred until the firehose is connected.

Right. I fixed it. Thanks!
*
* @return true if the object is compressed with gzip
*/
protected abstract boolean isGzipped(ObjectType object);
Is it necessary to have isGzipped and wrapIfNeeded? I would think that the implementations could wrap if needed, since they should know if an object is gzipped or not.
* IndexTask can read the whole data twice for determining partition specs and generating segments if the intervals of
* GranularitySpec are not specified.
*/
public abstract class PrefetchableTextFilesFirehoseFactory<ObjectType>
{
if (baseDir == null) {
baseDir = Files.createTempDir();
baseDir.deleteOnExit();
Expanding use of deleteOnExit here is a little undesirable (see #3923 where we removed others). We did use it in ReplayableFirehoseFactory but that was only used by the IndexTask. Now these firehoses are used by potentially more code.
I wonder if instead, we should either add a File temporaryDirectory parameter to connect or do a hack similar to #4069, to allow the caller to pass in an appropriate tmp directory. Something nicer than #4069 is probably better, since that approach doesn't work if the firehose is wrapped in any way.

Right. I vote for passing a temporaryDirectory parameter because it works well with wrapped firehoses and the caller can manage its resources.
}

if (closer != null) {
closer.close();

Should close this even if lineIterator fails to close.
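A try/finally guarantees the second close runs even when the first one throws; Guava's Closer generalizes this bookkeeping to any number of resources. A minimal stdlib sketch of the pattern being asked for:

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseBoth
{
  // Closes 'second' even if closing 'first' throws; the first failure still
  // propagates. Guava's Closer does this for arbitrarily many resources and
  // also preserves suppressed exceptions.
  static void closeBoth(Closeable first, Closeable second) throws IOException
  {
    try {
      first.close();
    }
    finally {
      second.close();
    }
  }

  public static void main(String[] args)
  {
    final boolean[] closed = {false};
    try {
      closeBoth(
          () -> { throw new IOException("lineIterator failed to close"); },
          () -> closed[0] = true
      );
    }
    catch (IOException e) {
      // expected: closing the first resource failed
    }
    System.out.println(closed[0]); // the second resource was still closed
  }
}
```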
Long maxCacheCapacityBytes,
Long maxFetchCapacityBytes,
Long prefetchTriggerBytes,
Integer fetchTimeout,
Generally we use longs to represent times and durations.
@gianm you're right. I changed the title. Also, I'll fix the problem of publishing segments at the very end of the index task in a follow-up PR.
}

/**
* Downloads an object. It retries downloading {@link #maxFetchRetry} times and throws the last exception.
}

try {
InputStream stream = FileUtils.openInputStream(fetchedFile);
Should this block use the factory's openStream() method instead?

AbstractTextFilesFirehoseFactory.openStream() is used for fetching data from remote storage in PrefetchableTextFilesFirehoseFactory.download(). Once a file is fetched, it is opened with FileUtils.openInputStream().

Hm, should we remove the GZip file extension check here then, and require that implementations of openStream() output a decompressed stream?

I think the current mechanism is fine, it saves some disk space by postponing decompression until reading from disk.

Thanks, my bad. I separated openStream() into openObjectStream() and wrapObjectStream(). openObjectStream() describes how an input stream is opened for an object, and wrapObjectStream() wraps the input stream if the object is compressed.
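The wrapObjectStream() idea above can be sketched with java.util.zip: the object is stored on disk as-is (possibly gzipped, saving space), and decompression is deferred to read time. The method name `wrapIfNeeded` and the `.gz`-suffix check are illustrative assumptions, not necessarily how Druid detects compression:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class WrapObjectStreamSketch
{
  // Wrap the raw object stream in a decompressor only if the object looks
  // compressed; otherwise return it unchanged. Hypothetical detection by
  // file extension.
  static InputStream wrapIfNeeded(String objectName, InputStream in) throws IOException
  {
    return objectName.endsWith(".gz") ? new GZIPInputStream(in) : in;
  }

  public static void main(String[] args) throws IOException
  {
    // Simulate a gzipped object fetched to disk
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
      gz.write("a,b,c".getBytes(StandardCharsets.UTF_8));
    }
    final InputStream wrapped =
        wrapIfNeeded("rows.csv.gz", new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(new String(wrapped.readAllBytes(), StandardCharsets.UTF_8));
  }
}
```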
}
}

FileUtils.forceDelete(firehoseTempDir);
Should this be moved to the outer finally block? If an exception occurs after firehoseTempDir is created (normalExit is false), it looks like firehoseTempDir won't be deleted here.

Right. I moved it to the finally block.
* @param temporaryDirectory a directory where temporary files are stored
*/
public Firehose connect(T parser) throws IOException, ParseException;
Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException;
What do you think about removing temporaryDirectory from the connect() parameters, and having the temp directory creation/cleanup be managed in the Firehoses that need it?
It looks like most Firehose implementations aside from the one returned by PrefetchableTextFilesFirehoseFactory don't need the temp dir.

The life cycle of temporaryDirectory is the same as that of FirehoseFactory rather than Firehose. This is because the temporaryDirectory containing cached data should not be deleted, so that it can be reused even when a PrefetchableFirehose is closed.
We may add abstract methods for initialization and termination to FirehoseFactory, and the implementation of PrefetchableFirehoseFactory can create and delete the temporaryDirectory in those methods. Do you think that is better?

I see, I think it would be better to have the temp dir management contained within the PrefetchableFirehoseFactory implementation then as you mentioned.

Would you tell me what kind of temp dir management you have in mind? I think the temp dir should be managed by the caller of PrefetchableFirehoseFactory.connect() because PrefetchableFirehoseFactory cannot decide when to remove the temp dir by itself.

I think the initialization/termination methods that you mentioned on FirehoseFactory would be a good place for the temp dir management.

I realized that FirehoseFactory is in the druid api module. I think it would be better to manage temporaryDirectory internally rather than leaving the management to users.

Think about the context where it's used. In our world, callers of firehoses are indexing tasks, which already have working directories that are in user-configurable locations and are automatically cleaned up. It makes sense to use this directory (and pass it into the firehose factory) rather than expect the firehose to set up and clean up its own temporary directories. Especially in the relatively common case where the user-configured working directory is on a filesystem with more space than the system default temp directory.

Right. I changed to use a temporary directory in taskWorkDir.
* @param temporaryDirectory a directory where temporary files are stored
*/
public Firehose connect(T parser) throws IOException, ParseException;
Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException;
This is an interface for extensions, so for backwards compat we should keep the old method and add a default implementation for this one that just calls the old one, ignoring temporaryDirectory. But also mark the old one @Deprecated.
@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
{
final List<ObjectType> objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
Maybe objects should be cached? IndexTask connects the same firehose multiple times, so caching could be useful both for performance and for being consistent from run to run.

Doesn't have to be in this PR, but, in addition to the text-based firehoses, IngestSegmentFirehose should also support some kind of run-to-run consistency. It would be an input for the index task in the case of reindexing, and it might get connected multiple times in the same task if the task wants to do multiple passes. Ideally each pass should see the same data.

Hmm. I agree that we should support run-to-run consistency for IngestSegmentFirehose.
However, maybe it is better to leave the consistency management to users for other firehoses which read data from outside of Druid? Because we cannot guarantee it if some files are removed or changed between subsequent scans.

However, maybe it is better to leave the consistency management to users for other firehoses which read data from outside of Druid? Because we cannot guarantee it if some files are removed or changed between subsequent scans.

By user do you mean the caller of the firehose? How would the user be able to manage consistency? It seems tough to me, since for features like S3 prefix listing, the user doesn't really know what objects are being read.
IMO the firehose is in a pretty good position to do what it can to try to get run-to-run consistency (like remembering objects and using that on subsequent connects) and to detect when it's impossible (perhaps remembering checksums of object contents, and detecting when they change).
(I do think this sort of consistency verification isn't necessary for this patch, but happy to talk through it for the future)

@gianm, I cached objects. It would be good for performance anyway.

By user do you mean the caller of the firehose? How would the user be able to manage consistency? It seems tough to me, since for features like S3 prefix listing, the user doesn't really know what objects are being read.

I was thinking that Druid operators (people) can be responsible for consistency management. I think they can do so by deferring changes to their data sets until the current ingestion task completes.

IMO the firehose is in a pretty good position to do what it can to try to get run-to-run consistency (like remembering objects and using that on subsequent connects) and to detect when it's impossible (perhaps remembering checksums of object contents, and detecting when they change).

It sounds good, but will definitely affect the ingestion performance. I'm wondering whether it would be worthwhile if we can instead leave the responsibility for consistency management to Druid operators.
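The caching agreed on above — list the objects once and reuse the result on every subsequent connect() — can be sketched as a lazy memoized listing. The names `initObjects`/`getObjects` follow the diff; the listing body is a stand-in for a real remote call such as an S3 prefix listing:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CachedObjectListing
{
  // Counts how often the (expensive, possibly remote) listing actually runs.
  final AtomicInteger listCalls = new AtomicInteger();
  private volatile List<String> cachedObjects;

  // Stand-in for the real object discovery (e.g. an S3 prefix listing).
  List<String> initObjects()
  {
    listCalls.incrementAndGet();
    return Arrays.asList("bucket/a.gz", "bucket/b.gz");
  }

  // Every connect() goes through here: the first call lists, later calls
  // reuse the cached result, so each pass sees the same object set.
  synchronized List<String> getObjects()
  {
    if (cachedObjects == null) {
      cachedObjects = initObjects();
    }
    return cachedObjects;
  }

  public static void main(String[] args)
  {
    CachedObjectListing factory = new CachedObjectListing();
    factory.getObjects();
    factory.getObjects(); // a second connect() reuses the cached listing
    System.out.println(factory.listCalls.get());
  }
}
```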
// scan yet, so we must download the whole file at once. It's still possible for the size of cached/fetched data to
// not exceed these variables by estimating the after-fetch size, but it makes us consider the case when any files
// cannot be fetched due to their large size, which makes the implementation complicated.
private long maxCacheCapacityBytes;
}

try {
InputStream stream = FileUtils.openInputStream(fetchedFile);

I think the current mechanism is fine, it saves some disk space by postponing decompression until reading from disk.
// maximum retry for fetching an object from the remote site
private final int maxFetchRetry;

private volatile int nextFetchIndex;

Comment about why this is volatile?
);

// fetchExecutor is responsible for background data fetching
final ExecutorService fetchExecutor = Executors.newSingleThreadExecutor();
Could you give this thread a descriptive name?
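A plain-stdlib way to name the single fetch thread is to pass a ThreadFactory to Executors; Druid code typically uses a helper for this, and the name "firehose-fetch-0" here is just an illustrative choice:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NamedFetchExecutor
{
  // Single-thread executor whose worker carries a descriptive name, so it is
  // identifiable in thread dumps instead of showing up as "pool-N-thread-1".
  static ExecutorService namedSingleThreadExecutor(String name)
  {
    return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException
  {
    final ExecutorService fetchExecutor = namedSingleThreadExecutor("firehose-fetch-0");
    try {
      System.out.println(fetchExecutor.submit(() -> Thread.currentThread().getName()).get());
    }
    finally {
      fetchExecutor.shutdown();
    }
  }
}
```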
}
}
finally {
FileUtils.forceDelete(firehoseTempDir);
There's no need to do this, work directories get automatically cleaned up after the task exits. It's probably better not to do this since then we'd have the option of skipping the higher-level cleanup if we want to inspect a failed task's working directory.
@JacksonInject("s3Client") RestS3Service s3Client,
@JsonProperty("uris") List<URI> uris
@JsonProperty("uris") List<URI> uris,
@JsonProperty("directories") List<URI> directories,
Could you call this "prefixes"? S3 doesn't really have directories (see http://docs.aws.amazon.com/AmazonS3/latest/UG/FolderOperations.html) so this term is misleading.

"prefixes" and the new prefetching parameters should be added to the docs for development/extensions-core/s3.md.

Updated documents.
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);

Supplier<Committer> committerSupplier = null;
final File firehoseTempDir = Files.createTempDir();
This should be inside the task working dir -- a subdirectory of toolbox.getTaskWorkDir() -- not system temp directory.
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
final File firehoseTempDir = Files.createTempDir();
This should be inside the task working dir -- a subdirectory of toolbox.getTaskWorkDir() -- not system temp directory.
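Putting the firehose temp dir under the task working directory keeps it on the user-configured (usually larger) filesystem and lets the normal task cleanup remove it. A sketch, where `taskWorkDir` stands in for the value returned by toolbox.getTaskWorkDir():

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class FirehoseTempDir
{
  // Create the firehose temp dir as a subdirectory of the task working dir
  // instead of the system temp dir. The "firehose" prefix is an illustrative
  // choice, not necessarily what Druid uses.
  static File createFirehoseTempDir(File taskWorkDir) throws IOException
  {
    return Files.createTempDirectory(taskWorkDir.toPath(), "firehose").toFile();
  }

  public static void main(String[] args) throws IOException
  {
    final File taskWorkDir = Files.createTempDirectory("task").toFile();
    final File firehoseTempDir = createFirehoseTempDir(taskWorkDir);
    System.out.println(firehoseTempDir.getParentFile().equals(taskWorkDir));
  }
}
```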
dclim left a comment

Looks good pending other review comments + a suggestion to look at using listObjectsChunked
final String bucket = uri.getAuthority();
final String prefix = extractS3Key(uri);
try {
final S3Object[] listed = s3Client.listObjects(bucket, prefix, null);
Does this support common prefixes (subdirectory paths), and is this something we want to support? The docs indicate that we should use listObjectsChunked here instead, and that might be preferable anyway in case there's a really large number of items.

Thanks. I changed to use listObjectsChunked().
// scan yet, so we must download the whole file at once. It's still possible for the size of cached/fetched data to
// not exceed these variables by estimating the after-fetch size, but it makes us consider the case when any files
// cannot be fetched due to their large size, which makes the implementation complicated.
private final long maxCacheCapacityBytes;
It took me a bit of time to wrap my head around the separate cache / fetch pool implementation, where the files in the cache remain until termination and the files in the fetch pool are deleted when closed. My thought was that there would be a single configurably-sized pool that would have some initial data written into it by the prefetcher and then would continue to have more data added to it by the background thread. If the entire file set fit within the pool then all the data would be completely cached (the same as the cache pool). Otherwise, once the pool started approaching the limit, files that have already been processed would begin to be evicted to make room for more pre-fetched data.
I think the main advantage of doing it this way would be that there is just one pool that needs to be tuned, and indexing could start before the entire cache is filled with data (if the amount of data exceeds the cache size).
I don't feel super strongly about it either way and realize it would be more complicated to implement, but just wondering if it's something you considered and decided against, and if so what your reasoning was.

@dclim thanks for your comment. The idea behind separating cache and fetch pools is that caching and fetching have different goals. Caching is for reducing the initial latency of the firehose while prefetching is for increasing the throughput of the firehose (it is assumed that reading directly from the remote site is slow). So, I thought cached objects should be the first n objects (it's also assumed that objects should be read in the given order and the order must not be changed if the firehose is reconnected).
Your idea about early ingestion during caching sounds good. I updated the patch.

@dclim, also it would be useful if caching and prefetching could be turned on/off individually because some users might want to use only one of the caching or prefetching features. I updated the patch to support disabling prefetching.
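The single-pool design dclim describes can be sketched as a byte budget shared by cached and prefetched files, where files that have already been processed are evicted oldest-first once the budget is exceeded. This is a toy model of the idea, not the implementation that landed; real code would track actual files on disk:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FetchPoolSketch
{
  private final long capacityBytes;
  long usedBytes;
  int evictions;
  // sizes of files that have already been read and are eligible for eviction
  private final Deque<Long> processed = new ArrayDeque<>();

  FetchPoolSketch(long capacityBytes)
  {
    this.capacityBytes = capacityBytes;
  }

  // Admit a newly fetched file, evicting already-processed files (oldest
  // first) until it fits in the byte budget.
  void addFetched(long sizeBytes)
  {
    while (usedBytes + sizeBytes > capacityBytes && !processed.isEmpty()) {
      usedBytes -= processed.pollFirst();
      evictions++;
    }
    usedBytes += sizeBytes;
  }

  // Mark a file as fully read, making it evictable.
  void markProcessed(long sizeBytes)
  {
    processed.addLast(sizeBytes);
  }

  public static void main(String[] args)
  {
    FetchPoolSketch pool = new FetchPoolSketch(100);
    pool.addFetched(60);
    pool.markProcessed(60);
    pool.addFetched(60); // evicts the already-processed 60-byte file first
    System.out.println(pool.evictions + " " + pool.usedBytes);
  }
}
```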
The Travis failure seems unrelated to this patch. I'll close and reopen this PR.

Hmm, I'm looking at the failure.
}
final ObjectType object = iterator.next();
try {
return IOUtils.lineIterator(
I think you can get rid of the BufferedReader and InputStreamReader here since wrapObjectStream() returns an InputStream:
https://commons.apache.org/proper/commons-io/javadocs/api-2.5/org/apache/commons/io/IOUtils.html
"All the methods in this class that read a stream are buffered internally. This means that there is no cause to use a BufferedInputStream or BufferedReader. The default buffer size of 4K has been shown to be efficient in tests."

Cool, thanks! I also removed it from PrefetchableTextFilesFirehoseFactory because LineIterator automatically wraps the input reader with a BufferedReader.
jon-wei left a comment

👍 on design, had a few code level comments
@Override
public int read(char[] cbuf, int off, int len) throws IOException
{
final char[] chs = "\n".toCharArray();
Could extract chs here and make it static
assertResult(rows);
}

Can you add a test where both cache and prefetch are enabled, but the max prefetch size is less than the max cache capacity?
@jon-wei thanks. I addressed your comments.

👍
gianm left a comment

Looks good other than the FirehoseFactory comment.
* @param parser an input row parser
* @param temporaryDirectory a directory where temporary files are stored
*/
Firehose connect(T parser, File temporaryDirectory) throws IOException, ParseException;
This one should have a default implementation too (for compatibility with pre-existing firehose impls that don't know about overriding this method). It's ok for them to both have default impls that call each other -- anyone implementing a firehose should override at least one. (It's worth calling this out in the javadoc.)

Hmm, how about throwing an exception like NotImplementedException in the default implementation?

Ah, never mind. It would break the compatibility anyway.

It's got to call the other method; the case we're trying to support is that someone already wrote an extension for Druid 0.10.0 with a custom FirehoseFactory, and then upgrades to Druid 0.10.1. The new Druid 0.10.1 code will call connect(parser, temporaryDirectory) and unless there is a default impl that calls connect(parser), the user's extension will not work properly.
Generally, we want to keep extension compatibility for the extension points on http://druid.io/docs/latest/development/modules.html within a major version like 0.9.x, 0.10.x, etc (it's ok to break extensions from 0.9.x -> 0.10.x if there is a good reason).

Right. I added a default implementation.
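The mutually-delegating default methods discussed above can be shown with a self-contained sketch. The real interface takes an InputRowParser; a String stands in for it here to keep the example dependency-free:

```java
import java.io.File;

public class ConnectCompat
{
  interface Firehose {}

  // Each connect() variant has a default that delegates to the other, so an
  // extension written against the old single-argument method keeps working
  // when core code calls the new two-argument one. Implementations must
  // override at least one; overriding neither would recurse forever.
  interface FirehoseFactory
  {
    @Deprecated
    default Firehose connect(String parser)
    {
      return connect(parser, null);
    }

    default Firehose connect(String parser, File temporaryDirectory)
    {
      return connect(parser);
    }
  }

  // A pre-existing extension that only knows the old method.
  static class OldFactory implements FirehoseFactory
  {
    @Override
    public Firehose connect(String parser)
    {
      return new Firehose() {};
    }
  }

  public static void main(String[] args)
  {
    // New core code calls the two-argument form; the default implementation
    // routes it to the extension's old one-argument override.
    System.out.println(new OldFactory().connect("parser", new File("/tmp")) != null);
  }
}
```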
gianm left a comment

👍 after travis, thx @jihoonson
Currently, IndexTask always wraps a given Firehose with the ReplayableFirehose. The problem is that ReplayableFirehose downloads the whole data to a local disk when it is initialized, which can cause an out-of-disk error. A possible simple solution is to add a max cache capacity to ReplayableFirehose, but I didn't because ReplayableFirehose first reads the whole input and stores it in the smile format. When it reads back the data, it needs to parse the data of smile format, which introduces an additional parsing overhead. ReplayableFirehose is valuable as it is because it is meaningful for truly unreliable data sources. Also, it can be used for fault tolerance of streaming ingestion like Kafka does, after some modifications.

So, I added a new class PrefetchableTextFilesFirehoseFactory which is responsible for caching and prefetching objects to a local disk. It has three key features.
- On connect(StringInputRowParser), it caches objects on a local disk up to maxCacheCapacityBytes. These caches are NOT deleted until the process terminates, and thus can be used for future reads.
- Once the remaining fetched data drops below prefetchTriggerBytes, a background prefetch thread automatically starts fetching remaining objects. A fetched object is deleted when its LineIterator is closed.
- Fetching is retried up to maxFetchRetry times. I think this is a temporary solution for handling frequent exceptions that occur while accessing cloud storages, and believe we should handle them at a lower level in the future.

Additionally, I refactored file-based firehoses by adding AbstractTextFilesFirehoseFactory, and fixed a bug in FileIteratingFirehose which doesn't close after iterating LineIterator.
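The maxFetchRetry behavior described in the summary can be sketched as a bounded retry loop that rethrows the last failure. The method name and signature here are illustrative, not the actual Druid code; the real implementation also logs between attempts:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class FetchRetrySketch
{
  // One initial attempt plus up to maxFetchRetry retries; if every attempt
  // fails, the last failure is thrown to the caller.
  static void fetchWithRetry(Runnable download, int maxFetchRetry) throws IOException
  {
    IOException last = null;
    for (int attempt = 0; attempt <= maxFetchRetry; attempt++) {
      try {
        download.run();
        return;
      }
      catch (RuntimeException e) {
        last = new IOException("fetch failed on attempt " + attempt, e);
      }
    }
    throw last;
  }

  public static void main(String[] args) throws IOException
  {
    final AtomicInteger calls = new AtomicInteger();
    fetchWithRetry(() -> {
      // fail twice with a simulated transient cloud storage error, then succeed
      if (calls.incrementAndGet() < 3) {
        throw new RuntimeException("transient cloud storage error");
      }
    }, 3);
    System.out.println(calls.get()); // succeeded on the third attempt
  }
}
```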