Conversation
Travis failure looks related to PR changes:
| } | ||
| protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) | ||
| throws IOException |
TeamCity is flagging this: The declared exception IOException is never thrown in this method, nor in its derivables.

Ah yeah, nothing implements this method yet; I was preemptively adding it because InputSource.createSplits is allowed to throw an IOException and readers will most likely be calling this method. I can remove it for now, though I suspect it will likely need to be added back later.

Yeah, made a mistake cleaning up the code for the PR and pointed the mock to return the wrong uris, fixed.
| public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) | ||
| throws IOException | ||
| { | ||
| if (cacheSplitUris == null) { |
The javadoc for SplittableInputSource.createSplits() notes that implementations should NOT cache the created splits in memory.
@jihoonson Is my understanding of SplittableInputSource.createSplits() correct?

Yes, it shouldn't be cached in memory.

Modified; uris are already in memory, so if given an explicit list there is nothing to be chill about.
prefixes are handled with an iterator that was based on a previous iterator implementation in s3Utils. This iterator uses list objects calls on each prefix in batches of 1024 objects (with a fallback to getObjectMetadata if a specific 403 is encountered), and creates an iterator on that set of summaries which is drained to the outer iterator. When the batch (or single summary) is done, it moves on to the next prefix and repeats, until all prefixes in the list have been iterated. Retries are baked into each call, so the caller of this method doesn't have to worry about such things.
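The drain-one-prefix-then-advance behavior described above can be sketched as a small generic iterator. This is a simplified illustration, not the actual Druid code: the supplied listing function stands in for the S3 listObjectsV2 call, and the real iterator additionally pages in batches of 1024 objects, falls back to getObjectMetadata on a 403, and bakes in retries.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Sketch: iterate objects for each prefix in turn, draining the current
// prefix's listing before moving on to the next prefix.
class PrefixObjectIterator<T> implements Iterator<T>
{
  private final Iterator<String> prefixes;
  private final Function<String, List<T>> listObjects; // stands in for the S3 list call
  private final ArrayDeque<T> currentBatch = new ArrayDeque<>();

  PrefixObjectIterator(List<String> prefixes, Function<String, List<T>> listObjects)
  {
    this.prefixes = prefixes.iterator();
    this.listObjects = listObjects;
  }

  @Override
  public boolean hasNext()
  {
    // Drain the current prefix's results before advancing to the next prefix;
    // skip prefixes whose listing comes back empty.
    while (currentBatch.isEmpty() && prefixes.hasNext()) {
      currentBatch.addAll(listObjects.apply(prefixes.next()));
    }
    return !currentBatch.isEmpty();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentBatch.poll();
  }
}
```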
Hmm, I should probably add something like this to a javadoc of the iterator method 😢

Ok, I'm fine with this change since it fixes the race between createSplits() and getNumSplits().
| this.uri = uri; | ||
| } | ||
| @Nullable |
I believe this never returns null. If it does, then uri should be checked for null below in open before calling uri.getAuthority().
| import java.io.InputStream; | ||
| import java.net.URI; | ||
| public class S3Entity implements InputEntity |
| ); | ||
| @Test | ||
| public void testSerde() throws Exception |
Consider splitting this into two separate tests (one for uris and one for prefixes).
| { | ||
| final ObjectMapper mapper = createS3ObjectMapper(); | ||
| final List<URI> uris = Arrays.asList( |
Consider making uris and prefixes static final variables since they're used in a few tests.
I went ahead and did this, though I'm not sure I actually like the change, since looking at each test it's a lot less obvious what it is testing.
| } | ||
| @Override | ||
| protected InputSourceReader formattableReader( |
Unit test coverage is missing for this method.

Added a test through a test of the read method, which mocks the s3 client to do the list and getMetadata operations, and a mocked get object method that "returns" an S3Object with csv file content.
| ); | ||
| objects.addAll(Lists.newArrayList(objectSummaryIterator)); | ||
| } | ||
| catch (AmazonS3Exception outerException) { |
Unit test coverage is missing for the exception handling here; it may be worth adding to verify the retry logic.

Added tests for this and more.
| if (s3Object == null) { | ||
| throw new ISE("Failed to get an s3 object for bucket[%s] and key[%s]", bucket, key); | ||
| } | ||
| return CompressionUtils.decompress(s3Object.getObjectContent(), uri.toString()); |
minor: It doesn't matter much, but uri.getPath() would be better here, because uri.toString() URI-encodes its values.
Actually, key would be even better.

Oops, I meant to switch this to getPath to be consistent with the google extension after your review of this there and forgot, but yeah, key is better; fixed this and the google extension to use key.
| * {@link ServerSideEncryptingAmazonS3#getObjectMetadata} to check if the 'prefix' is an object in the event the | ||
| * list objects call responds with a 403 http status code | ||
| */ | ||
| private static Iterator<InputSplit<URI>> objectFetchingIterator( |
This code looks like it could be shareable between the input source and the firehose. If true, please accomplish that sharing, ideally by having the firehose call into the input source.

It is, I was just ignoring the firehose.
I have moved this iterator to S3Utils and changed the signature to be Iterator&lt;S3ObjectSummary&gt;, and modified S3Utils.objectSummaryIterator, used by StaticS3FirehoseFactory and S3TimestampVersionedDataFinder, to take a URI instead of a bucket and key (mildly unfortunate since we will be converting it back to a bucket and key, but all the callers have a URI available), and defer the logic to this newer iterator.
| <dependency> | ||
| <groupId>joda-time</groupId> | ||
| <artifactId>joda-time</artifactId> | ||
| <version>2.10.5</version> |
Can remove the version here so it uses the one defined in the root POM.
| originalAuthority; | ||
| final String path = originalPath.startsWith("/") ? originalPath.substring(1) : originalPath; | ||
| return URI.create(StringUtils.format("s3://%s/%s", authority, path)); |
Since authority and path are both strings, string concatenation may be better than StringUtils.format() here.
| EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result).once(); | ||
| } | ||
| private static void addExpectedNonPrefixObjectsWithNoListPermission(URI uri) |
Parameter uri is not used in the method
| originalAuthority; | ||
| final String path = originalPath.startsWith("/") ? originalPath.substring(1) : originalPath; | ||
| return URI.create(StringUtils.format("s3://%s/%s", authority, path)); |
This is bad, because it won't encode funny characters in path. Imagine the path has a ? in it. It needs to be URI-encoded, or else pulling the key out later won't work. The tricky characters are / (which you don't want to encode) and ?, #, and others (which you do).
StringUtils.urlEncode might help you here.
Alternatively, don't use URIs internally, instead use bucket/key pairs.
How about adding this method to CloudObjectLocation:
public URI toUri()
{
  // Encode path, except leave '/' characters unencoded
  return URI.create(StringUtils.format("s3://%s/%s", bucket, StringUtils.urlEncode(path).replace("%2F", "/")));
}

And using it everywhere that is doing this sort of concatenation today.
It won't handle weird, invalid bucket names but it's better than the simple concatenation happening now, and weird paths are more likely anyway. For extra credit you could include validation for the bucket, throwing an error if it's not valid (AWS, Google, etc all have rules for what's a valid bucket, you could do a loose superset of them).
FYI this method in S3Utils was only called by StaticS3FirehoseFactory, not the new stuff, so I wasn't worrying about it too much because we presumably will remove it in a future release. That said, I went ahead and did the thing to fix it
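The round-trip problem being discussed can be reproduced with plain java.net.URI. Below, naivePath shows how a '?' in the key silently drops part of it, and encodedPath is a hypothetical fix in the spirit of the toUri() suggestion (the JDK's URLEncoder is used here as a stand-in for Druid's StringUtils.urlEncode):

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class UriEncodingDemo
{
  // Raw concatenation: URI treats '?' as the start of a query component,
  // so everything after it disappears from getPath(). A key containing a
  // bare '%' would even make URI.create() throw.
  public static String naivePath(String bucket, String key)
  {
    return URI.create("s3://" + bucket + "/" + key).getPath();
  }

  // Encode the key first, but keep '/' separators unencoded so the URI
  // still looks like a normal object path.
  public static String encodedPath(String bucket, String key)
  {
    String encoded = URLEncoder.encode(key, StandardCharsets.UTF_8)
                               .replace("+", "%20")
                               .replace("%2F", "/");
    return URI.create("s3://" + bucket + "/" + encoded).getPath();
  }
}
```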
I noticed that the type of this is
| if (!this.uris.isEmpty() && !this.prefixes.isEmpty()) { | ||
| throw new IAE("uris and prefixes cannot be used together"); | ||
| } | ||
| if (this.uris.isEmpty() && this.prefixes.isEmpty()) { | ||
| throw new IAE("uris or prefixes must be specified"); | ||
| } |
Optionally, simplify to:
if (this.uris.isEmpty() == this.prefixes.isEmpty()) {
  throw new IAE("exactly one of either uris or prefixes must be specified");
}

| } | ||
| @JsonProperty | ||
| @JsonProperty("uris") |
| @JsonProperty("objects") | ||
| public List<CloudObjectLocation> getObject() |
I think if you rename the getter to getObjects() then you won't need the ("objects").
| org.apache.commons.io.FileUtils.forceMkdir(outDir); | ||
| final URI uri = URI.create(StringUtils.format("s3://%s/%s", s3Coords.bucket, s3Coords.path)); | ||
| final URI uri = URI.create(StringUtils.format("s3://%s/%s", s3Coords.getBucket(), s3Coords.getPath())); |
Is the problem that described in https://github.com/apache/incubator-druid/pull/8903/files/a4f6ae9ae2f81381d865e87ec5e1219d275f299c..7125e3e94bd468bc82c95fad68538890a23c69ee#r348869424 possible here when the URI created here gets passed to the CloudObjectLocation constructor?
Is there a test to check the handling of tricky characters?
There is no such test afaik; I guess I can look into this, or maybe as a follow-up, since this isn't really new code and it sort of feels like the scope of this PR is creeping.
This code is definitely buggy, please at least include a comment warning future devs as much. (You aren't doing much here besides mechanical refactoring, so I wouldn't insist on fixing it or adding tests, but the comment is nice.)
Example of the bug:
scala> URI.create(String.format("s3://%s/%s", "mybucket", "path/to/myobject?question")).getPath
res1: String = /path/to/myobject

Also:
scala> URI.create(String.format("s3://%s/%s", "mybucket", "path/to/100%myobject")).getPath
java.lang.IllegalArgumentException: Malformed escape pair at index 25: s3://mybucket/path/to/100%myobject
at java.net.URI.create(URI.java:852)
... 28 elided
Caused by: java.net.URISyntaxException: Malformed escape pair at index 25: s3://mybucket/path/to/100%myobject
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.scanEscape(URI.java:2978)
at java.net.URI$Parser.scan(URI.java:3001)
at java.net.URI$Parser.checkChars(URI.java:3019)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3053)
at java.net.URI.<init>(URI.java:588)
at java.net.URI.create(URI.java:850)
... 28 more
IMHO we should always leave code better than we found it. Small bugs like this are not worth putting into an issue, and will likely never get worked on, but some poor soul somewhere on the interwebs will run into it and bang their head against it.
Fixing bugs is not scope creep...
This is not a regression and so doesn't have to be fixed in this PR. It's up to the author in this case.
IMHO we should always leave code better than we found it. Small bugs like this are not worth putting into an issue, and will likely never get worked on, but some poor soul somewhere on the interwebs will run into it and bang their head against it.
I don't think this will be happening for this bug. This bug is pretty critical and should be fixed as soon as possible.
I went ahead and fixed this for most of S3 by refactoring to use CloudObjectLocation to ensure URI handling is good, though the footprint of this PR has grown a lot, which is what I was worried about. In some sense, this is sort of related to the work done in #6761. I have opened #8941 to finish the remaining issues.
| private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory"; | ||
| private static final Logger log = new Logger(S3Utils.class); | ||
| public static final int MAX_S3_RETRIES = 10; |
| ); | ||
| } | ||
| return CompressionUtils.decompress(s3Object.getObjectContent(), key); | ||
| return s3Object.getObjectContent(); |
Should this decompress the stream like it did before?
Oh this is my bad. S3Entity is a RetryingInputEntity and the returned input stream here is wrapped with RetryingInputStream. Decompression logic should be done on RetryingInputStream.
May be worth adding a test
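A test along those lines only needs the suffix-based decompression behavior. The sketch below is a simplified, gzip-only stand-in for this pattern (Druid's actual CompressionUtils.decompress supports more formats and is not shown here):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class DecompressDemo
{
  // Pick a decompressor based on the object's file name suffix; pass the
  // raw stream through untouched for unrecognized suffixes.
  public static InputStream decompress(InputStream in, String fileName) throws IOException
  {
    if (fileName.endsWith(".gz")) {
      return new GZIPInputStream(in);
    }
    return in;
  }

  // Helper for tests: gzip-compress a byte array in memory.
  public static byte[] gzip(byte[] data) throws IOException
  {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
      gz.write(data);
    }
    return bos.toByteArray();
  }
}
```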
| |type|This should be `s3`.|N/A|yes| | ||
| |uris|JSON array of URIs where s3 files to be ingested are located.|N/A|`uris` or `prefixes` must be set| | ||
| |prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|N/A|`uris` or `prefixes` must be set| | ||
With your latest changes, need to add another row for objects here and update the required value for the other columns based on the presence of objects.
Ah, I actually wasn't going to document objects because it's primarily used internally for the splits, for parallel subtasks to avoid converting bucket/path back into a URI, but if people prefer to put in an array of objects instead of an array of uris I guess there is no harm in documenting it.
The only harm will be making it something we can't get rid of in the future. If you want a method to have public visibility but not be a public API, we should note that on the method.
| if (keyString.startsWith("/")) { | ||
| keyString = keyString.substring(1); | ||
| } |
This logic is used in a few places (e.g., CloudObjectLocation(URI uri). May be useful to add a helper function.
Added StringUtils.maybeRemoveLeadingSlash
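A minimal sketch of such helpers (the actual StringUtils methods in the Druid codebase may differ in details; this is just to show the shared logic being factored out):

```java
class SlashUtils
{
  // Strip a single leading '/' if present, e.g. "/bucket/key" -> "bucket/key".
  public static String maybeRemoveLeadingSlash(String s)
  {
    return s != null && s.startsWith("/") ? s.substring(1) : s;
  }

  // Companion helper: strip a single trailing '/' if present.
  public static String maybeRemoveTrailingSlash(String s)
  {
    return s != null && s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
  }
}
```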
| getRetryCondition(), | ||
| RetryUtils.DEFAULT_MAX_TRIES | ||
| ); | ||
| return CompressionUtils.decompress(retryingInputStream, getDecompressionPath()); |
nit: thank you for fixing this! I think the javadoc of readFrom and readFromStart should now mention that the returned inputStream shouldn't decompress. I think I'm going to raise a PR for adding some unit tests for this bug and maybe I can update the javadoc in my PR unless you want to do it here.
| InputRowSchema inputRowSchema, | ||
| @Nullable InputFormat inputFormat, | ||
| File temporaryDirectory | ||
| @Nullable File temporaryDirectory |
Is this nullable now? I think it shouldn't be nullable if it's null only in unit tests.
Oops, I don't remember adding this so I suspect I did it on accident; will fix.
I think all of the new native batch stuff will have to be addressed in the release notes. However, all of the old stuff is still there.
@vogievetsky — "auto firehose to input source converter" sounds a bit scary, I don't think special effort was put into making the sources consistent with the firehoses (rather, I believe the effort was instead put into making them consistent with other input sources). So there may be other pitfalls if you are assuming they can be converted without special knowledge of the specific source type.
| /** | ||
| * Get path to decompress a compressed stream for the entity |
I had trouble making sense of what this comment means, could you please consider rewording it?
At first glance it sounds like a path on local disk that the compressed stream will be decompressed to, but looking at implementations, that doesn't seem right.
At second glance it looks like it's the filename corresponding to the input entity, and is just used to figure out if it needs to be decompressed or not. The javadoc should say something like that.

Redid the javadocs and renamed the method to the more generic getPath, since that is also a correct name given its usage and maybe it is useful for other things.
| @@ -32,30 +33,39 @@ public interface RetryingInputEntity extends InputEntity | |||
| @Override | |||
| default InputStream open() throws IOException | |||
Now that readFromStart() and readFrom(long) are clearer, could you please also update the javadocs for InputEntity#open as well, to say whether or not it should decompress?
Added javadocs mentioning that the default open implementation handles decompression.
| * Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, this should | ||
| * return the raw stream for the object. | ||
| */ | ||
| default InputStream readFromStart() throws IOException |
nit: These seem like "internal" methods that aren't actually meant to be called by users of the interface. In that case RetryingInputEntity probably makes more sense as an abstract class than as an interface, with these methods marked protected. I won't insist it be changed, but if it stays an interface, it'd be nice for the javadocs to say that external callers aren't meant to use these methods.
The reason is a general assumption that any method on an interface is meant for users of the interface.
Refactored to an abstract class.
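Schematically, that refactor turns the interface's default method into a public template method on an abstract class, with the raw-stream hook made protected so external callers can't invoke it. This is a simplified stand-in, not the actual RetryingInputEntity (the real class also wraps the stream with retry and decompression logic):

```java
import java.io.IOException;
import java.io.InputStream;

abstract class RetryingEntitySketch
{
  // Public entry point for users of the entity. A real implementation
  // would wrap the raw stream in a retrying stream and handle
  // decompression here.
  public final InputStream open() throws IOException
  {
    return readFromStart();
  }

  // Raw, undecompressed stream for the object; protected so it is
  // clearly internal and not part of the public API.
  protected abstract InputStream readFromStart() throws IOException;
}
```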
| @Override | ||
| public String toString() | ||
| { | ||
| return "CloudObjectLocation {" |
The formatting is a little weird here.
Indeed, fixed; not really sure how it got mangled, I used IntelliJ to generate it originally.
| <dependency> | ||
| <groupId>joda-time</groupId> | ||
| <artifactId>joda-time</artifactId> | ||
| <version>2.10.5</version> |
Please do remove the version here, it should get pulled in via dependencyManagement from the parent.
| } | ||
| } | ||
| private S3InputSource(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation inputSplit) |
nit: IMO, it's better to have only one constructor that actually does stuff, and have the others call this(...). It makes invariants easier to get right.
Or just have one constructor, period, and use static creator methods for other styles of creation.
Removed the extra constructor.
| { | ||
| this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client"); | ||
| this.uris = uris == null ? new ArrayList<>() : uris; | ||
| this.prefixes = prefixes == null ? new ArrayList<>() : prefixes; |
Is there any reason for the inconsistency here: uris and prefixes are set to empty lists if they come in as null, but objects isn't?
Burned again for initially starting by porting over the S3StaticFirehoseFactory; reworked this to use nulls... though as I'm writing this comment I realize I should probably treat empties the same as null and not consider that invalid... will fix the PR again.
| } | ||
| for (final URI inputURI : this.uris) { | ||
| Preconditions.checkArgument("s3".equals(inputURI.getScheme()), "input uri scheme == s3 (%s)", inputURI); |
nit: be nice to extract "s3" into a constant like SCHEME.
@gianm I made a thing that if you paste a firehose-based input spec into the data loader it will be magically converted to an input source based one. I need that because the data loader will soon only work with input sources. Do you think this is a bad idea? I know there are a lot of Druid users that have ingestion specs saved somewhere outside of Druid; what were you imagining these people will do to get onto the new format? Convert it by hand? I figured that the data loader could be helpful there.
Also, if the data loader does not have that feature, should there still be a paragraph in the release notes that guides people how to convert between the specs? Or is it just "here are the new docs, figure it out"?
@vogievetsky let's continue this conversation in #8933 |
| * implementations. {@link #bucket} and {@link #path} should NOT be URL encoded. | ||
| * | ||
| * The intention is that this is used as a common representation for storage objects as an alternative to dealing in | ||
| * {@link URI} directly, but still provide a mechansim to round-trip with a URI. |
If you need to push another commit later, there's a typo here: mechansim -> mechanism
gianm left a comment:
Just had one more comment on the new stuff, thanks @clintropolis
| { | ||
| this.bucket = Preconditions.checkNotNull(StringUtils.maybeRemoveTrailingSlash(bucket)); | ||
| this.path = Preconditions.checkNotNull(StringUtils.maybeRemoveLeadingSlash(path)); | ||
| Preconditions.checkArgument(this.bucket.equals(StringUtils.urlEncode(this.bucket))); |
This exception might get thrown in response to user input, so please add a nice error message. As is, the user would get an IllegalArgumentException with no message.
gianm left a comment:
LGTM, thanks @clintropolis
Description
Following up to #8823, this PR adds an S3 InputSource and InputEntity implementation, allowing S3 to be used with the new native batch indexing interfaces. This currently re-uses the same configuration options as the s3 static firehose, but as an InputSource.

This PR has:
Key changed/added classes in this PR
S3InputSource
S3Entity