diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java
index f181d08f443c..168bdb7069da 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java
@@ -20,10 +20,13 @@
package org.apache.druid.storage.google;
import com.google.api.client.http.AbstractInputStreamContent;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpResponse;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Supplier;
+import org.apache.druid.java.util.common.StringUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -36,7 +39,7 @@ public class GoogleStorage
* if we have a Storage instead of a supplier of it, it can cause unnecessary config validation
* against Google storage even when it's not used at all. To perform the config validation
* only when it is actually used, we use a supplier.
- *
+ *
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/
private final Supplier storage;
@@ -59,6 +62,26 @@ public InputStream get(final String bucket, final String path) throws IOExceptio
return get(bucket, path, 0);
}
+ public InputStream getUsingRangeHeaders(final String bucket, final String path, long start, long end)
+ throws IOException
+ {
+ final Get get = storage.get().objects().get(bucket, path);
+ HttpHeaders httpHeaders = new HttpHeaders();
+
+ String rangeString = StringUtils.format("bytes %d-%d/*", start, end);
+
+ httpHeaders.setRange(rangeString);
+ get.setRequestHeaders(httpHeaders);
+
+ HttpResponse httpResponse = get.executeUsingHead();
+ String responseRangeString = httpResponse.getHeaders().getRange();
+ if (!responseRangeString.equals(rangeString)) {
+ throw new IOException("Unable to fetch the correct range");
+ }
+
+ return get.executeMediaAsInputStream();
+ }
+
public InputStream get(final String bucket, final String path, long start) throws IOException
{
final Get get = storage.get().objects().get(bucket, path);
@@ -86,7 +109,7 @@ public boolean exists(final String bucket, final String path)
return false;
}
}
-
+
public long size(final String bucket, final String path) throws IOException
{
return storage.get().objects().get(bucket, path).execute().getSize().longValue();
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
index 25b4f3286ea7..5b2727abb61b 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java
@@ -45,7 +45,7 @@ public static boolean isRetryable(Throwable t)
return t instanceof IOException;
}
- static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception
+ public static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception
{
return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES);
}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java
new file mode 100644
index 000000000000..ee0d9a58f880
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import java.util.Objects;
+
+public class GoogleInputRange
+{
+ private final long start;
+ private final long size;
+ private final String bucket;
+ private final String path;
+
+ public GoogleInputRange(long start, long size, String bucket, String path)
+ {
+ this.start = start;
+ this.size = size;
+ this.bucket = bucket;
+ this.path = path;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public long getSize()
+ {
+ return size;
+ }
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GoogleInputRange that = (GoogleInputRange) o;
+ return start == that.start
+ && size == that.size
+ && Objects.equals(bucket, that.bucket)
+ && Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(start, size, bucket, path);
+ }
+}
+
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java
new file mode 100644
index 000000000000..4fa91c1c911a
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.RetryUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+public class GoogleOutputConfig
+{
+
+ @JsonProperty
+ private final String bucket;
+
+ @JsonProperty
+ private final String prefix;
+
+ @JsonProperty
+ private final File tempDir;
+
+ @JsonProperty
+ private boolean chunkedDownloads = false;
+
+ @JsonProperty
+ private HumanReadableBytes chunkSize = new HumanReadableBytes("100MiB");
+
+ @JsonProperty
+ private int maxRetry = RetryUtils.DEFAULT_MAX_TRIES;
+
+ public GoogleOutputConfig(
+ final String bucket,
+ final String prefix,
+ final File tempDir,
+ @Nullable final Boolean chunkedDownloads,
+ @Nullable final HumanReadableBytes chunkSize,
+ @Nullable final Integer maxRetry
+ )
+ {
+ this.bucket = bucket;
+ this.prefix = prefix;
+ this.tempDir = tempDir;
+ if (chunkedDownloads != null) {
+ this.chunkedDownloads = chunkedDownloads;
+ }
+ if (chunkSize != null) {
+ this.chunkSize = chunkSize;
+ }
+ if (maxRetry != null) {
+ this.maxRetry = maxRetry;
+ }
+ }
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ public String getPrefix()
+ {
+ return prefix;
+ }
+
+ public File getTempDir()
+ {
+ return tempDir;
+ }
+
+ public Boolean isChunkedDownloads()
+ {
+ return chunkedDownloads;
+ }
+
+ public HumanReadableBytes getChunkSize()
+ {
+ return chunkSize;
+ }
+
+ public Integer getMaxRetry()
+ {
+ return maxRetry;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GoogleOutputConfig that = (GoogleOutputConfig) o;
+ return Objects.equals(bucket, that.bucket)
+ && Objects.equals(prefix, that.prefix)
+ && Objects.equals(tempDir, that.tempDir)
+ && Objects.equals(chunkedDownloads, that.chunkedDownloads)
+ && Objects.equals(chunkSize, that.chunkSize)
+ && Objects.equals(maxRetry, that.maxRetry);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bucket, prefix, tempDir, chunkedDownloads, chunkSize, maxRetry);
+ }
+
+
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java
new file mode 100644
index 000000000000..3f0bba093b6a
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteStreams;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+import org.apache.druid.storage.google.GoogleUtils;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.UUID;
+
+public class GoogleStorageConnector extends ChunkingStorageConnector
+{
+
+ private static final String DELIM = "/";
+ private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+ private static final Logger log = new Logger(GoogleStorageConnector.class);
+
+ private final GoogleStorage storage;
+ private final GoogleOutputConfig config;
+ private final GoogleInputDataConfig inputDataConfig;
+
+ public GoogleStorageConnector(
+ GoogleStorage storage,
+ GoogleOutputConfig config,
+ GoogleInputDataConfig inputDataConfig
+ )
+ {
+ this.storage = storage;
+ this.config = config;
+ this.inputDataConfig = inputDataConfig;
+ }
+
+
+ @Override
+ public boolean pathExists(String path)
+ {
+ return storage.exists(config.getBucket(), objectPath(path));
+ }
+
+ @Override
+ public OutputStream write(String path) throws IOException
+ {
+ File tempFile = new File(JOINER.join(config.getTempDir(), UUID.randomUUID().toString()));
+
+ return new OutputStream()
+ {
+
+ final OutputStream delegate = new FileOutputStream(tempFile);
+ @Override
+ public void write(int b) throws IOException
+ {
+ delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ delegate.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ delegate.close();
+
+ try (final InputStream fileInputStream = Files.newInputStream(tempFile.toPath())) {
+ InputStreamContent mediaContent = new InputStreamContent("application/octet-stream", fileInputStream);
+ GoogleUtils.retryGoogleCloudStorageOperation(
+ () -> {
+ storage.insert(config.getBucket(), objectPath(path), mediaContent);
+ return null;
+ }
+ );
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ if (!tempFile.delete()) {
+ throw new RE("Unable to delete the temp file [%s]", tempFile);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void deleteFile(String path) throws IOException
+ {
+ try {
+ final String fullPath = objectPath(path);
+ log.debug("Deleting file at bucket: [%s], path: [%s]", config.getBucket(), fullPath);
+
+ GoogleUtils.retryGoogleCloudStorageOperation(
+ () -> {
+ storage.delete(config.getBucket(), fullPath);
+ return null;
+ }
+ );
+ }
+ catch (Exception e) {
+ log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void deleteFiles(Iterable paths) throws IOException
+ {
+ for (String path : paths) {
+ deleteFile(objectPath(path));
+ }
+ }
+
+ @Override
+ public void deleteRecursively(String path) throws IOException
+ {
+ try {
+ GoogleUtils.deleteObjectsInPath(
+ storage,
+ inputDataConfig,
+ config.getBucket(),
+ objectPath(config.getPrefix()),
+ p -> true
+ );
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Iterator listDir(String dirName)
+ {
+ final String fullPath = objectPath(dirName);
+ Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath)
+ .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(),
+ inputDataConfig.getMaxListingLength()
+ );
+
+ return Iterators.transform(
+ storageObjects,
+ storageObject -> {
+ String[] split = storageObject.getName().split(fullPath, 2);
+ if (split.length > 1) {
+ return split[1];
+ } else {
+ return "";
+ }
+ }
+ );
+ }
+
+ @Override
+ public InputStream read(String path) throws IOException
+ {
+ if (config.isChunkedDownloads()) {
+ return super.read(path);
+ }
+
+ return storage.get(config.getBucket(), objectPath(path));
+ }
+
+ @Override
+ public InputStream readRange(String path, long from, long size) throws IOException
+ {
+ if (config.isChunkedDownloads()) {
+ return super.readRange(path, from, size);
+ }
+
+ return ByteStreams.limit(storage.get(config.getBucket(), objectPath(path), from), size);
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException
+ {
+ long size = storage.size(config.getBucket(), objectPath(path));
+ return buildInputParams(path, 0, size);
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size)
+ {
+ ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(from);
+ builder.end(from + size);
+ builder.cloudStoragePath(objectPath(path));
+ builder.tempDirSupplier(config::getTempDir);
+ builder.maxRetry(config.getMaxRetry());
+ builder.retryCondition(GoogleUtils.GOOGLE_RETRY);
+ builder.objectSupplier(((start, end) -> new GoogleInputRange(start, end - start, config.getBucket(), objectPath(path))));
+ builder.objectOpenFunction(new ObjectOpenFunction()
+ {
+ @Override
+ public InputStream open(GoogleInputRange googleInputRange) throws IOException
+ {
+ long rangeEnd = googleInputRange.getStart() + googleInputRange.getSize() - 1;
+ return storage.getUsingRangeHeaders(
+ googleInputRange.getBucket(),
+ googleInputRange.getPath(),
+ googleInputRange.getStart(),
+ rangeEnd
+ );
+ }
+
+ @Override
+ public InputStream open(GoogleInputRange googleInputRange, long offset) throws IOException
+ {
+ long rangeStart = googleInputRange.getStart() + offset;
+ long rangeEnd = googleInputRange.getStart() + googleInputRange.getSize() - 1;
+ return storage.getUsingRangeHeaders(
+ googleInputRange.getBucket(),
+ googleInputRange.getPath(),
+ rangeStart,
+ rangeEnd
+ );
+ }
+ });
+
+ return builder.build();
+ }
+
+ private String objectPath(String path)
+ {
+ return JOINER.join(config.getPrefix(), path);
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
new file mode 100644
index 000000000000..875e9fde46e3
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GoogleStorageConnectorModule implements DruidModule
+{
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(
+ new SimpleModule(GoogleStorageConnectorModule.class.getSimpleName())
+ .registerSubtypes(GoogleStorageConnectorProvider.class)
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+}
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java
new file mode 100644
index 000000000000..37dd1c1234f9
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
+import org.apache.druid.storage.google.GoogleStorage;
+import org.apache.druid.storage.google.GoogleStorageDruidModule;
+
+import java.io.File;
+
+@JsonTypeName(GoogleStorageDruidModule.SCHEME)
+public class GoogleStorageConnectorProvider extends GoogleOutputConfig implements StorageConnectorProvider
+{
+
+ private final GoogleStorage storage;
+ private final GoogleInputDataConfig inputDataConfig;
+
+ @JsonCreator
+ public GoogleStorageConnectorProvider(
+ @JacksonInject GoogleStorage storage,
+ @JacksonInject GoogleInputDataConfig inputDataConfig,
+ @JsonProperty(value = "bucket", required = true) final String bucket,
+ @JsonProperty(value = "prefix", required = true) final String prefix,
+ @JsonProperty(value = "tempDir", required = true) final File tempDir,
+ @JsonProperty(value = "chunkedDownloads") final Boolean chunkedDownloads,
+ @JsonProperty(value = "chunkSize") final HumanReadableBytes chunkSize,
+ @JsonProperty(value = "maxRetry") final Integer maxRetry
+ )
+ {
+ super(bucket, prefix, tempDir, chunkedDownloads, chunkSize, maxRetry);
+ this.storage = Preconditions.checkNotNull(storage, "google client must be provided");
+ this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "google input data config must be provided");
+ }
+
+ @Override
+ public StorageConnector get()
+ {
+ return new GoogleStorageConnector(storage, this, inputDataConfig);
+ }
+}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 81dfb5747621..a68ed9c1c00c 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -27,40 +27,29 @@
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
-import org.apache.commons.io.input.NullInputStream;
import org.apache.druid.data.input.impl.CloudObjectLocation;
-import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import javax.annotation.Nonnull;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.SequenceInputStream;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times.
*/
-public class S3StorageConnector implements StorageConnector
+public class S3StorageConnector extends ChunkingStorageConnector
{
private static final Logger log = new Logger(S3StorageConnector.class);
@@ -69,7 +58,6 @@ public class S3StorageConnector implements StorageConnector
private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
- private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
private static final int MAX_NUMBER_OF_LISTINGS = 1000;
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
@@ -105,169 +93,61 @@ public boolean pathExists(String path) throws IOException
}
@Override
- public InputStream read(String path)
+ public ChunkingStorageConnectorParameters buildInputParams(String path)
{
- return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
- }
-
- @Override
- public InputStream readRange(String path, long from, long size)
- {
- if (from < 0 || size < 0) {
- throw new IAE(
- "Invalid arguments for reading %s. from = %d, readSize = %d",
- objectPath(path),
- from,
- size
+ long size;
+ try {
+ size = S3Utils.retryS3Operation(
+ () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(),
+ config.getMaxRetry()
);
}
- return buildInputStream(
- new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
- path
- );
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return buildInputParams(path, 0, size);
}
- private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size)
{
- // fetch the size of the whole object to make chunks
- long readEnd;
- AtomicLong currReadStart = new AtomicLong(0);
- if (getObjectRequest.getRange() != null) {
- currReadStart.set(getObjectRequest.getRange()[0]);
- readEnd = getObjectRequest.getRange()[1] + 1;
- } else {
- try {
- readEnd = S3Utils.retryS3Operation(
- () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(),
- config.getMaxRetry()
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
-
- // build a sequence input stream from chunks
- return new SequenceInputStream(new Enumeration()
+ ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(from);
+ builder.end(from + size);
+ builder.cloudStoragePath(objectPath(path));
+ builder.tempDirSupplier(config::getTempDir);
+ builder.maxRetry(config.getMaxRetry());
+ builder.retryCondition(S3Utils.S3RETRY);
+ builder.objectSupplier((start, end) -> new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(start, end - 1));
+ builder.objectOpenFunction(new ObjectOpenFunction()
{
- boolean initStream = false;
@Override
- public boolean hasMoreElements()
+ public InputStream open(GetObjectRequest object)
{
- // checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
- // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
- // lazily, we don't need to close them.
- if (isSequenceStreamClosed.get()) {
- return false;
- }
- // don't stop until the whole object is downloaded
- return currReadStart.get() < readEnd;
- }
-
- @Override
- public InputStream nextElement()
- {
- // since Sequence input stream calls nextElement in the constructor, we start chunking as soon as we call read.
- // to avoid that we pass a nullInputStream for the first iteration.
- if (!initStream) {
- initStream = true;
- return new NullInputStream();
- }
- File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
- // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
- long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
try {
- if (!outFile.createNewFile()) {
- throw new IOE(
- StringUtils.format(
- "Could not create temporary file [%s] for copying [%s]",
- outFile.getAbsolutePath(),
- objectPath(path)
- )
- );
- }
- FileUtils.copyLarge(
- () -> new RetryingInputStream<>(
- new GetObjectRequest(
- config.getBucket(),
- objectPath(path)
- ).withRange(currReadStart.get(), endPoint),
- new ObjectOpenFunction()
- {
- @Override
- public InputStream open(GetObjectRequest object)
- {
- try {
- return S3Utils.retryS3Operation(
- () -> s3Client.getObject(object).getObjectContent(),
- config.getMaxRetry()
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public InputStream open(GetObjectRequest object, long offset)
- {
- if (object.getRange() != null) {
- long[] oldRange = object.getRange();
- object.setRange(oldRange[0] + offset, oldRange[1]);
- } else {
- object.setRange(offset);
- }
- return open(object);
- }
- },
- S3Utils.S3RETRY,
- config.getMaxRetry()
- ),
- outFile,
- new byte[8 * 1024],
- Predicates.alwaysFalse(),
- 1,
- StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+ return S3Utils.retryS3Operation(
+ () -> s3Client.getObject(object).getObjectContent(),
+ config.getMaxRetry()
);
}
- catch (IOException e) {
- throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile));
- }
- try {
- AtomicBoolean isClosed = new AtomicBoolean(false);
- return new FileInputStream(outFile)
- {
- @Override
- public void close() throws IOException
- {
- // close should be idempotent
- if (isClosed.get()) {
- return;
- }
- isClosed.set(true);
- super.close();
- // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
- currReadStart.set(endPoint + 1);
- if (!outFile.delete()) {
- throw new RE("Cannot delete temp file [%s]", outFile);
- }
- }
- };
- }
- catch (FileNotFoundException e) {
- throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+ catch (Exception e) {
+ throw new RuntimeException(e);
}
}
- })
- {
+
@Override
- public void close() throws IOException
+ public InputStream open(GetObjectRequest object, long offset)
{
- isSequenceStreamClosed.set(true);
- super.close();
+ if (object.getRange() != null) {
+ long[] oldRange = object.getRange();
+ object.setRange(oldRange[0] + offset, oldRange[1]);
+ } else {
+ object.setRange(offset);
+ }
+ return open(object);
}
- };
+ });
+ return builder.build();
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
index 9682ac01d88e..a0bf3a91f0cc 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
+++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
@@ -33,8 +33,10 @@ public class StorageConnectorModule implements DruidModule
@Override
public List extends Module> getJacksonModules()
{
- return ImmutableList.of(new SimpleModule(StorageConnector.class.getSimpleName()).registerSubtypes(
- LocalFileStorageConnectorProvider.class));
+ return ImmutableList.of(
+ new SimpleModule(StorageConnector.class.getSimpleName())
+ .registerSubtypes(LocalFileStorageConnectorProvider.class)
+ );
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
new file mode 100644
index 000000000000..d628ee87f4b8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.druid.data.input.impl.RetryingInputStream;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class ChunkingStorageConnector implements StorageConnector
+{
+ private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
+
+ public ChunkingStorageConnector()
+ {
+
+ }
+
+ @Override
+ public InputStream read(String path) throws IOException
+ {
+ return buildInputStream(buildInputParams(path));
+ }
+
+ @Override
+ public InputStream readRange(String path, long from, long size) throws IOException
+ {
+ return buildInputStream(buildInputParams(path, from, size));
+ }
+
+ public abstract ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException;
+
+ public abstract ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size);
+
+ private InputStream buildInputStream(ChunkingStorageConnectorParameters params)
+ {
+ AtomicLong currentReadStartPosition = new AtomicLong(params.getStart());
+ long readEnd = params.getEnd();
+
+ AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
+
+ return new SequenceInputStream(
+
+ new Enumeration()
+ {
+ boolean initStream = false;
+
+ @Override
+ public boolean hasMoreElements()
+ {
+ // Checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
+ // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
+ // lazily, we don't need to close them.
+ if (isSequenceStreamClosed.get()) {
+ return false;
+ }
+ // Don't stop until the whole object is downloaded
+ return currentReadStartPosition.get() < readEnd;
+ }
+
+ @Override
+ public InputStream nextElement()
+ {
+ if (!initStream) {
+ initStream = true;
+ return new NullInputStream();
+ }
+
+ File outFile = new File(
+ params.getTempDirSupplier().get().getAbsolutePath(),
+ UUID.randomUUID().toString()
+ );
+
+ // exclusive
+ long currentReadEndPosition = Math.min(
+ currentReadStartPosition.get() + DOWNLOAD_MAX_CHUNK_SIZE,
+ readEnd
+ );
+
+ try {
+ if (!outFile.createNewFile()) {
+ throw new IOE(
+ StringUtils.format(
+ "Could not create temporary file [%s] for copying [%s]",
+ outFile.getAbsolutePath(),
+ params.getCloudStoragePath()
+ )
+ );
+ }
+
+ FileUtils.copyLarge(
+ () -> new RetryingInputStream<>(
+ params.getObjectSupplier().getObject(currentReadStartPosition.get(), currentReadEndPosition),
+ params.getObjectOpenFunction(),
+ params.getRetryCondition(),
+ params.getMaxRetry()
+ ),
+ outFile,
+ new byte[8 * 1024],
+ Predicates.alwaysFalse(),
+ 1,
+ StringUtils.format(
+ "Retrying copying of [%s] to [%s]",
+ params.getCloudStoragePath(),
+ outFile.getAbsolutePath()
+ )
+ );
+ }
+ catch (IOException e) {
+ throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", params.getCloudStoragePath(), outFile));
+ }
+
+ try {
+ AtomicBoolean fileInputStreamClosed = new AtomicBoolean(false);
+ return new FileInputStream(outFile)
+ {
+ @Override
+ public void close() throws IOException
+ {
+ // close should be idempotent
+ if (fileInputStreamClosed.get()) {
+ return;
+ }
+ fileInputStreamClosed.set(true);
+ super.close();
+ // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
+ currentReadStartPosition.set(currentReadEndPosition);
+ if (!outFile.delete()) {
+ throw new RE("Cannot delete temp file [%s]", outFile);
+ }
+ }
+
+ };
+ }
+ catch (FileNotFoundException e) {
+ throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+ }
+ }
+ }
+ )
+ {
+ @Override
+ public void close() throws IOException
+ {
+ isSequenceStreamClosed.set(true);
+ super.close();
+ }
+ };
+ }
+
+ public interface GetObjectFromRangeFunction
+ {
+ T getObject(long start, long end);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
new file mode 100644
index 000000000000..1e2440d11132
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+
+import java.io.File;
+import java.util.function.Supplier;
+
+// Start inclusive, end exclusive
+public class ChunkingStorageConnectorParameters
+{
+ private final long start;
+ private final long end;
+ private final String cloudStoragePath;
+ private final ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier;
+ private final ObjectOpenFunction objectOpenFunction;
+ private final Predicate retryCondition;
+ private final int maxRetry;
+ private final Supplier tempDirSupplier;
+
+ public ChunkingStorageConnectorParameters(
+ long start,
+ long end,
+ String cloudStoragePath,
+ ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier,
+ ObjectOpenFunction objectOpenFunction,
+ Predicate retryCondition,
+ int maxRetry,
+ Supplier tempDirSupplier
+ )
+ {
+ this.start = start;
+ this.end = end;
+ this.cloudStoragePath = cloudStoragePath;
+ this.objectSupplier = objectSupplier;
+ this.objectOpenFunction = objectOpenFunction;
+ this.retryCondition = retryCondition;
+ this.maxRetry = maxRetry;
+ this.tempDirSupplier = tempDirSupplier;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public long getEnd()
+ {
+ return end;
+ }
+
+ public String getCloudStoragePath()
+ {
+ return cloudStoragePath;
+ }
+
+ public ChunkingStorageConnector.GetObjectFromRangeFunction getObjectSupplier()
+ {
+ return objectSupplier;
+ }
+
+ public ObjectOpenFunction getObjectOpenFunction()
+ {
+ return objectOpenFunction;
+ }
+
+ public Predicate getRetryCondition()
+ {
+ return retryCondition;
+ }
+
+ public int getMaxRetry()
+ {
+ return maxRetry;
+ }
+
+ public Supplier getTempDirSupplier()
+ {
+ return tempDirSupplier;
+ }
+
+ public static class Builder
+ {
+ private long start;
+ private long end;
+ private String cloudStoragePath;
+ private ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier;
+ private ObjectOpenFunction objectOpenFunction;
+ private Predicate retryCondition;
+ private int maxRetry;
+ private Supplier tempDirSupplier;
+
+
+ public Builder start(long start)
+ {
+ this.start = start;
+ return this;
+ }
+
+ public Builder end(long end)
+ {
+ this.end = end;
+ return this;
+ }
+
+ public Builder cloudStoragePath(String cloudStoragePath)
+ {
+ this.cloudStoragePath = cloudStoragePath;
+ return this;
+ }
+
+ public Builder objectSupplier(ChunkingStorageConnector.GetObjectFromRangeFunction objectSupplier)
+ {
+ this.objectSupplier = objectSupplier;
+ return this;
+ }
+
+ public Builder objectOpenFunction(ObjectOpenFunction objectOpenFunction)
+ {
+ this.objectOpenFunction = objectOpenFunction;
+ return this;
+ }
+
+ public Builder retryCondition(Predicate retryCondition)
+ {
+ this.retryCondition = retryCondition;
+ return this;
+ }
+
+ public Builder maxRetry(int maxRetry)
+ {
+ this.maxRetry = maxRetry;
+ return this;
+ }
+
+ public Builder tempDirSupplier(Supplier tempDirSupplier)
+ {
+ this.tempDirSupplier = tempDirSupplier;
+ return this;
+ }
+
+ public ChunkingStorageConnectorParameters build()
+ {
+ Preconditions.checkArgument(start >= 0, "'start' not provided or an incorrect value [%s] passed", start);
+ Preconditions.checkArgument(end >= 0, "'end' not provided or an incorrect value [%s] passed", end);
+ Preconditions.checkArgument(maxRetry >= 0, "'maxRetry' not provided or an incorrect value [%s] passed", maxRetry);
+ return new ChunkingStorageConnectorParameters(
+ start,
+ end,
+ Preconditions.checkNotNull(cloudStoragePath, "'cloudStoragePath' not supplied"),
+ Preconditions.checkNotNull(objectSupplier, "'objectSupplier' not supplied"),
+ Preconditions.checkNotNull(objectOpenFunction, "'objectOpenFunction' not supplied"),
+ Preconditions.checkNotNull(retryCondition, "'retryCondition' not supplied"),
+ maxRetry,
+ Preconditions.checkNotNull(tempDirSupplier, "'tempDirSupplier' not supplied")
+ );
+ }
+ }
+}