Merged
Changes from all commits (35 commits)
aff58ee  add s3 input source for native batch ingestion (clintropolis, Nov 19, 2019)
8d14aec  add docs (clintropolis, Nov 19, 2019)
33a02e8  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 19, 2019)
ef4cc31  fixes (clintropolis, Nov 19, 2019)
11cb7cc  checkstyle (clintropolis, Nov 19, 2019)
5622807  lazy splits (clintropolis, Nov 20, 2019)
92c8f93  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 20, 2019)
786e526  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 20, 2019)
f7bb70e  fixes and hella tests (clintropolis, Nov 20, 2019)
0af2d6e  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 20, 2019)
a4f6ae9  fix it (clintropolis, Nov 20, 2019)
d23a131  re-use better iterator (clintropolis, Nov 20, 2019)
765a6f4  use key (clintropolis, Nov 20, 2019)
008ddac  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 20, 2019)
23d63ec  javadoc and checkstyle (clintropolis, Nov 20, 2019)
a16758b  exception (clintropolis, Nov 21, 2019)
7125e3e  oops (clintropolis, Nov 21, 2019)
4f221c2  refactor to use S3Coords instead of URI (clintropolis, Nov 21, 2019)
74fd2eb  remove unused code, add retrying stream to handle s3 stream (clintropolis, Nov 21, 2019)
b4527c8  remove unused parameter (clintropolis, Nov 21, 2019)
5d35b80  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 22, 2019)
f47edbf  update to latest master (clintropolis, Nov 22, 2019)
b07e490  use list of objects instead of object (clintropolis, Nov 22, 2019)
33cee3c  serde test (clintropolis, Nov 22, 2019)
fd3c7ae  refactor and such (clintropolis, Nov 22, 2019)
3f6d548  now with the ability to compile (clintropolis, Nov 22, 2019)
a0677b8  fix signature and javadocs (clintropolis, Nov 22, 2019)
7622c1f  Merge remote-tracking branch 'upstream/master' into s3-input-source (clintropolis, Nov 24, 2019)
50746a6  fix conflicts yet again, fix S3 uri stuffs (clintropolis, Nov 24, 2019)
8e96e55  more tests, enforce uri for bucket (clintropolis, Nov 24, 2019)
283d457  javadoc (clintropolis, Nov 24, 2019)
bdb22ca  oops (clintropolis, Nov 25, 2019)
bcca782  abstract class instead of interface (clintropolis, Nov 25, 2019)
4283769  null or empty (clintropolis, Nov 25, 2019)
81ebdd4  better error (clintropolis, Nov 26, 2019)
InputSource.java
@@ -75,9 +75,5 @@ public interface InputSource
* @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true
* @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
*/
InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
);
InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
}
RetryingInputEntity.java
@@ -19,47 +19,58 @@

package org.apache.druid.data.input;

import com.google.common.base.Predicate;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.utils.CompressionUtils;

import java.io.IOException;
import java.io.InputStream;

public interface RetryingInputEntity extends InputEntity
public abstract class RetryingInputEntity implements InputEntity
{
/**
* Open a {@link RetryingInputStream} wrapper for an underlying input stream, optionally decompressing the retrying
* stream if the file extension matches a known compression format, otherwise passing the retrying stream through directly.
*/
@Override
default InputStream open() throws IOException
public InputStream open() throws IOException
{
return new RetryingInputStream<>(
RetryingInputStream<?> retryingInputStream = new RetryingInputStream<>(
this,
new RetryingInputEntityOpenFunction(),
getRetryCondition(),
RetryUtils.DEFAULT_MAX_TRIES
);
return CompressionUtils.decompress(retryingInputStream, getPath());
}

/**
* Directly opens an {@link InputStream} on the input entity.
* Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, and is
* handled by the default implementation of {@link #open}, so this should return the raw stream for the object.
*/
default InputStream readFromStart() throws IOException
protected InputStream readFromStart() throws IOException
{
return readFrom(0);
}

/**
* Directly opens an {@link InputStream} starting at the given offset on the input entity.
* Directly opens an {@link InputStream} starting at the given offset on the input entity. Decompression should be
* handled externally, and is handled by the default implementation of {@link #open}, so this should return the raw
* stream for the object.
*
* @param offset an offset to start reading from. A non-negative integer counting
* the number of bytes from the beginning of the entity
*/
InputStream readFrom(long offset) throws IOException;
protected abstract InputStream readFrom(long offset) throws IOException;

@Override
Predicate<Throwable> getRetryCondition();
/**
* Get the path name for this entity, used by the default implementation of {@link #open} to determine whether the
* underlying stream needs to be decompressed, based on the file extension of the path.
*/
protected abstract String getPath();

class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
private static class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
{
@Override
public InputStream open(RetryingInputEntity object) throws IOException
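To make the new contract concrete, here is a minimal sketch of a hypothetical subclass (ExampleFileEntity is illustrative and not part of this PR): the subclass only supplies the raw stream, a path used for compression detection, and a retry predicate, while the base class layers retrying and optional decompression on top in open().

// Hypothetical example (not part of this PR): a minimal local-file entity built on the new
// abstract base class. RetryingInputEntity.open() wraps readFrom() in a RetryingInputStream
// and applies CompressionUtils.decompress() based on the extension of getPath().
import com.google.common.base.Predicate;
import com.google.common.io.ByteStreams;
import org.apache.druid.data.input.RetryingInputEntity;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

public class ExampleFileEntity extends RetryingInputEntity
{
  private final File file;

  public ExampleFileEntity(File file)
  {
    this.file = file;
  }

  public URI getUri()
  {
    return file.toURI();
  }

  @Override
  protected InputStream readFrom(long offset) throws IOException
  {
    // Return the raw (possibly compressed) stream; the base class decides whether to decompress it.
    FileInputStream in = new FileInputStream(file);
    ByteStreams.skipFully(in, offset);
    return in;
  }

  @Override
  protected String getPath()
  {
    // Used by the base class to detect compression from the file extension, e.g. ".gz".
    return file.getPath();
  }

  @Override
  public Predicate<Throwable> getRetryCondition()
  {
    // Retry only on transient I/O failures.
    return t -> t instanceof IOException;
  }
}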
CloudObjectLocation.java (new file)
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;

import java.net.URI;
import java.util.Objects;

/**
* Common type for the 'bucket' and 'path' concepts of cloud objects, to allow code sharing between cloud-specific
* implementations. {@link #bucket} and {@link #path} should NOT be URL encoded.
*
* The intention is that this is used as a common representation of storage objects, as an alternative to dealing
* with {@link URI} directly, while still providing a mechanism to round-trip with a URI.
*
* In common clouds, bucket names must be DNS compliant:
* https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
* https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
* https://cloud.google.com/storage/docs/naming
*
* The constructor ensures that bucket names are DNS compliant by checking that the URL encoded form of the bucket
* matches the supplied value. Technically it should probably also confirm that the bucket is all lower-case, but
* S3 has a legacy mode in which buckets did not have to be compliant, so that cannot be enforced here.
*/
public class CloudObjectLocation
{
private final String bucket;
private final String path;

@JsonCreator
public CloudObjectLocation(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path)
{
this.bucket = Preconditions.checkNotNull(StringUtils.maybeRemoveTrailingSlash(bucket));
this.path = Preconditions.checkNotNull(StringUtils.maybeRemoveLeadingSlash(path));
Preconditions.checkArgument(
this.bucket.equals(StringUtils.urlEncode(this.bucket)),
"bucket must follow DNS-compliant naming conventions"
);
}

public CloudObjectLocation(URI uri)
{
this(uri.getHost(), uri.getPath());
}

/**
* Given a scheme, encode {@link #bucket} and {@link #path} into a {@link URI}.
*
* In all clouds, bucket names must be DNS compliant, so the bucket does not require encoding.
* There is no such restriction on object names, so the path will be URL encoded when constructing the URI.
*/
public URI toUri(String scheme)
{
// Encode path, except leave '/' characters unencoded
return URI.create(
StringUtils.format(
"%s://%s/%s",
scheme,
bucket,
StringUtils.replace(StringUtils.urlEncode(path), "%2F", "/")
)
);
}

@JsonProperty
public String getBucket()
{
return bucket;
}

@JsonProperty
public String getPath()
{
return path;
}

@Override
public String toString()
{
return "CloudObjectLocation{" +
"bucket='" + bucket + '\'' +
", path='" + path + '\'' +
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

final CloudObjectLocation that = (CloudObjectLocation) o;
return Objects.equals(bucket, that.bucket) &&
Objects.equals(path, that.path);
}

@Override
public int hashCode()
{
return Objects.hash(bucket, path);
}

}
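A brief usage sketch of the round-trip behavior described in the javadoc above (the class name and values are illustrative, not part of this PR):

import java.net.URI;
import org.apache.druid.data.input.impl.CloudObjectLocation;

// Illustrative usage, not part of this PR.
public class CloudObjectLocationExample
{
  public static void main(String[] args)
  {
    // The constructor drops a trailing slash on the bucket and a leading slash on the path.
    CloudObjectLocation fromParts = new CloudObjectLocation("my-bucket", "/some/nested/object.json");

    // The URI constructor uses the URI host as the bucket and the URI path (minus the leading '/') as the path.
    CloudObjectLocation fromUri = new CloudObjectLocation(URI.create("s3://my-bucket/some/nested/object.json"));

    // Round-trip back to a URI for any scheme; the path is URL encoded except for '/' separators.
    System.out.println(fromParts.toUri("s3"));  // s3://my-bucket/some/nested/object.json
    System.out.println(fromUri.toUri("gs"));    // gs://my-bucket/some/nested/object.json
  }
}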
HttpEntity.java
@@ -27,7 +27,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -36,7 +35,7 @@
import java.net.URLConnection;
import java.util.Base64;

public class HttpEntity implements RetryingInputEntity
public class HttpEntity extends RetryingInputEntity
{
private static final Logger LOG = new Logger(HttpEntity.class);

@@ -64,12 +63,15 @@ public URI getUri()
}

@Override
public InputStream readFrom(long offset) throws IOException
protected InputStream readFrom(long offset) throws IOException
{
return CompressionUtils.decompress(
openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset),
uri.toString()
);
return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset);
}

@Override
protected String getPath()
{
return uri.getPath();
}

@Override
StringUtils.java
@@ -237,6 +237,16 @@ public static String urlDecode(String s)
}
}

public static String maybeRemoveLeadingSlash(String s)
{
return s != null && s.startsWith("/") ? s.substring(1) : s;
}

public static String maybeRemoveTrailingSlash(String s)
{
return s != null && s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
}

/**
* Removes all occurrences of the given char from the given string. This method is an optimal version of
* {@link String#replace(CharSequence, CharSequence) s.replace("c", "")}.
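A few illustrative cases for the two new helpers, assuming the implementations shown above (the wrapper class is illustrative, not part of this PR); both are null-safe and strip at most one slash:

import org.apache.druid.java.util.common.StringUtils;

// Illustrative behavior of the new slash-trimming helpers.
public class SlashHelpersExample
{
  public static void main(String[] args)
  {
    System.out.println(StringUtils.maybeRemoveTrailingSlash("my-bucket/"));   // my-bucket
    System.out.println(StringUtils.maybeRemoveTrailingSlash("my-bucket"));    // my-bucket (unchanged)
    System.out.println(StringUtils.maybeRemoveLeadingSlash("/path/to/key"));  // path/to/key
    System.out.println(StringUtils.maybeRemoveLeadingSlash("//path"));        // /path (only one slash removed)
    System.out.println(StringUtils.maybeRemoveLeadingSlash(null));            // null
  }
}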
CollectionUtils.java
@@ -23,11 +23,13 @@
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.TreeSet;
@@ -116,6 +118,11 @@ public static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMapper)
return result;
}

public static boolean isNullOrEmpty(@Nullable List<?> list)
{
return list == null || list.isEmpty();
}

private CollectionUtils()
{
}