diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 592aed9a2a98..e676d77f57b3 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -349,20 +349,35 @@ SQL-based ingestion supports using durable storage to store intermediate files t
### Durable storage configurations
-The following common service properties control how durable storage behaves:
+Durable storage is supported on Amazon S3 storage and Microsoft's Azure storage. There are a few common configurations that control the behavior for both services, as documented below. Apart from the common configurations,
+there are a few properties specific to each storage service that must be set.
+
The following common properties configure the behavior of durable storage:
|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
-|`druid.msq.intermediate.storage.enable` | true | Required. Whether to enable durable storage for the cluster. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md).|
-|`druid.msq.intermediate.storage.type` | `s3` for Amazon S3 | Required. The type of storage to use. `s3` is the only supported storage type. |
-|`druid.msq.intermediate.storage.bucket` | n/a | The S3 bucket to store intermediate files. |
-|`druid.msq.intermediate.storage.prefix` | n/a | S3 prefix to store intermediate stage results. Provide a unique value for the prefix. Don't share the same prefix between clusters. If the location includes other files or directories, then they will get cleaned up as well. |
-|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to temporarily store intermediate stage results. |
+|`druid.msq.intermediate.storage.enable` | false | Whether to enable durable storage for the cluster. Set it to `true` to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md).|
+|`druid.msq.intermediate.storage.type` | n/a | Required. The type of storage to use. Set it to `s3` for S3 or `azure` for Azure. |
+|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to store temporary files required while uploading and downloading the data. |
|`druid.msq.intermediate.storage.maxRetry` | 10 | Optional. Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. |
|`druid.msq.intermediate.storage.chunkSize` | 100MiB | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.|
The following properties must be set in addition to the common properties to enable durable storage on S3:
+
+|Parameter |Default | Description |
+|-------------------|----------------------------------------|----------------------|
+|`druid.msq.intermediate.storage.bucket` | n/a | Required. The S3 bucket where the files are uploaded to and downloaded from. |
+|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to all the paths uploaded to the bucket to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. |
+
+The following properties must be set in addition to the common properties to enable durable storage on Azure:
+
+|Parameter |Default | Description |
+|-------------------|----------------------------------------|----------------------|
+|`druid.msq.intermediate.storage.container` | n/a | Required. The Azure container where the files are uploaded to and downloaded from. |
+|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to all the paths uploaded to the container to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. |
-In addition to the common service properties, there are certain properties that you configure on the Overlord specifically to clean up intermediate files:
+Durable storage creates files on the remote storage, and these files are cleaned up once the job no longer requires them. However, if a task exits abruptly due to a failure, these files might not get cleaned up.
+Therefore, there are certain properties that you configure on the Overlord specifically to clean up intermediate files for the tasks that have completed and no longer require these files:
|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
diff --git a/extensions-core/azure-extensions/pom.xml b/extensions-core/azure-extensions/pom.xml
index 8d062990a692..ca9aa970c88c 100644
--- a/extensions-core/azure-extensions/pom.xml
+++ b/extensions-core/azure-extensions/pom.xml
@@ -92,7 +92,7 @@
com.google.inject.extensions
guice-assistedinject
- ${guice.version}
+ provided
com.fasterxml.jackson.core
@@ -152,6 +152,17 @@
equalsverifier
test
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ org.mockito
+ mockito-inline
+ test
+
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index f68ddfa9011a..6d0e60fe873b 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -139,7 +139,7 @@ public Iterator getDescriptorIteratorForPrefixes(List pre
public long getObjectSize(CloudObjectLocation location)
{
try {
- final CloudBlob blobWithAttributes = storage.getBlobReferenceWithAttributes(
+ final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
index a8461de0e884..91af1140cb5f 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java
@@ -60,7 +60,7 @@ public InputStream openStream() throws IOException
public InputStream openStream(long offset) throws IOException
{
try {
- return azureStorage.getBlobInputStream(offset, containerName, blobPath);
+ return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath);
}
catch (StorageException | URISyntaxException e) {
if (AzureUtils.AZURE_RETRY.apply(e)) {
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 1e46239fd21a..9f97256b1da8 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -183,7 +183,7 @@ DataSegment uploadDataSegment(
)
throws StorageException, IOException, URISyntaxException
{
- azureStorage.uploadBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);
+ azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);
final DataSegment outSegment = segment
.withSize(size)
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index 4b1539d35019..fc1a128e11e4 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -23,19 +23,26 @@
import com.google.common.base.Supplier;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
+import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -48,6 +55,9 @@ public class AzureStorage
{
private static final boolean USE_FLAT_BLOB_LISTING = true;
+ // Default value from Azure library
+ private static final int DELTA_BACKOFF_MS = 30_000;
+
private static final Logger log = new Logger(AzureStorage.class);
/**
@@ -70,14 +80,28 @@ public AzureStorage(
public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
throws StorageException, URISyntaxException
+ {
+ return emptyCloudBlobDirectory(containerName, virtualDirPath, null);
+ }
+
+ public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts)
+ throws StorageException, URISyntaxException
{
List deletedFiles = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
- for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
+ Iterable blobItems = container.listBlobs(
+ virtualDirPath,
+ USE_FLAT_BLOB_LISTING,
+ null,
+ getRequestOptionsWithRetry(maxAttempts),
+ null
+ );
+
+ for (ListBlobItem blobItem : blobItems) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
- log.info("Removing file[%s] from Azure.", cloudBlob.getName());
- if (cloudBlob.deleteIfExists()) {
+ log.debug("Removing file[%s] from Azure.", cloudBlob.getName());
+ if (cloudBlob.deleteIfExists(DeleteSnapshotsOption.NONE, null, getRequestOptionsWithRetry(maxAttempts), null)) {
deletedFiles.add(cloudBlob.getName());
}
}
@@ -89,7 +113,7 @@ public List emptyCloudBlobDirectory(final String containerName, final St
return deletedFiles;
}
- public void uploadBlob(final File file, final String containerName, final String blobPath)
+ public void uploadBlockBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
@@ -98,7 +122,29 @@ public void uploadBlob(final File file, final String containerName, final String
}
}
- public CloudBlob getBlobReferenceWithAttributes(final String containerName, final String blobPath)
+ public OutputStream getBlockBlobOutputStream(
+ final String containerName,
+ final String blobPath,
+ @Nullable final Integer streamWriteSizeBytes,
+ Integer maxAttempts
+ ) throws URISyntaxException, StorageException
+ {
+ CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
+ CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath);
+
+ if (blockBlobReference.exists()) {
+ throw new RE("Reference already exists");
+ }
+
+ if (streamWriteSizeBytes != null) {
+ blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes);
+ }
+
+ return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null);
+
+ }
+
+ public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
@@ -106,28 +152,97 @@ public CloudBlob getBlobReferenceWithAttributes(final String containerName, fina
return blobReference;
}
- public long getBlobLength(final String containerName, final String blobPath)
+ public long getBlockBlobLength(final String containerName, final String blobPath)
+ throws URISyntaxException, StorageException
+ {
+ return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
+ }
+
+ public InputStream getBlockBlobInputStream(final String containerName, final String blobPath)
+ throws URISyntaxException, StorageException
+ {
+ return getBlockBlobInputStream(0L, containerName, blobPath);
+ }
+
+ public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath)
+ throws URISyntaxException, StorageException
+ {
+ return getBlockBlobInputStream(offset, null, containerName, blobPath);
+ }
+
+ public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
- return getBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
+ return getBlockBlobInputStream(offset, length, containerName, blobPath, null);
+ }
+
+ public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts)
+ throws URISyntaxException, StorageException
+ {
+ CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
+ return container.getBlockBlobReference(blobPath)
+ .openInputStream(offset, length, null, getRequestOptionsWithRetry(maxAttempts), null);
}
- public InputStream getBlobInputStream(final String containerName, final String blobPath)
+ public void batchDeleteFiles(String containerName, Iterable paths, Integer maxAttempts)
throws URISyntaxException, StorageException
{
- return getBlobInputStream(0L, containerName, blobPath);
+ CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName);
+ BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation();
+ for (String path : paths) {
+ CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path);
+ blobDeleteBatchOperation.addSubOperation(blobReference);
+ }
+ cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null);
}
- public InputStream getBlobInputStream(long offset, final String containerName, final String blobPath)
+ public List listDir(final String containerName, final String virtualDirPath)
throws URISyntaxException, StorageException
{
+ return listDir(containerName, virtualDirPath, null);
+ }
+
+ public List listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
+ throws StorageException, URISyntaxException
+ {
+ List files = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
- return container.getBlockBlobReference(blobPath).openInputStream(offset, null, null, null, null);
+
+ for (ListBlobItem blobItem :
+ container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) {
+ CloudBlob cloudBlob = (CloudBlob) blobItem;
+ files.add(cloudBlob.getName());
+ }
+
+ return files;
}
- public boolean getBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
+ public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
+ {
+ return getBlockBlobExists(container, blobPath, null);
+ }
+
+
+ public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts)
+ throws URISyntaxException, StorageException
{
- return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
+ return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath)
+ .exists(null, getRequestOptionsWithRetry(maxAttempts), null);
+ }
+
+ /**
+ * If maxAttempts is provided, this method returns request options with retry built in.
+ * Retry backoff is exponential backoff, with maxAttempts set to the one provided
+ */
+ @Nullable
+ private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts)
+ {
+ if (maxAttempts == null) {
+ return null;
+ }
+ BlobRequestOptions requestOptions = new BlobRequestOptions();
+ requestOptions.setRetryPolicyFactory(new RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts));
+ return requestOptions;
}
@VisibleForTesting
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index f2f973f11abe..674e451de51a 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -51,7 +51,7 @@
public class AzureStorageDruidModule implements DruidModule
{
- static final String SCHEME = "azure";
+ public static final String SCHEME = "azure";
public static final String
STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
public static final String
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 9bfda5ab349f..5e6880c14ed2 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -95,7 +95,7 @@ private void pushTaskFile(final File logFile, String taskKey)
try {
AzureUtils.retryAzureOperation(
() -> {
- azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
+ azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey);
return null;
},
config.getMaxTries()
@@ -129,12 +129,12 @@ private Optional streamTaskFile(final String taskid, final long off
{
final String container = config.getContainer();
try {
- if (!azureStorage.getBlobExists(container, taskKey)) {
+ if (!azureStorage.getBlockBlobExists(container, taskKey)) {
return Optional.absent();
}
try {
final long start;
- final long length = azureStorage.getBlobLength(container, taskKey);
+ final long length = azureStorage.getBlockBlobLength(container, taskKey);
if (offset > 0 && offset < length) {
start = offset;
@@ -144,7 +144,7 @@ private Optional streamTaskFile(final String taskid, final long off
start = 0;
}
- InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
+ InputStream stream = azureStorage.getBlockBlobInputStream(container, taskKey);
stream.skip(start);
return Optional.of(stream);
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureInputRange.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureInputRange.java
new file mode 100644
index 000000000000..4803dc8b297b
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureInputRange.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import java.util.Objects;
+
+/**
+ * Represents a chunk of the Azure blob
+ */
+public class AzureInputRange
+{
+
+ /**
+ * Starting location in the blob stream
+ */
+ private final long start;
+
+ /**
+ * Size of the blob stream that this object represents
+ */
+ private final long size;
+
+ /**
+ * Container where the blob resides
+ */
+ private final String container;
+
+ /**
+ * Absolute path of the blob
+ */
+ private final String path;
+
+ public AzureInputRange(long start, long size, String container, String path)
+ {
+ this.start = start;
+ this.size = size;
+ this.container = container;
+ this.path = path;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public long getSize()
+ {
+ return size;
+ }
+
+ public String getContainer()
+ {
+ return container;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AzureInputRange that = (AzureInputRange) o;
+ return start == that.start
+ && size == that.size
+ && Objects.equals(container, that.container)
+ && Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(start, size, container, path);
+ }
+}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java
new file mode 100644
index 000000000000..7af9c856c5f5
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.RetryUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Configuration of the Azure storage connector
+ */
+public class AzureOutputConfig
+{
+ @JsonProperty
+ private final String container;
+
+ @JsonProperty
+ private final String prefix;
+
+ @JsonProperty
+ private final File tempDir;
+
+ @JsonProperty
+ private final HumanReadableBytes chunkSize;
+
+ private static final HumanReadableBytes DEFAULT_CHUNK_SIZE = new HumanReadableBytes("4MiB");
+
+ // Minimum limit is self-imposed, so that chunks are appropriately sized, and we don't spend a lot of time downloading
+ // the part of the blobs
+ private static final long AZURE_MIN_CHUNK_SIZE_BYTES = new HumanReadableBytes("256KiB").getBytes();
+
+ // Maximum limit is imposed by Azure, on the size of one block blob
+ private static final long AZURE_MAX_CHUNK_SIZE_BYTES = new HumanReadableBytes("4000MiB").getBytes();
+
+
+ @JsonProperty
+ private final int maxRetry;
+
+ public AzureOutputConfig(
+ @JsonProperty(value = "container", required = true) String container,
+ @JsonProperty(value = "prefix", required = true) String prefix,
+ @JsonProperty(value = "tempDir", required = true) File tempDir,
+ @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
+ @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
+ )
+ {
+ this.container = container;
+ this.prefix = prefix;
+ this.tempDir = tempDir;
+ this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
+ this.maxRetry = maxRetry != null ? maxRetry : RetryUtils.DEFAULT_MAX_TRIES;
+ validateFields();
+ }
+
+
+ public String getContainer()
+ {
+ return container;
+ }
+
+ public String getPrefix()
+ {
+ return prefix;
+ }
+
+ public File getTempDir()
+ {
+ return tempDir;
+ }
+
+ public HumanReadableBytes getChunkSize()
+ {
+ return chunkSize;
+ }
+
+ public int getMaxRetry()
+ {
+ return maxRetry;
+ }
+
+ private void validateFields()
+ {
+ if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) {
+ throw InvalidInput.exception(
+ "'chunkSize' [%d] bytes to the AzureConfig should be between [%d] bytes and [%d] bytes",
+ chunkSize.getBytes(),
+ AZURE_MIN_CHUNK_SIZE_BYTES,
+ AZURE_MAX_CHUNK_SIZE_BYTES
+ );
+ }
+
+ try {
+ FileUtils.mkdirp(tempDir);
+ }
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Unable to create temporary directory [%s]", tempDir.getAbsolutePath());
+ }
+
+ if (!tempDir.canRead() || !tempDir.canWrite()) {
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(
+ "Cannot read or write on the 'tempDir' [%s]. "
+ + "Please provide a different path to store the intermediate contents of AzureStorageConnector",
+ tempDir.getAbsolutePath()
+ );
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AzureOutputConfig that = (AzureOutputConfig) o;
+ return maxRetry == that.maxRetry
+ && Objects.equals(container, that.container)
+ && Objects.equals(prefix, that.prefix)
+ && Objects.equals(tempDir, that.tempDir)
+ && Objects.equals(chunkSize, that.chunkSize);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(container, prefix, tempDir, chunkSize, maxRetry);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AzureOutputConfig{" +
+ "container='" + container + '\'' +
+ ", prefix='" + prefix + '\'' +
+ ", tempDir=" + tempDir +
+ ", chunkSize=" + chunkSize +
+ ", maxRetry=" + maxRetry +
+ '}';
+ }
+}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
new file mode 100644
index 000000000000..657043797e03
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.microsoft.azure.storage.StorageException;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.apache.druid.storage.azure.AzureUtils;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of the storage connector that facilitates reading and writing from Azure's blob storage.
+ * This extends the {@link ChunkingStorageConnector} so that the downloads are in a chunked manner.
+ */
+public class AzureStorageConnector extends ChunkingStorageConnector
+{
+
+ private static final String DELIM = "/";
+ private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+
+ private final AzureOutputConfig config;
+ private final AzureStorage azureStorage;
+
+ public AzureStorageConnector(
+ final AzureOutputConfig config,
+ final AzureStorage azureStorage
+ )
+ {
+ this.config = config;
+ this.azureStorage = azureStorage;
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException
+ {
+ try {
+ return buildInputParams(path, 0, azureStorage.getBlockBlobLength(config.getContainer(), objectPath(path)));
+ }
+ catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size)
+ {
+ ChunkingStorageConnectorParameters.Builder parameters = new ChunkingStorageConnectorParameters.Builder<>();
+ parameters.tempDirSupplier(config::getTempDir);
+ parameters.maxRetry(config.getMaxRetry());
+ parameters.cloudStoragePath(objectPath(path));
+ parameters.retryCondition(AzureUtils.AZURE_RETRY);
+ parameters.start(from);
+ parameters.end(from + size);
+ parameters.objectSupplier((start, end) -> new AzureInputRange(
+ start,
+ end - start,
+ config.getContainer(),
+ objectPath(path)
+ ));
+ parameters.objectOpenFunction(
+ new ObjectOpenFunction()
+ {
+ @Override
+ public InputStream open(AzureInputRange inputRange) throws IOException
+ {
+ try {
+ return azureStorage.getBlockBlobInputStream(
+ inputRange.getStart(),
+ inputRange.getSize(),
+ inputRange.getContainer(),
+ inputRange.getPath(),
+ config.getMaxRetry()
+ );
+ }
+ catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public InputStream open(AzureInputRange inputRange, long offset) throws IOException
+ {
+ AzureInputRange newInputRange = new AzureInputRange(
+ inputRange.getStart() + offset,
+ Math.max(inputRange.getSize() - offset, 0),
+ inputRange.getContainer(),
+ inputRange.getPath()
+ );
+ return open(newInputRange);
+ }
+ }
+ );
+
+ return parameters.build();
+ }
+
+ @Override
+ public boolean pathExists(String path) throws IOException
+ {
+ try {
+ return azureStorage.getBlockBlobExists(config.getContainer(), objectPath(path), config.getMaxRetry());
+ }
+ catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public OutputStream write(String path) throws IOException
+ {
+ try {
+ return azureStorage.getBlockBlobOutputStream(
+ config.getContainer(),
+ objectPath(path),
+ config.getChunkSize().getBytesInInt(),
+ config.getMaxRetry()
+ );
+ }
+ catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void deleteFile(String path) throws IOException
+ {
+ try {
+ azureStorage.batchDeleteFiles(
+ config.getContainer(),
+ Collections.singletonList(objectPath(path)),
+ config.getMaxRetry()
+ );
+ }
+ catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void deleteFiles(Iterable paths) throws IOException
+ {
+ try {
+ azureStorage.batchDeleteFiles(
+ config.getContainer(),
+ Iterables.transform(paths, this::objectPath),
+ config.getMaxRetry()
+ );
+ }
+ catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void deleteRecursively(String path) throws IOException
+ {
+ try {
+ azureStorage.emptyCloudBlobDirectory(config.getContainer(), objectPath(path), config.getMaxRetry());
+ }
+ catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Iterator listDir(String dirName) throws IOException
+ {
+ final String prefixBasePath = objectPath(dirName);
+ List paths;
+ try {
+ paths = azureStorage.listDir(config.getContainer(), prefixBasePath, config.getMaxRetry());
+ }
+ catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return paths.stream().map(path -> {
+ String[] size = path.split(prefixBasePath, 2);
+ if (size.length > 1) {
+ return size[1];
+ } else {
+ return "";
+ }
+ }).iterator();
+ }
+
+ private String objectPath(String path)
+ {
+ return JOINER.join(config.getPrefix(), path);
+ }
+}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorModule.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorModule.java
new file mode 100644
index 000000000000..b2cdda0eb29a
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+public class AzureStorageConnectorModule implements DruidModule
+{
+ @Override
+  public List<? extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(
+ new SimpleModule(AzureStorageConnectorModule.class.getSimpleName())
+ .registerSubtypes(AzureStorageConnectorProvider.class)
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+}
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
new file mode 100644
index 000000000000..4264801f4acf
--- /dev/null
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.apache.druid.storage.azure.AzureStorageDruidModule;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName(AzureStorageDruidModule.SCHEME)
+public class AzureStorageConnectorProvider extends AzureOutputConfig implements StorageConnectorProvider
+{
+
+ @JacksonInject
+ AzureStorage azureStorage;
+
+ @JsonCreator
+ public AzureStorageConnectorProvider(
+ @JsonProperty(value = "container", required = true) String container,
+ @JsonProperty(value = "prefix", required = true) String prefix,
+ @JsonProperty(value = "tempDir", required = true) File tempDir,
+ @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
+ @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
+ )
+ {
+ super(container, prefix, tempDir, chunkSize, maxRetry);
+ }
+
+ @Override
+ public StorageConnector get()
+ {
+ return new AzureStorageConnector(this, azureStorage);
+ }
+}
diff --git a/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 298f1d39d173..a801f540d396 100644
--- a/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ b/extensions-core/azure-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.druid.storage.azure.output.AzureStorageConnectorModule
org.apache.druid.storage.azure.AzureStorageDruidModule
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java
index 649001cc3577..f54ef2e40361 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureByteSourceTest.java
@@ -41,7 +41,7 @@ public void test_openStream_withoutOffset_succeeds() throws IOException, URISynt
AzureStorage azureStorage = createMock(AzureStorage.class);
InputStream stream = createMock(InputStream.class);
- EasyMock.expect(azureStorage.getBlobInputStream(NO_OFFSET, containerName, blobPath)).andReturn(stream);
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(NO_OFFSET, containerName, blobPath)).andReturn(stream);
replayAll();
@@ -60,7 +60,7 @@ public void test_openStream_withOffset_succeeds() throws IOException, URISyntaxE
AzureStorage azureStorage = createMock(AzureStorage.class);
InputStream stream = createMock(InputStream.class);
- EasyMock.expect(azureStorage.getBlobInputStream(OFFSET, containerName, blobPath)).andReturn(stream);
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(OFFSET, containerName, blobPath)).andReturn(stream);
replayAll();
@@ -78,7 +78,7 @@ public void openStreamWithRecoverableErrorTest() throws URISyntaxException, Stor
final String blobPath = "/path/to/file";
AzureStorage azureStorage = createMock(AzureStorage.class);
- EasyMock.expect(azureStorage.getBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow(
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow(
new StorageException(
"",
"",
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
index fc984349628a..13820072cb7b 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java
@@ -62,7 +62,7 @@ public void test_getSegmentFiles_success()
final InputStream zipStream = new FileInputStream(pulledFile);
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
- EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
replayAll();
@@ -94,7 +94,7 @@ public void test_getSegmentFiles_blobPathIsHadoop_success()
final InputStream zipStream = new FileInputStream(pulledFile);
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
- EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
replayAll();
@@ -123,7 +123,7 @@ public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFile
final File outDir = FileUtils.createTempDir();
try {
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
- EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
new URISyntaxException(
"error",
"error",
@@ -155,7 +155,7 @@ public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_d
final File outDir = FileUtils.createTempDir();
try {
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
- EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
new StorageException(null, null, 0, null, null)
).atLeastOnce();
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
index f3e65c7923f5..b18fabbc3dae 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -115,7 +115,7 @@ public void test_push_nonUniquePathNoPrefix_succeeds() throws Exception
Files.write(DATA, tmp);
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
- azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+ azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
EasyMock.expectLastCall();
replayAll();
@@ -145,7 +145,7 @@ public void test_push_nonUniquePathWithPrefix_succeeds() throws Exception
Files.write(DATA, tmp);
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
- azureStorage.uploadBlob(
+ azureStorage.uploadBlockBlob(
EasyMock.anyObject(File.class),
EasyMock.eq(CONTAINER_NAME),
EasyMock.eq(PREFIX + "/" + azurePath)
@@ -178,7 +178,7 @@ public void test_push_uniquePathNoPrefix_succeeds() throws Exception
Files.write(DATA, tmp);
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
- azureStorage.uploadBlob(
+ azureStorage.uploadBlockBlob(
EasyMock.anyObject(File.class),
EasyMock.eq(CONTAINER_NAME),
EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX)
@@ -211,7 +211,7 @@ public void test_push_uniquePath_succeeds() throws Exception
Files.write(DATA, tmp);
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
- azureStorage.uploadBlob(
+ azureStorage.uploadBlockBlob(
EasyMock.anyObject(File.class),
EasyMock.eq(CONTAINER_NAME),
EasyMock.matches(UNIQUE_MATCHER_PREFIX)
@@ -245,7 +245,7 @@ public void test_push_exception_throwsException() throws Exception
final long size = DATA.length;
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
- azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+ azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
EasyMock.expectLastCall().andThrow(new URISyntaxException("", ""));
replayAll();
@@ -284,7 +284,7 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn
final File compressedSegmentData = new File("index.zip");
final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false);
- azureStorage.uploadBlob(compressedSegmentData, CONTAINER_NAME, azurePath);
+ azureStorage.uploadBlockBlob(compressedSegmentData, CONTAINER_NAME, azurePath);
EasyMock.expectLastCall();
replayAll();
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
new file mode 100644
index 000000000000..9ae08546401a
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure;
+
+import com.google.common.collect.ImmutableList;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+public class AzureStorageTest
+{
+
+ AzureStorage azureStorage;
+ CloudBlobClient cloudBlobClient = Mockito.mock(CloudBlobClient.class);
+ CloudBlobContainer cloudBlobContainer = Mockito.mock(CloudBlobContainer.class);
+
+ @Before
+ public void setup() throws URISyntaxException, StorageException
+ {
+ Mockito.doReturn(cloudBlobContainer).when(cloudBlobClient).getContainerReference(ArgumentMatchers.anyString());
+ azureStorage = new AzureStorage(() -> cloudBlobClient);
+ }
+
+ @Test
+ public void testListDir() throws URISyntaxException, StorageException
+ {
+    List<ListBlobItem> listBlobItems = ImmutableList.of(
+ new CloudBlockBlob(new URI("azure://dummy.com/container/blobName"))
+ );
+
+ Mockito.doReturn(listBlobItems).when(cloudBlobContainer).listBlobs(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.anyBoolean(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+
+ );
+ Assert.assertEquals(ImmutableList.of("blobName"), azureStorage.listDir("test", ""));
+
+ }
+}
+
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 297545e4cad0..2575793176e3 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -97,7 +97,7 @@ public void test_PushTaskLog_uploadsBlob() throws Exception
try {
final File logFile = new File(tmpDir, "log");
- azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
+ azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
EasyMock.expectLastCall();
replayAll();
@@ -119,7 +119,7 @@ public void test_PushTaskLog_exception_rethrowsException() throws Exception
try {
final File logFile = new File(tmpDir, "log");
- azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
+ azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
EasyMock.expectLastCall().andThrow(new IOException());
replayAll();
@@ -141,7 +141,7 @@ public void test_PushTaskReports_uploadsBlob() throws Exception
try {
final File logFile = new File(tmpDir, "log");
- azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+ azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
EasyMock.expectLastCall();
replayAll();
@@ -163,7 +163,7 @@ public void test_PushTaskStatus_uploadsBlob() throws Exception
try {
final File logFile = new File(tmpDir, "status.json");
- azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json");
+ azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json");
EasyMock.expectLastCall();
replayAll();
@@ -185,7 +185,7 @@ public void test_PushTaskReports_exception_rethrowsException() throws Exception
try {
final File logFile = new File(tmpDir, "log");
- azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
+ azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
EasyMock.expectLastCall().andThrow(new IOException());
replayAll();
@@ -205,9 +205,9 @@ public void testStreamTaskLogWithoutOffset() throws Exception
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/log";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
@@ -228,9 +228,9 @@ public void testStreamTaskLogWithPositiveOffset() throws Exception
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/log";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
@@ -251,9 +251,9 @@ public void testStreamTaskLogWithNegative() throws Exception
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/log";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
new ByteArrayInputStream(StringUtils.toUtf8(testLog)));
@@ -274,9 +274,9 @@ public void test_streamTaskReports_blobExists_succeeds() throws Exception
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(StandardCharsets.UTF_8)));
@@ -297,7 +297,7 @@ public void test_streamTaskReports_blobDoesNotExist_returnsAbsent() throws Excep
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/report.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(false);
replayAll();
@@ -315,9 +315,9 @@ public void test_streamTaskReports_exceptionWhenGettingStream_throwsException()
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow(
new URISyntaxException("", ""));
@@ -336,7 +336,7 @@ public void test_streamTaskReports_exceptionWhenCheckingBlobExistence_throwsExce
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
replayAll();
@@ -351,9 +351,9 @@ public void test_streamTaskStatus_blobExists_succeeds() throws Exception
final String taskStatus = "{}";
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andReturn(
new ByteArrayInputStream(taskStatus.getBytes(StandardCharsets.UTF_8)));
@@ -372,7 +372,7 @@ public void test_streamTaskStatus_blobExists_succeeds() throws Exception
public void test_streamTaskStatus_blobDoesNotExist_returnsAbsent() throws Exception
{
final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/status.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(false);
replayAll();
@@ -390,9 +390,9 @@ public void test_streamTaskStatus_exceptionWhenGettingStream_throwsException() t
final String taskStatus = "{}";
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
- EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
- EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow(
new URISyntaxException("", ""));
@@ -409,7 +409,7 @@ public void test_streamTaskStatus_exceptionWhenGettingStream_throwsException() t
public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
{
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
- EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
replayAll();
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureInputRangeTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureInputRangeTest.java
new file mode 100644
index 000000000000..4753132d1c48
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureInputRangeTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class AzureInputRangeTest
+{
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(AzureInputRange.class)
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java
new file mode 100644
index 000000000000..ab3104adf4ea
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.ISE;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+public class AzureOutputConfigTest
+{
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final String CONTAINER = "container";
+ private static final String PREFIX = "prefix";
+ private static final int MAX_RETRY_COUNT = 0;
+
+ @Test
+ public void testTooLargeChunkSize()
+ {
+ HumanReadableBytes chunkSize = new HumanReadableBytes("4001MiB");
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new AzureOutputConfig(CONTAINER, PREFIX, temporaryFolder.newFolder(), chunkSize, MAX_RETRY_COUNT)
+ );
+ }
+
+ @Test
+ public void testTempDirectoryNotWritable() throws IOException
+ {
+ File tempDir = temporaryFolder.newFolder();
+ if (!tempDir.setWritable(false)) {
+ throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName());
+ }
+ //noinspection ResultOfObjectAllocationIgnored
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT)
+ );
+ }
+
+ @Test
+ public void testTempDirectoryNotPresentButWritable() throws IOException
+ {
+ File tempDir = new File(temporaryFolder.newFolder() + "/notPresent1/notPresent2/notPresent3");
+ //noinspection ResultOfObjectAllocationIgnored
+ new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT);
+ }
+
+ @Test
+ public void testTempDirectoryPresent() throws IOException
+ {
+ File tempDir = new File(temporaryFolder.newFolder() + "/notPresent1/notPresent2/notPresent3");
+ FileUtils.mkdirp(tempDir);
+ //noinspection ResultOfObjectAllocationIgnored
+ new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT);
+ }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java
new file mode 100644
index 000000000000..ecf99666ce70
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class AzureOutputSerdeTest
+{
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ public void sanity() throws IOException
+ {
+ String json = jsonStringReadyForAssert("{\n"
+ + " \"container\": \"TEST\",\n"
+ + " \"prefix\": \"abc\",\n"
+ + " \"tempDir\": \"/tmp\",\n"
+ + " \"chunkSize\":104857600,\n"
+ + " \"maxRetry\": 2\n"
+ + "}\n");
+
+ AzureOutputConfig azureOutputConfig = new AzureOutputConfig(
+ "TEST",
+ "abc",
+ new File("/tmp"),
+ HumanReadableBytes.valueOf(HumanReadableBytes.parse("100Mib")),
+ 2
+ );
+
+ Assert.assertEquals(
+ json,
+ MAPPER.writeValueAsString(azureOutputConfig)
+ );
+
+ Assert.assertEquals(azureOutputConfig, MAPPER.readValue(json, AzureOutputConfig.class));
+ }
+
+ @Test
+ public void noPrefix()
+ {
+ String json = jsonStringReadyForAssert("{\n"
+ + " \"container\": \"TEST\",\n"
+ + " \"tempDir\": \"/tmp\",\n"
+ + " \"chunkSize\":104857600,\n"
+ + " \"maxRetry\": 2\n"
+ + "}\n");
+ Assert.assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+ }
+
+ @Test
+ public void noContainer()
+ {
+ String json = jsonStringReadyForAssert("{\n"
+ + " \"prefix\": \"abc\",\n"
+ + " \"tempDir\": \"/tmp\",\n"
+ + " \"chunkSize\":104857600,\n"
+ + " \"maxRetry\": 2\n"
+ + "}\n");
+ Assert.assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+ }
+
+ @Test
+ public void noTempDir()
+ {
+ String json = jsonStringReadyForAssert("{\n"
+ + " \"prefix\": \"abc\",\n"
+ + " \"container\": \"TEST\",\n"
+ + " \"chunkSize\":104857600,\n"
+ + " \"maxRetry\": 2\n"
+ + "}\n");
+ Assert.assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+ }
+
+ @Test
+ public void leastArguments() throws JsonProcessingException
+ {
+ String json = jsonStringReadyForAssert("{\n"
+ + " \"tempDir\": \"/tmp\",\n"
+ + " \"prefix\": \"abc\",\n"
+ + " \"container\": \"TEST\"\n"
+ + "}\n");
+
+ AzureOutputConfig azureOutputConfig = new AzureOutputConfig(
+ "TEST",
+ "abc",
+ new File("/tmp"),
+ null,
+ null
+ );
+ Assert.assertEquals(azureOutputConfig, MAPPER.readValue(json, AzureOutputConfig.class));
+ }
+
+
+ @Test
+ public void testChunkValidation()
+ {
+
+ String json = jsonStringReadyForAssert("{\n"
+ + " \"prefix\": \"abc\",\n"
+ + " \"container\": \"TEST\",\n"
+ + " \"tempDir\": \"/tmp\",\n"
+ + " \"chunkSize\":104,\n"
+ + " \"maxRetry\": 2\n"
+ + "}\n");
+ Assert.assertThrows(ValueInstantiationException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
+ }
+
+ private static String jsonStringReadyForAssert(String input)
+ {
+ return StringUtils.removeChar(StringUtils.removeChar(input, '\n'), ' ');
+ }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java
new file mode 100644
index 000000000000..50a856c71255
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.ProvisionException;
+import com.google.inject.name.Names;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.StorageConnectorModule;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.apache.druid.storage.azure.AzureStorageDruidModule;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Properties;
+
+public class AzureStorageConnectorProviderTest
+{
+ private static final String CUSTOM_NAMESPACE = "custom";
+
+ @Test
+ public void createAzureStorageFactoryWithRequiredProperties()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+ properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
+ properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+ properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+ StorageConnectorProvider azureStorageConnectorProvider = getStorageConnectorProvider(properties);
+
+ Assert.assertTrue(azureStorageConnectorProvider instanceof AzureStorageConnectorProvider);
+ Assert.assertTrue(azureStorageConnectorProvider.get() instanceof AzureStorageConnector);
+ Assert.assertEquals("container", ((AzureStorageConnectorProvider) azureStorageConnectorProvider).getContainer());
+ Assert.assertEquals("prefix", ((AzureStorageConnectorProvider) azureStorageConnectorProvider).getPrefix());
+ Assert.assertEquals(new File("/tmp"), ((AzureStorageConnectorProvider) azureStorageConnectorProvider).getTempDir());
+
+ }
+
+ @Test
+ public void createAzureStorageFactoryWithMissingPrefix()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+ properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
+ properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+ Assert.assertThrows(
+ "Missing required creator property 'prefix'",
+ ProvisionException.class,
+ () -> getStorageConnectorProvider(properties)
+ );
+ }
+
+
+ @Test
+ public void createAzureStorageFactoryWithMissingContainer()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+ properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+ properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
+ Assert.assertThrows(
+ "Missing required creator property 'container'",
+ ProvisionException.class,
+ () -> getStorageConnectorProvider(properties)
+ );
+ }
+
+ @Test
+ public void createAzureStorageFactoryWithMissingTempDir()
+ {
+
+ final Properties properties = new Properties();
+ properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
+ properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
+ properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
+
+ Assert.assertThrows(
+ "Missing required creator property 'tempDir'",
+ ProvisionException.class,
+ () -> getStorageConnectorProvider(properties)
+ );
+ }
+
+ private StorageConnectorProvider getStorageConnectorProvider(Properties properties)
+ {
+ StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add(
+ new AzureStorageDruidModule(),
+ new StorageConnectorModule(),
+ new AzureStorageConnectorModule(),
+ binder -> {
+ JsonConfigProvider.bind(
+ binder,
+ CUSTOM_NAMESPACE,
+ StorageConnectorProvider.class,
+ Names.named(CUSTOM_NAMESPACE)
+ );
+
+ binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
+ .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
+ .in(LazySingleton.class);
+ }
+ ).withProperties(properties);
+
+ Injector injector = startupInjectorBuilder.build();
+ injector.getInstance(ObjectMapper.class).registerModules(new AzureStorageConnectorModule().getJacksonModules());
+ injector.getInstance(ObjectMapper.class).setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(
+ AzureStorage.class,
+ EasyMock.mock(AzureStorage.class)
+ ));
+
+
+ return injector.getInstance(Key.get(
+ StorageConnectorProvider.class,
+ Names.named(CUSTOM_NAMESPACE)
+ ));
+ }
+}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
new file mode 100644
index 000000000000..f8592c32eaf8
--- /dev/null
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.azure.output;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.microsoft.azure.storage.StorageException;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.azure.AzureStorage;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+public class AzureStorageConnectorTest
+{
+
+ private static final String CONTAINER = "CONTAINER";
+ private static final String PREFIX = "P/R/E/F/I/X";
+ public static final String TEST_FILE = "test.csv";
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private StorageConnector storageConnector;
+ private final AzureStorage azureStorage = EasyMock.createMock(AzureStorage.class);
+
+ @Before
+ public void setup() throws IOException
+ {
+ storageConnector = new AzureStorageConnector(
+ new AzureOutputConfig(CONTAINER, PREFIX, temporaryFolder.newFolder(), null, null),
+ azureStorage
+ );
+ }
+
+
+ @Test
+ public void testPathExistsSuccess() throws URISyntaxException, StorageException, IOException
+ {
+ final Capture<String> bucket = Capture.newInstance();
+ final Capture<String> path = Capture.newInstance();
+ EasyMock.reset(azureStorage);
+ EasyMock.expect(azureStorage.getBlockBlobExists(EasyMock.capture(bucket), EasyMock.capture(path), EasyMock.anyInt()))
+ .andReturn(true);
+ EasyMock.replay(azureStorage);
+ Assert.assertTrue(storageConnector.pathExists(TEST_FILE));
+ Assert.assertEquals(CONTAINER, bucket.getValue());
+ Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue());
+ EasyMock.verify(azureStorage);
+ }
+
+ @Test
+ public void testPathExistsNotFound() throws URISyntaxException, StorageException, IOException
+ {
+ final Capture<String> bucket = Capture.newInstance();
+ final Capture<String> path = Capture.newInstance();
+ EasyMock.reset(azureStorage);
+ EasyMock.expect(azureStorage.getBlockBlobExists(EasyMock.capture(bucket), EasyMock.capture(path), EasyMock.anyInt()))
+ .andReturn(false);
+ EasyMock.replay(azureStorage);
+ Assert.assertFalse(storageConnector.pathExists(TEST_FILE));
+ Assert.assertEquals(CONTAINER, bucket.getValue());
+ Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue());
+ EasyMock.verify(azureStorage);
+ }
+
+ @Test
+ public void testRead() throws URISyntaxException, StorageException, IOException
+ {
+ EasyMock.reset(azureStorage);
+
+ String data = "test";
+ EasyMock.expect(azureStorage.getBlockBlobLength(EasyMock.anyString(), EasyMock.anyString()))
+ .andReturn(4L);
+ EasyMock.expect(
+ azureStorage.getBlockBlobInputStream(
+ EasyMock.anyLong(),
+ EasyMock.anyLong(),
+ EasyMock.anyString(),
+ EasyMock.anyString(),
+ EasyMock.anyInt()
+ )
+ ).andReturn(IOUtils.toInputStream(data, StandardCharsets.UTF_8));
+
+ EasyMock.replay(azureStorage);
+ InputStream is = storageConnector.read(TEST_FILE);
+ byte[] dataBytes = new byte[data.length()];
+ Assert.assertEquals(data.length(), is.read(dataBytes));
+ Assert.assertEquals(-1, is.read());
+ Assert.assertEquals(data, new String(dataBytes, StandardCharsets.UTF_8));
+
+ EasyMock.reset(azureStorage);
+ }
+
+ @Test
+ public void testReadRange() throws URISyntaxException, StorageException, IOException
+ {
+ String data = "test";
+
+ for (int start = 0; start < data.length(); ++start) {
+ for (long length = 1; length <= data.length() - start; ++length) {
+ String dataQueried = data.substring(start, start + ((Long) length).intValue());
+ EasyMock.reset(azureStorage);
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(
+ EasyMock.anyLong(),
+ EasyMock.anyLong(),
+ EasyMock.anyString(),
+ EasyMock.anyString(),
+ EasyMock.anyInt()
+ ))
+ .andReturn(IOUtils.toInputStream(dataQueried, StandardCharsets.UTF_8));
+ EasyMock.replay(azureStorage);
+
+ InputStream is = storageConnector.readRange(TEST_FILE, start, length);
+ byte[] dataBytes = new byte[((Long) length).intValue()];
+ Assert.assertEquals(length, is.read(dataBytes));
+ Assert.assertEquals(-1, is.read());
+ Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
+ EasyMock.reset(azureStorage);
+ }
+ }
+ }
+
+ @Test
+ public void testDeleteSinglePath() throws URISyntaxException, StorageException, IOException
+ {
+ EasyMock.reset(azureStorage);
+ Capture<String> containerCapture = EasyMock.newCapture();
+ Capture<List<String>> pathsCapture = EasyMock.newCapture();
+ azureStorage.batchDeleteFiles(
+ EasyMock.capture(containerCapture),
+ EasyMock.capture(pathsCapture),
+ EasyMock.anyInt()
+ );
+ EasyMock.replay(azureStorage);
+ storageConnector.deleteFile(TEST_FILE);
+ Assert.assertEquals(CONTAINER, containerCapture.getValue());
+ Assert.assertEquals(Collections.singletonList(PREFIX + "/" + TEST_FILE), pathsCapture.getValue());
+ EasyMock.reset(azureStorage);
+ }
+
+ @Test
+ public void testDeleteMultiplePaths() throws URISyntaxException, StorageException, IOException
+ {
+ EasyMock.reset(azureStorage);
+ Capture<String> containerCapture = EasyMock.newCapture();
+ Capture<List<String>> pathsCapture = EasyMock.newCapture();
+ azureStorage.batchDeleteFiles(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture), EasyMock.anyInt());
+ EasyMock.replay(azureStorage);
+ storageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.part"));
+ Assert.assertEquals(CONTAINER, containerCapture.getValue());
+ Assert.assertEquals(
+ ImmutableList.of(
+ PREFIX + "/" + TEST_FILE + "_1.part",
+ PREFIX + "/" + TEST_FILE + "_2.part"
+ ),
+ Lists.newArrayList(pathsCapture.getValue())
+ );
+ EasyMock.reset(azureStorage);
+ }
+
+ @Test
+ public void testListDir() throws URISyntaxException, StorageException, IOException
+ {
+ EasyMock.reset(azureStorage);
+ EasyMock.expect(azureStorage.listDir(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt()))
+ .andReturn(ImmutableList.of(PREFIX + "/x/y/z/" + TEST_FILE, PREFIX + "/p/q/r/" + TEST_FILE));
+ EasyMock.replay(azureStorage);
+ List<String> ret = Lists.newArrayList(storageConnector.listDir(""));
+ Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" + TEST_FILE), ret);
+ EasyMock.reset(azureStorage);
+ }
+}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 81dfb5747621..a68ed9c1c00c 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -27,40 +27,29 @@
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
-import org.apache.commons.io.input.NullInputStream;
import org.apache.druid.data.input.impl.CloudObjectLocation;
-import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnector;
+import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import javax.annotation.Nonnull;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.SequenceInputStream;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* In this implementation, all remote calls to aws s3 are retried {@link S3OutputConfig#getMaxRetry()} times.
*/
-public class S3StorageConnector implements StorageConnector
+public class S3StorageConnector extends ChunkingStorageConnector<GetObjectRequest>
{
private static final Logger log = new Logger(S3StorageConnector.class);
@@ -69,7 +58,6 @@ public class S3StorageConnector implements StorageConnector
private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
- private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
private static final int MAX_NUMBER_OF_LISTINGS = 1000;
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
@@ -105,169 +93,61 @@ public boolean pathExists(String path) throws IOException
}
@Override
- public InputStream read(String path)
+ public ChunkingStorageConnectorParameters<GetObjectRequest> buildInputParams(String path)
{
- return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
- }
-
- @Override
- public InputStream readRange(String path, long from, long size)
- {
- if (from < 0 || size < 0) {
- throw new IAE(
- "Invalid arguments for reading %s. from = %d, readSize = %d",
- objectPath(path),
- from,
- size
+ long size;
+ try {
+ size = S3Utils.retryS3Operation(
+ () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(),
+ config.getMaxRetry()
);
}
- return buildInputStream(
- new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
- path
- );
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return buildInputParams(path, 0, size);
}
- private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
+ @Override
+ public ChunkingStorageConnectorParameters<GetObjectRequest> buildInputParams(String path, long from, long size)
{
- // fetch the size of the whole object to make chunks
- long readEnd;
- AtomicLong currReadStart = new AtomicLong(0);
- if (getObjectRequest.getRange() != null) {
- currReadStart.set(getObjectRequest.getRange()[0]);
- readEnd = getObjectRequest.getRange()[1] + 1;
- } else {
- try {
- readEnd = S3Utils.retryS3Operation(
- () -> this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(),
- config.getMaxRetry()
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
-
- // build a sequence input stream from chunks
- return new SequenceInputStream(new Enumeration<InputStream>()
+ ChunkingStorageConnectorParameters.Builder<GetObjectRequest> builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(from);
+ builder.end(from + size);
+ builder.cloudStoragePath(objectPath(path));
+ builder.tempDirSupplier(config::getTempDir);
+ builder.maxRetry(config.getMaxRetry());
+ builder.retryCondition(S3Utils.S3RETRY);
+ builder.objectSupplier((start, end) -> new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(start, end - 1));
+ builder.objectOpenFunction(new ObjectOpenFunction<GetObjectRequest>()
{
- boolean initStream = false;
@Override
- public boolean hasMoreElements()
+ public InputStream open(GetObjectRequest object)
{
- // checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
- // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
- // lazily, we don't need to close them.
- if (isSequenceStreamClosed.get()) {
- return false;
- }
- // don't stop until the whole object is downloaded
- return currReadStart.get() < readEnd;
- }
-
- @Override
- public InputStream nextElement()
- {
- // since Sequence input stream calls nextElement in the constructor, we start chunking as soon as we call read.
- // to avoid that we pass a nullInputStream for the first iteration.
- if (!initStream) {
- initStream = true;
- return new NullInputStream();
- }
- File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
- // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
- long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
try {
- if (!outFile.createNewFile()) {
- throw new IOE(
- StringUtils.format(
- "Could not create temporary file [%s] for copying [%s]",
- outFile.getAbsolutePath(),
- objectPath(path)
- )
- );
- }
- FileUtils.copyLarge(
- () -> new RetryingInputStream<>(
- new GetObjectRequest(
- config.getBucket(),
- objectPath(path)
- ).withRange(currReadStart.get(), endPoint),
- new ObjectOpenFunction<GetObjectRequest>()
- {
- @Override
- public InputStream open(GetObjectRequest object)
- {
- try {
- return S3Utils.retryS3Operation(
- () -> s3Client.getObject(object).getObjectContent(),
- config.getMaxRetry()
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public InputStream open(GetObjectRequest object, long offset)
- {
- if (object.getRange() != null) {
- long[] oldRange = object.getRange();
- object.setRange(oldRange[0] + offset, oldRange[1]);
- } else {
- object.setRange(offset);
- }
- return open(object);
- }
- },
- S3Utils.S3RETRY,
- config.getMaxRetry()
- ),
- outFile,
- new byte[8 * 1024],
- Predicates.alwaysFalse(),
- 1,
- StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+ return S3Utils.retryS3Operation(
+ () -> s3Client.getObject(object).getObjectContent(),
+ config.getMaxRetry()
);
}
- catch (IOException e) {
- throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile));
- }
- try {
- AtomicBoolean isClosed = new AtomicBoolean(false);
- return new FileInputStream(outFile)
- {
- @Override
- public void close() throws IOException
- {
- // close should be idempotent
- if (isClosed.get()) {
- return;
- }
- isClosed.set(true);
- super.close();
- // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
- currReadStart.set(endPoint + 1);
- if (!outFile.delete()) {
- throw new RE("Cannot delete temp file [%s]", outFile);
- }
- }
- };
- }
- catch (FileNotFoundException e) {
- throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+ catch (Exception e) {
+ throw new RuntimeException(e);
}
}
- })
- {
+
@Override
- public void close() throws IOException
+ public InputStream open(GetObjectRequest object, long offset)
{
- isSequenceStreamClosed.set(true);
- super.close();
+ if (object.getRange() != null) {
+ long[] oldRange = object.getRange();
+ object.setRange(oldRange[0] + offset, oldRange[1]);
+ } else {
+ object.setRange(offset);
+ }
+ return open(object);
}
- };
+ });
+ return builder.build();
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
index 9682ac01d88e..a0bf3a91f0cc 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
+++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorModule.java
@@ -33,8 +33,10 @@ public class StorageConnectorModule implements DruidModule
@Override
public List<? extends Module> getJacksonModules()
{
- return ImmutableList.of(new SimpleModule(StorageConnector.class.getSimpleName()).registerSubtypes(
- LocalFileStorageConnectorProvider.class));
+ return ImmutableList.of(
+ new SimpleModule(StorageConnector.class.getSimpleName())
+ .registerSubtypes(LocalFileStorageConnectorProvider.class)
+ );
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
new file mode 100644
index 000000000000..5d181b724881
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.druid.data.input.impl.RetryingInputStream;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An abstract implementation of the storage connectors that download the file from the remote storage in chunks
+ * and presents the downloaded chunks as a single {@link InputStream} for the consumers of the connector.
+ * This implementation benefits over keeping the InputStream to the remote source open since we don't require the
+ * connection to be open for the entire duration.
+ * Check out {@link ChunkingStorageConnectorParameters} to see the inputs required to support chunking
+ */
+public abstract class ChunkingStorageConnector<T> implements StorageConnector
+{
+ /**
+ * Default size for chunking of the storage connector. Set to 100MBs to keep the chunk size small relative to the
+ * total frame size, while also preventing a large number of calls to the remote storage. While fetching a single
+ * file, 100MBs would be required in the disk space.
+ */
+ private static final long DOWNLOAD_MAX_CHUNK_SIZE_BYTES = 100_000_000;
+
+ /**
+ * Default fetch buffer size while copying from the remote location to the download file. Set to default sizing given
+ * in the {@link org.apache.commons.io.IOUtils}
+ */
+ private static final int FETCH_BUFFER_SIZE_BYTES = 8 * 1024;
+
+ private final long chunkSizeBytes;
+
+ public ChunkingStorageConnector()
+ {
+ this(DOWNLOAD_MAX_CHUNK_SIZE_BYTES);
+ }
+
+ public ChunkingStorageConnector(
+ final long chunkSizeBytes
+ )
+ {
+ this.chunkSizeBytes = chunkSizeBytes;
+ }
+
+ @Override
+ public InputStream read(String path) throws IOException
+ {
+ return buildInputStream(buildInputParams(path));
+ }
+
+ @Override
+ public InputStream readRange(String path, long from, long size)
+ {
+ return buildInputStream(buildInputParams(path, from, size));
+ }
+
+ public abstract ChunkingStorageConnectorParameters<T> buildInputParams(String path) throws IOException;
+
+ public abstract ChunkingStorageConnectorParameters<T> buildInputParams(String path, long from, long size);
+
+ private InputStream buildInputStream(ChunkingStorageConnectorParameters<T> params)
+ {
+ // Position from where the read needs to be resumed
+ final AtomicLong currentReadStartPosition = new AtomicLong(params.getStart());
+
+ // Final position, exclusive
+ long readEnd = params.getEnd();
+
+ AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
+
+ return new SequenceInputStream(
+
+ new Enumeration<InputStream>()
+ {
+ boolean initStream = false;
+
+ @Override
+ public boolean hasMoreElements()
+ {
+ // Checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
+ // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
+ // lazily, we don't need to close them.
+ if (isSequenceStreamClosed.get()) {
+ return false;
+ }
+ // Don't stop until the whole object is downloaded
+ return currentReadStartPosition.get() < readEnd;
+ }
+
+ @Override
+ public InputStream nextElement()
+ {
+ if (!initStream) {
+ initStream = true;
+ return new NullInputStream();
+ }
+
+ File outFile = new File(
+ params.getTempDirSupplier().get().getAbsolutePath(),
+ UUID.randomUUID().toString()
+ );
+
+ long currentReadEndPosition = Math.min(
+ currentReadStartPosition.get() + chunkSizeBytes,
+ readEnd
+ );
+
+ try {
+ if (!outFile.createNewFile()) {
+ throw new IOE(
+ StringUtils.format(
+ "Could not create temporary file [%s] for copying [%s]",
+ outFile.getAbsolutePath(),
+ params.getCloudStoragePath()
+ )
+ );
+ }
+
+ FileUtils.copyLarge(
+ () -> new RetryingInputStream<>(
+ params.getObjectSupplier().getObject(currentReadStartPosition.get(), currentReadEndPosition),
+ params.getObjectOpenFunction(),
+ params.getRetryCondition(),
+ params.getMaxRetry()
+ ),
+ outFile,
+ new byte[FETCH_BUFFER_SIZE_BYTES],
+ Predicates.alwaysFalse(),
+ 1,
+ StringUtils.format(
+ "Retrying copying of [%s] to [%s]",
+ params.getCloudStoragePath(),
+ outFile.getAbsolutePath()
+ )
+ );
+ }
+ catch (IOException e) {
+ throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", params.getCloudStoragePath(), outFile));
+ }
+
+ try {
+ AtomicBoolean fileInputStreamClosed = new AtomicBoolean(false);
+ return new FileInputStream(outFile)
+ {
+ @Override
+ public void close() throws IOException
+ {
+ // close should be idempotent
+ if (fileInputStreamClosed.get()) {
+ return;
+ }
+ fileInputStreamClosed.set(true);
+ super.close();
+ currentReadStartPosition.set(currentReadEndPosition);
+ if (!outFile.delete()) {
+ throw new RE("Cannot delete temp file [%s]", outFile);
+ }
+ }
+
+ };
+ }
+ catch (FileNotFoundException e) {
+ throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+ }
+ }
+ }
+ )
+ {
+ @Override
+ public void close() throws IOException
+ {
+ isSequenceStreamClosed.set(true);
+ super.close();
+ }
+ };
+ }
+
+ public interface GetObjectFromRangeFunction<T>
+ {
+ T getObject(long start, long end);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
new file mode 100644
index 000000000000..03f5ecad1b13
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParameters.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * POJO for storing the parameters required to support chunking of the downloads by {@link ChunkingStorageConnector}.
+ * The implementations of the {@link ChunkingStorageConnector} should essentially provide a way to build this object,
+ * which contains the information required to support chunking.
+ * Therefore, to a call of {@link org.apache.druid.storage.StorageConnector#readRange(String, long, long)}, the
+ * implementations of the chunking storage connectors would fetch the required chunks using the information present in
+ * this POJO.
+ */
+public class ChunkingStorageConnectorParameters<T>
+{
+ /**
+ * Starting point from where to begin reading the cloud object. This is inclusive.
+ */
+ private final long start;
+
+ /**
+ * Ending point till where to end reading the cloud object. This is exclusive.
+ */
+ private final long end;
+
+ /**
+ * Absolute storage path of the cloud object.
+ */
+ private final String cloudStoragePath;
+
+ /**
+ * Given a range (start inclusive, end exclusive), fetch the object which represents the provided range of the remote
+ * object
+ */
+ private final ChunkingStorageConnector.GetObjectFromRangeFunction<T> objectSupplier;
+
+ /**
+ * Fetching function, which opens the input stream to the range provided by the given object
+ */
+ private final ObjectOpenFunction<T> objectOpenFunction;
+
+ /**
+ * Condition to initiate a retry if downloading the chunk errors out
+ */
+ private final Predicate<Throwable> retryCondition;
+
+ /**
+ * Max number of retries while reading the storage connector
+ */
+ private final int maxRetry;
+
+ /**
+ * Temporary directory where the chunks are stored
+ */
+ private final Supplier<File> tempDirSupplier;
+
+ public ChunkingStorageConnectorParameters(
+ long start,
+ long end,
+ String cloudStoragePath,
+ ChunkingStorageConnector.GetObjectFromRangeFunction<T> objectSupplier,
+ ObjectOpenFunction<T> objectOpenFunction,
+ Predicate<Throwable> retryCondition,
+ int maxRetry,
+ Supplier<File> tempDirSupplier
+ )
+ {
+ this.start = start;
+ this.end = end;
+ this.cloudStoragePath = cloudStoragePath;
+ this.objectSupplier = objectSupplier;
+ this.objectOpenFunction = objectOpenFunction;
+ this.retryCondition = retryCondition;
+ this.maxRetry = maxRetry;
+ this.tempDirSupplier = tempDirSupplier;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public long getEnd()
+ {
+ return end;
+ }
+
+ public String getCloudStoragePath()
+ {
+ return cloudStoragePath;
+ }
+
+ public ChunkingStorageConnector.GetObjectFromRangeFunction<T> getObjectSupplier()
+ {
+ return objectSupplier;
+ }
+
+ public ObjectOpenFunction<T> getObjectOpenFunction()
+ {
+ return objectOpenFunction;
+ }
+
+ public Predicate<Throwable> getRetryCondition()
+ {
+ return retryCondition;
+ }
+
+ public int getMaxRetry()
+ {
+ return maxRetry;
+ }
+
+ public Supplier<File> getTempDirSupplier()
+ {
+ return tempDirSupplier;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ChunkingStorageConnectorParameters<?> that = (ChunkingStorageConnectorParameters<?>) o;
+ return start == that.start &&
+ end == that.end &&
+ maxRetry == that.maxRetry &&
+ Objects.equals(cloudStoragePath, that.cloudStoragePath) &&
+ Objects.equals(objectSupplier, that.objectSupplier) &&
+ Objects.equals(objectOpenFunction, that.objectOpenFunction) &&
+ Objects.equals(retryCondition, that.retryCondition) &&
+ Objects.equals(tempDirSupplier, that.tempDirSupplier);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ start,
+ end,
+ cloudStoragePath,
+ objectSupplier,
+ objectOpenFunction,
+ retryCondition,
+ maxRetry,
+ tempDirSupplier
+ );
+ }
+
+ /**
+ * Builder for {@link ChunkingStorageConnectorParameters}. Performs null checks and asserts preconditions before
+ * building the instance
+ */
+ public static class Builder<T>
+ {
+ private long start;
+ private long end;
+ private String cloudStoragePath;
+ private ChunkingStorageConnector.GetObjectFromRangeFunction<T> objectSupplier;
+ private ObjectOpenFunction<T> objectOpenFunction;
+ private Predicate<Throwable> retryCondition;
+ private int maxRetry;
+ private Supplier<File> tempDirSupplier;
+
+
+ public Builder<T> start(long start)
+ {
+ this.start = start;
+ return this;
+ }
+
+ public Builder<T> end(long end)
+ {
+ this.end = end;
+ return this;
+ }
+
+ public Builder<T> cloudStoragePath(String cloudStoragePath)
+ {
+ this.cloudStoragePath = cloudStoragePath;
+ return this;
+ }
+
+ public Builder<T> objectSupplier(ChunkingStorageConnector.GetObjectFromRangeFunction<T> objectSupplier)
+ {
+ this.objectSupplier = objectSupplier;
+ return this;
+ }
+
+ public Builder<T> objectOpenFunction(ObjectOpenFunction<T> objectOpenFunction)
+ {
+ this.objectOpenFunction = objectOpenFunction;
+ return this;
+ }
+
+ public Builder<T> retryCondition(Predicate<Throwable> retryCondition)
+ {
+ this.retryCondition = retryCondition;
+ return this;
+ }
+
+ public Builder<T> maxRetry(int maxRetry)
+ {
+ this.maxRetry = maxRetry;
+ return this;
+ }
+
+ public Builder<T> tempDirSupplier(Supplier<File> tempDirSupplier)
+ {
+ this.tempDirSupplier = tempDirSupplier;
+ return this;
+ }
+
+ public ChunkingStorageConnectorParameters<T> build()
+ {
+ Preconditions.checkArgument(start >= 0, "'start' not provided or an incorrect value [%s] passed", start);
+ Preconditions.checkArgument(end >= 0, "'end' not provided or an incorrect value [%s] passed", end);
+ Preconditions.checkArgument(start <= end, "'start' should not be greater than 'end'");
+ Preconditions.checkArgument(maxRetry >= 0, "'maxRetry' not provided or an incorrect value [%s] passed", maxRetry);
+ return new ChunkingStorageConnectorParameters<>(
+ start,
+ end,
+ Preconditions.checkNotNull(cloudStoragePath, "'cloudStoragePath' not supplied"),
+ Preconditions.checkNotNull(objectSupplier, "'objectSupplier' not supplied"),
+ Preconditions.checkNotNull(objectOpenFunction, "'objectOpenFunction' not supplied"),
+ Preconditions.checkNotNull(retryCondition, "'retryCondition' not supplied"),
+ maxRetry,
+ Preconditions.checkNotNull(tempDirSupplier, "'tempDirSupplier' not supplied")
+ );
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParametersTest.java b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParametersTest.java
new file mode 100644
index 000000000000..d8b879da433a
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorParametersTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class ChunkingStorageConnectorParametersTest
+{
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(ChunkingStorageConnectorParameters.class)
+ .usingGetClass()
+ .verify();
+ }
+
+ @Test
+ public void testIncorrectParameters()
+ {
+ ChunkingStorageConnectorParameters.Builder<Long> builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(-1);
+ Assert.assertThrows(IllegalArgumentException.class, builder::build);
+ }
+
+ @Test
+ public void testCorrectParameters()
+ {
+ ChunkingStorageConnectorParameters.Builder<Long> builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(0);
+ builder.end(10);
+ builder.objectSupplier((start, end) -> null);
+ builder.objectOpenFunction(obj -> null);
+ builder.maxRetry(10);
+ builder.cloudStoragePath("/path");
+ builder.retryCondition(Predicates.alwaysTrue());
+ builder.tempDirSupplier(() -> new File("/tmp"));
+ ChunkingStorageConnectorParameters<Long> parameters = builder.build();
+ Assert.assertEquals(0, parameters.getStart());
+ Assert.assertEquals(10, parameters.getEnd());
+ Assert.assertEquals(10, parameters.getMaxRetry());
+ Assert.assertEquals("/path", parameters.getCloudStoragePath());
+ Assert.assertEquals("/tmp", parameters.getTempDirSupplier().get().getAbsolutePath());
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorTest.java b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorTest.java
new file mode 100644
index 000000000000..ccadab3a8844
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/storage/remote/ChunkingStorageConnectorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.storage.StorageConnector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class ChunkingStorageConnectorTest
+{
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private StorageConnector storageConnector;
+
+ @Before
+ public void setup() throws IOException
+ {
+ storageConnector = new TestStorageConnector(temporaryFolder.newFolder());
+ }
+
+ @Test
+ public void testRead() throws IOException
+ {
+ InputStream is = storageConnector.read("");
+ byte[] dataBytes = IOUtils.toByteArray(is);
+ Assert.assertEquals(TestStorageConnector.DATA, new String(dataBytes, StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testReadRange() throws IOException
+ {
+
+ List<Integer> ranges = ImmutableList.of(
+ TestStorageConnector.CHUNK_SIZE_BYTES,
+ TestStorageConnector.CHUNK_SIZE_BYTES * 2,
+ TestStorageConnector.CHUNK_SIZE_BYTES * 7,
+ TestStorageConnector.CHUNK_SIZE_BYTES + 1,
+ TestStorageConnector.CHUNK_SIZE_BYTES + 2,
+ TestStorageConnector.CHUNK_SIZE_BYTES + 3
+ );
+
+ List<Integer> startPositions = ImmutableList.of(0, 25, 37, TestStorageConnector.DATA.length() - 10);
+
+ for (int range : ranges) {
+ for (int startPosition : startPositions) {
+ int limitedRange = startPosition + range > TestStorageConnector.DATA.length()
+ ? TestStorageConnector.DATA.length() - startPosition
+ : range;
+ InputStream is = storageConnector.readRange("", startPosition, limitedRange);
+ byte[] dataBytes = IOUtils.toByteArray(is);
+ Assert.assertEquals(
+ TestStorageConnector.DATA.substring(startPosition, startPosition + limitedRange),
+ new String(dataBytes, StandardCharsets.UTF_8)
+ );
+ }
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/storage/remote/TestStorageConnector.java b/processing/src/test/java/org/apache/druid/storage/remote/TestStorageConnector.java
new file mode 100644
index 000000000000..e5757a949265
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/storage/remote/TestStorageConnector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+public class TestStorageConnector extends ChunkingStorageConnector<TestStorageConnector.InputRange>
+{
+
+ public static final String DATA = "This is some random data text. This should be returned in chunks by the methods, "
+ + "however the connector should reassemble it as a single stream of text";
+
+ public static final int CHUNK_SIZE_BYTES = 4;
+
+ private final File tempDir;
+
+ public TestStorageConnector(
+ final File tempDir
+ )
+ {
+ super(CHUNK_SIZE_BYTES);
+ this.tempDir = tempDir;
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters<InputRange> buildInputParams(String path)
+ {
+ return buildInputParams(path, 0, DATA.length());
+ }
+
+ @Override
+ public ChunkingStorageConnectorParameters<InputRange> buildInputParams(
+ String path,
+ long from,
+ long size
+ )
+ {
+ ChunkingStorageConnectorParameters.Builder<InputRange> builder = new ChunkingStorageConnectorParameters.Builder<>();
+ builder.start(from);
+ builder.end(from + size);
+ builder.cloudStoragePath(path);
+ builder.tempDirSupplier(() -> tempDir);
+ builder.retryCondition(Predicates.alwaysFalse());
+ builder.maxRetry(2);
+ builder.objectSupplier((start, end) -> new InputRange((int) start, (int) end));
+ builder.objectOpenFunction(new ObjectOpenFunction<InputRange>()
+ {
+ @Override
+ public InputStream open(InputRange ir)
+ {
+ return IOUtils.toInputStream(DATA.substring(ir.start, ir.end), StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public InputStream open(InputRange ir, long offset)
+ {
+ return open(new InputRange(ir.start + (int) offset, ir.end));
+ }
+ });
+ return builder.build();
+ }
+
+ @Override
+ public boolean pathExists(String path)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OutputStream write(String path)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteFile(String path)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteFiles(Iterable<String> paths)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteRecursively(String path)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<String> listDir(String dirName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public static class InputRange
+ {
+ private final int start;
+ private final int end;
+
+ public InputRange(int start, int end)
+ {
+ this.start = start;
+ this.end = end;
+ }
+ }
+}