Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions core/src/main/java/org/apache/druid/data/input/InputEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.data.input;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -63,7 +64,7 @@ interface CleanableFile extends Closeable
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
*
* @see #fetch as an alternative way to read data.
* @see #fetch
*/
InputStream open() throws IOException;

Expand All @@ -89,7 +90,7 @@ default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws
is,
tempFile,
fetchBuffer,
getFetchRetryCondition(),
getRetryCondition(),
DEFAULT_MAX_NUM_FETCH_TRIES,
StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath())
);
Expand All @@ -114,7 +115,12 @@ public void close()
}

/**
 * Returns a retry condition that the caller should retry on.
 * The returned condition should be used when reading data from this InputEntity such as in {@link #fetch}
 * or {@link RetryingInputEntity}.
 *
 * @return a predicate returning true for throwables that are safe to retry on;
 *         the default implementation never retries
 */
default Predicate<Throwable> getRetryCondition()
{
  return Predicates.alwaysFalse();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import com.google.common.base.Predicate;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;

import java.io.IOException;
import java.io.InputStream;

public interface RetryingInputEntity extends InputEntity
{
@Override
default InputStream open() throws IOException
{
return new RetryingInputStream<>(
this,
new RetryingInputEntityOpenFunction(),
getRetryCondition(),
RetryUtils.DEFAULT_MAX_TRIES
);
}

/**
 * Directly opens an {@link InputStream} on the input entity, starting at the first byte.
 */
default InputStream readFromStart() throws IOException
{
  return readFrom(0L);
}

/**
 * Directly opens an {@link InputStream} starting at the given offset on the input entity.
 * The offset is always a byte count: an InputEntity abstracts a byte-representable entity,
 * so offsets of other types are not expected.
 *
 * @param offset an offset to start reading from. A non-negative integer counting
 *               the number of bytes from the beginning of the entity
 *
 * @return an input stream positioned at {@code offset} bytes into the entity
 *
 * @throws IOException if the stream cannot be opened at the requested offset
 */
InputStream readFrom(long offset) throws IOException;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there ever be cases where the offset type for an InputEntity might be something other than a long?

If not, can you add a comment about that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good question. It's hard for me to imagine an offset of another type, since InputEntity abstracts a byte-representable entity. I added javadoc for it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks


/**
 * Returns the retry condition used by {@link #open()}. Unlike the default in InputEntity,
 * implementations of this interface must define which throwables are retryable.
 */
@Override
Predicate<Throwable> getRetryCondition();

/**
 * Open function used by {@link #open()}: delegates to {@link #readFromStart()} for the
 * initial open, and to {@link #readFrom(long)} when resuming after a retryable failure.
 */
class RetryingInputEntityOpenFunction implements ObjectOpenFunction<RetryingInputEntity>
{
  @Override
  public InputStream open(RetryingInputEntity entity) throws IOException
  {
    return entity.readFromStart();
  }

  @Override
  public InputStream open(RetryingInputEntity entity, long start) throws IOException
  {
    return entity.readFrom(start);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.data.input.impl;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.io.ByteBufferInputStream;

Expand Down Expand Up @@ -60,10 +58,4 @@ public InputStream open()
{
return new ByteBufferInputStream(buffer);
}

@Override
public Predicate<Throwable> getFetchRetryCondition()
{
return Predicates.alwaysFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.data.input.impl;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.utils.CompressionUtils;

Expand Down Expand Up @@ -69,10 +67,4 @@ public InputStream open() throws IOException
{
return CompressionUtils.decompress(new FileInputStream(file), file.getName());
}

@Override
public Predicate<Throwable> getFetchRetryCondition()
{
return Predicates.alwaysFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import org.apache.druid.data.input.InputEntity;
import com.google.common.net.HttpHeaders;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;

Expand All @@ -33,8 +36,10 @@
import java.net.URLConnection;
import java.util.Base64;

public class HttpEntity implements InputEntity
public class HttpEntity implements RetryingInputEntity
{
private static final Logger LOG = new Logger(HttpEntity.class);

private final URI uri;
@Nullable
private final String httpAuthenticationUsername;
Expand All @@ -59,29 +64,52 @@ public URI getUri()
}

@Override
public InputStream open() throws IOException
public InputStream readFrom(long offset) throws IOException
{
return CompressionUtils.decompress(
openURLConnection(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider).getInputStream(),
openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset),
uri.toString()
);
}

@Override
public Predicate<Throwable> getFetchRetryCondition()
public Predicate<Throwable> getRetryCondition()
{
return t -> t instanceof IOException;
}

public static URLConnection openURLConnection(URI object, String userName, PasswordProvider passwordProvider)
public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset)
throws IOException
{
URLConnection urlConnection = object.toURL().openConnection();
final URLConnection urlConnection = object.toURL().openConnection();
if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) {
String userPass = userName + ":" + passwordProvider.getPassword();
String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
urlConnection.setRequestProperty("Authorization", basicAuthString);
}
return urlConnection;
final String acceptRanges = urlConnection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
final boolean withRanges = "bytes".equalsIgnoreCase(acceptRanges);
if (withRanges && offset > 0) {
// Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1
urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", offset));
return urlConnection.getInputStream();
} else {
if (!withRanges && offset > 0) {
LOG.warn(
"Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
+ " a lot."
);
}
final InputStream in = urlConnection.getInputStream();
final long skipped = in.skip(offset);
if (skipped != offset) {
throw new ISE("Requested to skip [%s] bytes, but actual number of bytes skipped is [%s]", offset, skipped);
}
return in;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
* under the License.
*/

package org.apache.druid.data.input.impl.prefetch;
package org.apache.druid.data.input.impl;

import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.io.CountingInputStream;
import org.apache.druid.data.input.impl.prefetch.Fetcher;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;

Expand All @@ -35,7 +37,7 @@
*
* @param <T> object type
*/
class RetryingInputStream<T> extends InputStream
public class RetryingInputStream<T> extends InputStream
{
private static final Logger log = new Logger(RetryingInputStream.class);

Expand All @@ -47,7 +49,7 @@ class RetryingInputStream<T> extends InputStream
private CountingInputStream delegate;
private long startOffset;

RetryingInputStream(
public RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
Predicate<Throwable> retryCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.data.input.impl.prefetch;

import com.google.common.base.Predicate;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.FileIteratingFirehose;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class RetryUtils
public static final Logger log = new Logger(RetryUtils.class);
public static final long MAX_SLEEP_MILLIS = 60000;
public static final long BASE_SLEEP_MILLIS = 1000;
public static final int DEFAULT_MAX_TRIES = 10;

public interface Task<T>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.druid.data.input.google;

import com.google.common.base.Predicate;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;
Expand All @@ -31,7 +31,7 @@
import java.io.InputStream;
import java.net.URI;

public class GoogleCloudStorageEntity implements InputEntity
public class GoogleCloudStorageEntity implements RetryingInputEntity
{
private final GoogleStorage storage;
private final URI uri;
Expand All @@ -50,17 +50,17 @@ public URI getUri()
}

@Override
public InputStream open() throws IOException
public InputStream readFrom(long offset) throws IOException
{
// Get data of the given object and open an input stream
final String bucket = uri.getAuthority();
final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri);
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key);
return CompressionUtils.decompress(byteSource.openStream(), uri.getPath());
return CompressionUtils.decompress(byteSource.openStream(offset), uri.getPath());
}

@Override
public Predicate<Throwable> getFetchRetryCondition()
public Predicate<Throwable> getRetryCondition()
{
return GoogleUtils.GOOGLE_RETRY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ protected InputSourceReader formattableReader(
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(storage, split.get())),
createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(
storage,
split.get()
)),
temporaryDirectory
);
}
Expand Down
Loading