From 9a0a422847265b24754b6adf7da2c50e7e8f2784 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Sep 2024 22:52:37 +0530 Subject: [PATCH 01/14] Make changes to Azure --- .../azure/output/AzureOutputConfig.java | 17 +++++++++-- .../output/AzureStorageConnectorProvider.java | 8 +++-- .../AzureStorageConnectorProviderTest.java | 29 +++++++------------ .../storage/StorageConnectorProvider.java | 6 ++-- 4 files changed, 35 insertions(+), 25 deletions(-) diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java index 7af9c856c5f5..9b02b5d1ce27 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java @@ -42,6 +42,7 @@ public class AzureOutputConfig @JsonProperty private final String prefix; + @Nullable @JsonProperty private final File tempDir; @@ -64,7 +65,7 @@ public class AzureOutputConfig public AzureOutputConfig( @JsonProperty(value = "container", required = true) String container, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -77,7 +78,6 @@ public AzureOutputConfig( validateFields(); } - public String getContainer() { return container; @@ -88,6 +88,7 @@ public String getPrefix() return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -103,6 +104,11 @@ public int getMaxRetry() return maxRetry; } + public AzureOutputConfig withTempDir(File tempDir) + { + return new AzureOutputConfig(container, prefix, tempDir, 
chunkSize, maxRetry); + } + private void validateFields() { if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) { @@ -113,6 +119,13 @@ private void validateFields() AZURE_MAX_CHUNK_SIZE_BYTES ); } + } + + public void validateTempDirectory() + { + if (tempDir == null) { + throw DruidException.defensive("'tempDir' is required."); + } try { FileUtils.mkdirp(tempDir); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java index 79be724c17f7..1a3a4dab9cf0 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java @@ -45,7 +45,7 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements public AzureStorageConnectorProvider( @JsonProperty(value = "container", required = true) String container, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -54,8 +54,10 @@ public AzureStorageConnectorProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(final File tempDir) { - return new AzureStorageConnector(this, azureStorage); + AzureOutputConfig config = this.getTempDir() == null ? 
this.withTempDir(tempDir) : this; + config.validateTempDirectory(); + return new AzureStorageConnector(config, azureStorage); } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java index 0b76f02af29e..f2c923a3c0d8 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java @@ -25,10 +25,9 @@ import com.google.inject.Key; import com.google.inject.ProvisionException; import com.google.inject.name.Names; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.azure.AzureStorage; @@ -59,7 +58,7 @@ public void createAzureStorageFactoryWithRequiredProperties() StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties); assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider); - assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get()); + assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.createStorageConnector(new File("/tmp"))); assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer()); assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix()); assertEquals(new File("/tmp"), @@ -107,9 +106,9 @@ public void createAzureStorageFactoryWithMissingTempDir() 
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); assertThrows( - ProvisionException.class, - () -> getStorageConnectorProvider(properties), - "Missing required creator property 'tempDir'" + DruidException.class, + () -> getStorageConnectorProvider(properties).createStorageConnector(null), + "'tempDir' is required." ); } @@ -119,18 +118,12 @@ private StorageConnectorProvider getStorageConnectorProvider(Properties properti new AzureStorageDruidModule(), new StorageConnectorModule(), new AzureStorageConnectorModule(), - binder -> { - JsonConfigProvider.bind( - binder, - CUSTOM_NAMESPACE, - StorageConnectorProvider.class, - Names.named(CUSTOM_NAMESPACE) - ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); - } + binder -> JsonConfigProvider.bind( + binder, + CUSTOM_NAMESPACE, + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + ) ).withProperties(properties); Injector injector = startupInjectorBuilder.build(); diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java index 9fece71eab8e..2568d89802d6 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java @@ -20,9 +20,11 @@ package org.apache.druid.storage; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; + +import java.io.File; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -public interface StorageConnectorProvider extends Provider +public interface StorageConnectorProvider { + StorageConnector createStorageConnector(File tempDir); } From a53d767121bbe2535c60e65da30f0e2b7f3efc28 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Sep 2024 22:53:42 
+0530 Subject: [PATCH 02/14] GCS changes --- .../storage/google/output/GoogleOutputConfig.java | 9 ++++++++- .../google/output/GoogleStorageConnectorProvider.java | 7 ++++--- .../output/GoogleStorageConnectorProviderTest.java | 10 +++------- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java index c9c78151ae99..4e42c6d274af 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -37,6 +37,7 @@ public class GoogleOutputConfig @JsonProperty private final String prefix; + @Nullable @JsonProperty private final File tempDir; @@ -58,7 +59,7 @@ public class GoogleOutputConfig public GoogleOutputConfig( final String bucket, final String prefix, - final File tempDir, + @Nullable final File tempDir, @Nullable final HumanReadableBytes chunkSize, @Nullable final Integer maxRetry ) @@ -82,6 +83,7 @@ public String getPrefix() return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -97,6 +99,11 @@ public Integer getMaxRetry() return maxRetry; } + public GoogleOutputConfig withTempDir(File tempDir) + { + return new GoogleOutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry); + } + private void validateFields() { if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index 49856a9c1eff..18ed8219a886 100644 --- 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -47,7 +47,7 @@ public class GoogleStorageConnectorProvider extends GoogleOutputConfig implement public GoogleStorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -56,8 +56,9 @@ public GoogleStorageConnectorProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File tempDir) { - return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig); + GoogleOutputConfig config = this.getTempDir() == null ? 
this.withTempDir(tempDir) : this; + return new GoogleStorageConnector(config, googleStorage, googleInputDataConfig); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java index df6c66e84c3f..a264ead94a13 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java @@ -26,9 +26,8 @@ import com.google.inject.ProvisionException; import com.google.inject.name.Names; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.storage.StorageConnector; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.google.GoogleInputDataConfig; @@ -44,6 +43,7 @@ public class GoogleStorageConnectorProviderTest { private static final String CUSTOM_NAMESPACE = "custom"; + private final File tempDir = FileUtils.createTempDir(); @Test public void createGoogleStorageFactoryWithRequiredProperties() @@ -57,7 +57,7 @@ public void createGoogleStorageFactoryWithRequiredProperties() StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties); Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider); - Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector); + Assert.assertTrue(googleStorageConnectorProvider.createStorageConnector(tempDir) instanceof GoogleStorageConnector); Assert.assertEquals("bucket", 
((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket()); Assert.assertEquals("prefix", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix()); Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir()); @@ -124,10 +124,6 @@ private StorageConnectorProvider getStorageConnectorProvider(Properties properti StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE) ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); } ).withProperties(properties); From fc4781b2d7910d3d934e5cd823123b41aa1774f0 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Sep 2024 23:01:15 +0530 Subject: [PATCH 03/14] S3 changes --- .../druid/storage/s3/output/S3OutputConfig.java | 10 +++++++++- .../s3/output/S3StorageConnectorProvider.java | 8 +++++--- .../s3/S3StorageConnectorProviderTest.java | 16 ++++++---------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java index 35e228f7ef3e..cacbe4272e6d 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java @@ -40,6 +40,7 @@ public class S3OutputConfig @JsonProperty private String prefix; + @Nullable @JsonProperty private File tempDir; @@ -57,7 +58,7 @@ public class S3OutputConfig public S3OutputConfig( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") 
@Nullable File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, @JsonProperty("maxRetry") Integer maxRetry ) @@ -69,6 +70,7 @@ public S3OutputConfig( protected S3OutputConfig( String bucket, String prefix, + @Nullable File tempDir, @Nullable HumanReadableBytes chunkSize, @@ -120,6 +122,7 @@ public String getPrefix() return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -135,6 +138,11 @@ public int getMaxRetry() return maxRetry; } + public S3OutputConfig withTempDir(File tempDir) + { + return new S3OutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry); + } + private static void validateChunkSize(long chunkSize) { if (S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES < chunkSize) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java index f86aee9a1aaf..5a53c60cc1b0 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java @@ -30,6 +30,7 @@ import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import javax.annotation.Nullable; import java.io.File; @JsonTypeName(S3StorageDruidModule.SCHEME) @@ -45,7 +46,7 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag public S3StorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, @JsonProperty("maxRetry") Integer maxRetry ) @@ -54,8 +55,9 @@ public 
S3StorageConnectorProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File tempDir) { - return new S3StorageConnector(this, s3, s3UploadManager); + S3OutputConfig config = this.getTempDir() == null ? this.withTempDir(tempDir) : this; + return new S3StorageConnector(config, s3, s3UploadManager); } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index a880d6f2efa5..3210a26cc584 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -29,12 +29,11 @@ import com.google.inject.name.Names; import org.apache.druid.common.aws.AWSModule; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.s3.output.S3ExportConfig; @@ -54,6 +53,7 @@ public class S3StorageConnectorProviderTest { private static final String CUSTOM_NAMESPACE = "custom"; + private final File tempDir = FileUtils.createTempDir(); @Test public void createS3StorageFactoryWithRequiredProperties() @@ -67,7 +67,7 @@ public void createS3StorageFactoryWithRequiredProperties() StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties); 
Assert.assertTrue(s3StorageConnectorProvider instanceof S3StorageConnectorProvider); - Assert.assertTrue(s3StorageConnectorProvider.get() instanceof S3StorageConnector); + Assert.assertTrue(s3StorageConnectorProvider.createStorageConnector(tempDir) instanceof S3StorageConnector); Assert.assertEquals("bucket", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getBucket()); Assert.assertEquals("prefix", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getPrefix()); Assert.assertEquals(new File("/tmp"), ((S3StorageConnectorProvider) s3StorageConnectorProvider).getTempDir()); @@ -115,9 +115,9 @@ public void createS3StorageFactoryWithMissingTempDir() properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); Assert.assertThrows( - "Missing required creator property 'tempDir'", - ProvisionException.class, - () -> getStorageConnectorProvider(properties) + "tempDir is null in s3 config", + NullPointerException.class, + () -> getStorageConnectorProvider(properties).createStorageConnector(null) ); } @@ -138,10 +138,6 @@ public void configure(Binder binder) StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE) ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); } } ).withProperties(properties); From 3425e52da50531722e594ed9217227a038c978ce Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Sep 2024 23:03:03 +0530 Subject: [PATCH 04/14] Indexer bindings --- .../msq/guice/MSQDurableStorageModule.java | 29 ++++++++++++++++--- .../indexing/IndexerControllerContext.java | 9 +++++- .../msq/indexing/IndexerWorkerContext.java | 9 +++++- .../LocalFileStorageConnectorProvider.java | 2 +- .../storage/StorageConnectorModuleTest.java | 5 ++-- .../local/LocalFileStorageConnectorTest.java | 20 ++++++------- 6 files changed, 55 insertions(+), 19 deletions(-) diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 7139377495a8..5dadf96ac770 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -24,6 +24,7 @@ import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Key; +import com.google.inject.Provider; import com.google.inject.multibindings.Multibinder; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.JsonConfigProvider; @@ -84,10 +85,6 @@ public void configure(Binder binder) MultiStageQuery.class ); - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)) - .in(LazySingleton.class); - if (nodeRoles.contains(NodeRole.OVERLORD)) { JsonConfigProvider.bind( binder, @@ -95,6 +92,10 @@ public void configure(Binder binder) DurableStorageCleanerConfig.class ); + binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toProvider(NoTempDirectoryStorageConnector.class) + .in(LazySingleton.class); + Multibinder.newSetBinder(binder, OverlordDuty.class) .addBinding() .to(DurableStorageCleaner.class); @@ -102,11 +103,31 @@ public void configure(Binder binder) } else if (nodeRoles.contains(NodeRole.BROKER)) { // bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it. 
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance()); + + binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toProvider(NoTempDirectoryStorageConnector.class) + .in(LazySingleton.class); } else { // do nothing } } + private static class NoTempDirectoryStorageConnector implements Provider + { + private StorageConnectorProvider storageConnectorProvider; + + @Inject + public void inject(StorageConnectorProvider storageConnectorProvider) + { + this.storageConnectorProvider = storageConnectorProvider; + } + @Override + public StorageConnector get() + { + return storageConnectorProvider.createStorageConnector(null); + } + } + private boolean isDurableShuffleStorageEnabled() { return Boolean.parseBoolean(properties.getProperty(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false")); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 1037aa6c2af0..e3f32a42b3fe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -39,6 +39,7 @@ import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerFailureListener; import org.apache.druid.msq.exec.WorkerManager; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.error.MSQException; @@ -56,6 +57,8 @@ import org.apache.druid.segment.realtime.ChatHandler; import org.apache.druid.server.DruidNode; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.storage.StorageConnector; 
+import org.apache.druid.storage.StorageConnectorProvider; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -85,11 +88,15 @@ public IndexerControllerContext( { this.task = task; this.toolbox = toolbox; - this.injector = injector; this.clientFactory = clientFactory; this.overlordClient = overlordClient; this.metricBuilder = new ServiceMetricEvent.Builder(); IndexTaskUtils.setTaskDimensions(metricBuilder, task); + final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); + final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + this.injector = injector.createChildInjector( + binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toInstance(storageConnector)); } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 0b3063ef48ba..a05618ed140c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -41,6 +41,7 @@ import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.client.IndexerControllerClient; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.client.WorkerChatHandler; @@ -59,6 +60,8 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.server.DruidNode; +import 
org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import java.io.File; import java.util.concurrent.ExecutorService; @@ -99,7 +102,6 @@ public IndexerWorkerContext( { this.task = task; this.toolbox = toolbox; - this.injector = injector; this.overlordClient = overlordClient; this.indexIO = indexIO; this.dataSegmentProvider = dataSegmentProvider; @@ -110,6 +112,11 @@ public IndexerWorkerContext( final QueryContext queryContext = QueryContext.of(task.getContext()); this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext); this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); + final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); + final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + this.injector = injector.createChildInjector( + binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toInstance(storageConnector)); } public static IndexerWorkerContext createProductionInstance( diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java index 82d1623f8404..8434af07d11e 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java @@ -45,7 +45,7 @@ public LocalFileStorageConnectorProvider(@JsonProperty(value = "basePath", requi } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File tempDir) { try { return new LocalFileStorageConnector(basePath); diff --git a/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java 
b/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java index df9a88d4813e..46cc01760eb3 100644 --- a/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java +++ b/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java @@ -46,8 +46,9 @@ public class StorageConnectorModuleTest public void testJsonSerde() throws JsonProcessingException { StorageConnectorProvider storageConnectorProvider = objectMapper.readValue(JSON, StorageConnectorProvider.class); - Assert.assertTrue(storageConnectorProvider.get() instanceof LocalFileStorageConnector); - Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnectorProvider.get()).getBasePath()); + StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(new File("/tmp/tmpDir")); + Assert.assertTrue(storageConnector instanceof LocalFileStorageConnector); + Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnector).getBasePath()); } diff --git a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java index 4bc8886f9998..d6eec86723b4 100644 --- a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java +++ b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java @@ -49,14 +49,14 @@ public class LocalFileStorageConnectorTest @Rule public ExpectedException expectedException = ExpectedException.none(); - private File tempDir; + private File storageDir; private StorageConnector storageConnector; @Before public void init() throws IOException { - tempDir = temporaryFolder.newFolder(); - storageConnector = new LocalFileStorageConnectorProvider(tempDir).get(); + storageDir = temporaryFolder.newFolder(); + storageConnector = new 
LocalFileStorageConnectorProvider(storageDir).createStorageConnector(null); } @Test @@ -69,14 +69,14 @@ public void sanityCheck() throws IOException // check if file is created Assert.assertTrue(storageConnector.pathExists(uuid)); - Assert.assertTrue(new File(tempDir.getAbsolutePath(), uuid).exists()); + Assert.assertTrue(new File(storageDir.getAbsolutePath(), uuid).exists()); // check contents checkContents(uuid); // delete file storageConnector.deleteFile(uuid); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid).exists()); } @Test @@ -96,14 +96,14 @@ public void deleteRecursivelyTest() throws IOException checkContents(uuid1); checkContents(uuid2); - File baseFile = new File(tempDir.getAbsolutePath(), uuid_base); + File baseFile = new File(storageDir.getAbsolutePath(), uuid_base); Assert.assertTrue(baseFile.exists()); Assert.assertTrue(baseFile.isDirectory()); Assert.assertEquals(2, baseFile.listFiles().length); storageConnector.deleteRecursively(uuid_base); Assert.assertFalse(baseFile.exists()); - Assert.assertTrue(new File(tempDir.getAbsolutePath(), topLevelDir).exists()); + Assert.assertTrue(new File(storageDir.getAbsolutePath(), topLevelDir).exists()); } @Test @@ -118,8 +118,8 @@ public void batchDelete() throws IOException // delete file storageConnector.deleteFiles(ImmutableList.of(uuid1, uuid2)); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid1).exists()); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid2).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid1).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid2).exists()); } @Test @@ -128,7 +128,7 @@ public void incorrectBasePath() throws IOException File file = temporaryFolder.newFile(); expectedException.expect(IAE.class); StorageConnectorProvider storageConnectorProvider = new LocalFileStorageConnectorProvider(file); - 
storageConnectorProvider.get(); + storageConnectorProvider.createStorageConnector(null); } @Test From 8b57c8fdd53dd148d0d56fafa0e1fd54db4a01ee Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Sep 2024 23:26:31 +0530 Subject: [PATCH 05/14] Fix injection --- .../msq/guice/MSQDurableStorageModule.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 5dadf96ac770..5d5d9b617a49 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -86,27 +86,28 @@ public void configure(Binder binder) ); if (nodeRoles.contains(NodeRole.OVERLORD)) { + binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toProvider(NoTempDirectoryStorageConnector.class) + .in(LazySingleton.class); + JsonConfigProvider.bind( binder, String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"), DurableStorageCleanerConfig.class ); - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(NoTempDirectoryStorageConnector.class) - .in(LazySingleton.class); - Multibinder.newSetBinder(binder, OverlordDuty.class) .addBinding() .to(DurableStorageCleaner.class); } + if (nodeRoles.contains(NodeRole.BROKER)) { + binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toProvider(NoTempDirectoryStorageConnector.class) + .in(LazySingleton.class); + } } else if (nodeRoles.contains(NodeRole.BROKER)) { // bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it. 
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance()); - - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(NoTempDirectoryStorageConnector.class) - .in(LazySingleton.class); } else { // do nothing } From 6f71916259dbd2daf342ec393ef8c5597f39a5d4 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 5 Sep 2024 23:59:24 +0530 Subject: [PATCH 06/14] Changes for export --- .../output/GoogleExportStorageProvider.java | 13 ++++----- .../GoogleExportStorageProviderTest.java | 6 +++- .../druid/msq/exec/ControllerContext.java | 7 +++++ .../apache/druid/msq/exec/ControllerImpl.java | 28 ++++--------------- .../druid/msq/exec/ExportMetadataManager.java | 7 +++-- .../msq/guice/MSQDurableStorageModule.java | 3 +- .../indexing/IndexerControllerContext.java | 7 +++++ .../ExportResultsFrameProcessorFactory.java | 2 +- .../msq/test/MSQTestControllerContext.java | 9 ++++++ .../storage/s3/output/S3ExportConfig.java | 3 +- .../s3/output/S3ExportStorageProvider.java | 14 +++++----- .../druid/storage/ExportStorageProvider.java | 6 +++- .../local/LocalFileExportStorageProvider.java | 2 +- 13 files changed, 62 insertions(+), 45 deletions(-) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java index 8d0c6b50b316..eba6545272d4 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java @@ -70,13 +70,12 @@ public GoogleExportStorageProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { - final String tempDir = 
googleExportConfig.getTempLocalDir(); - if (tempDir == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export."); + final String exportConfigTempDir = googleExportConfig.getTempLocalDir(); + final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir; + if (tempDirFile == null) { + throw DruidException.defensive("Couldn't find temporary directory for export."); } final List allowedExportPaths = googleExportConfig.getAllowedExportPaths(); if (allowedExportPaths == null) { @@ -89,7 +88,7 @@ public StorageConnector get() final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig( bucket, prefix, - new File(tempDir), + tempDirFile, googleExportConfig.getChunkSize(), googleExportConfig.getMaxRetry() ); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java index e40846a848d7..8b9c8691849c 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java @@ -21,14 +21,18 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.storage.StorageConnector; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.List; public class GoogleExportStorageProviderTest { + private final File tempDir = FileUtils.createTempDir(); + private final List validPrefixes = ImmutableList.of( "gs://bucket-name/validPath1", 
"gs://bucket-name/validPath2" @@ -39,7 +43,7 @@ public void testGoogleExportStorageProvider() { GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1"); googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes); - StorageConnector storageConnector = googleExportStorageProvider.get(); + StorageConnector storageConnector = googleExportStorageProvider.createStorageConnector(tempDir); Assert.assertNotNull(storageConnector); Assert.assertTrue(storageConnector instanceof GoogleStorageConnector); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 40b114511c28..f21cefc4a2fa 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -32,6 +32,8 @@ import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.server.DruidNode; +import java.io.File; + /** * Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own * implementations. @@ -96,6 +98,11 @@ WorkerManager newWorkerManager( WorkerFailureListener workerFailureListener ); + /** + * Fetch a directory for temporary outputs + */ + File taskTempDir(); + /** * Client for communicating with workers. 
*/ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 77a0b7d48d69..d2a0d63c28ef 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -567,7 +567,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) queryId(), makeQueryControllerToolKit(), querySpec, - context.jsonMapper(), + context, resultsContext ); @@ -1548,26 +1548,7 @@ private void handleQueryResults( } else if (MSQControllerTask.isExport(querySpec)) { // Write manifest file. ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider()); - - final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); - //noinspection unchecked - - - Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId); - if (!(resultObjectForStage instanceof List)) { - // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case. - log.warn("Was unable to create manifest file due to "); - return; - } - @SuppressWarnings("unchecked") - List exportedFiles = (List) queryKernel.getResultObjectForStage(finalStageId); - log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size()); - exportMetadataManager.writeMetadata(exportedFiles); - } else if (MSQControllerTask.isExport(querySpec)) { - // Write manifest file. 
- ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider()); + ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider(), context.taskTempDir()); final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); //noinspection unchecked @@ -1715,10 +1696,11 @@ private static QueryDefinition makeQueryDefinition( final String queryId, @SuppressWarnings("rawtypes") final QueryKit toolKit, final MSQSpec querySpec, - final ObjectMapper jsonMapper, + final ControllerContext controllerContext, final ResultsContext resultsContext ) { + final ObjectMapper jsonMapper = controllerContext.jsonMapper(); final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final Query queryToPlan; @@ -1838,7 +1820,7 @@ private static QueryDefinition makeQueryDefinition( try { // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export. 
- Iterator filesIterator = exportStorageProvider.get().listDir(""); + Iterator filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()).listDir(""); if (filesIterator.hasNext()) { throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.RUNTIME_FAILURE) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java index 3b9d0296de5a..00c5d7799de3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java @@ -25,6 +25,7 @@ import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.storage.StorageConnector; +import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -42,15 +43,17 @@ public class ExportMetadataManager public static final int MANIFEST_FILE_VERSION = 1; private static final Logger log = new Logger(ExportMetadataManager.class); private final ExportStorageProvider exportStorageProvider; + private final File tmpDir; - public ExportMetadataManager(final ExportStorageProvider exportStorageProvider) + public ExportMetadataManager(final ExportStorageProvider exportStorageProvider, final File tmpDir) { this.exportStorageProvider = exportStorageProvider; + this.tmpDir = tmpDir; } public void writeMetadata(List exportedFiles) throws IOException { - final StorageConnector storageConnector = exportStorageProvider.get(); + final StorageConnector storageConnector = exportStorageProvider.createStorageConnector(tmpDir); log.info("Writing manifest file at location [%s]", exportStorageProvider.getBasePath()); if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) { diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 5d5d9b617a49..89b452125808 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -118,10 +118,11 @@ private static class NoTempDirectoryStorageConnector implements Provider makeProcessors( readableInput.getChannel(), exportFormat, readableInput.getChannelFrameReader(), - exportStorageProvider.get(), + exportStorageProvider.createStorageConnector(frameContext.tempDir()), frameContext.jsonMapper(), channelCounter, getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat), diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index e65104302032..8439fc800393 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -39,6 +39,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; @@ -69,6 +70,7 @@ import org.mockito.Mockito; import javax.annotation.Nullable; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -89,6 +91,7 @@ 
public class MSQTestControllerContext implements ControllerContext NUM_WORKERS, "MultiStageQuery-test-controller-client" )); + private final File tempDir = FileUtils.createTempDir(); private final CoordinatorClient coordinatorClient; private final DruidNode node = new DruidNode( "controller", @@ -331,6 +334,12 @@ public WorkerManager newWorkerManager( ); } + @Override + public File taskTempDir() + { + return tempDir; + } + @Override public void registerController(Controller controller, Closer closer) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java index d5477c2998e0..fec5776aedc3 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java @@ -39,7 +39,7 @@ public class S3ExportConfig @JsonCreator public S3ExportConfig( - @JsonProperty("tempLocalDir") final String tempLocalDir, + @JsonProperty("tempLocalDir") @Nullable final String tempLocalDir, @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize, @JsonProperty("maxRetry") @Nullable final Integer maxRetry, @JsonProperty("allowedExportPaths") final List allowedExportPaths) @@ -50,6 +50,7 @@ public S3ExportConfig( this.allowedExportPaths = allowedExportPaths; } + @Nullable public String getTempLocalDir() { return tempLocalDir; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java index ca599fc9d495..129622dfbd77 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java +++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java @@ -69,14 +69,14 @@ public S3ExportStorageProvider( this.prefix = prefix; } + @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { - final String tempDir = s3ExportConfig.getTempLocalDir(); - if (tempDir == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("The runtime property `druid.export.storage.s3.tempLocalDir` must be configured for S3 export."); + final String exportConfigTempDir = s3ExportConfig.getTempLocalDir(); + final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir; + if (tempDirFile == null) { + throw DruidException.defensive("Couldn't find temporary directory for export."); } final List allowedExportPaths = s3ExportConfig.getAllowedExportPaths(); if (allowedExportPaths == null) { @@ -89,7 +89,7 @@ public StorageConnector get() final S3OutputConfig s3OutputConfig = new S3OutputConfig( bucket, prefix, - new File(tempDir), + tempDirFile, s3ExportConfig.getChunkSize(), s3ExportConfig.getMaxRetry() ); diff --git a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java index 173544e2f8db..48621263e9dd 100644 --- a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.inject.Provider; +import java.io.File; + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -public interface ExportStorageProvider extends Provider +public interface ExportStorageProvider { String getResourceType(); @@ -33,4 +35,6 @@ public interface ExportStorageProvider extends Provider String 
getBasePath(); String getFilePathForManifest(String fileName); + + StorageConnector createStorageConnector(File taskTempDir); } diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java index 74b099aef885..8755b9373298 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java @@ -54,7 +54,7 @@ public LocalFileExportStorageProvider(@JsonProperty(value = "exportPath", requir } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { final File exportDestination = validateAndGetPath(storageConfig.getBaseDir(), exportPath); try { From 3111bd2e0b63e1543389dff930ceca06e59c70a1 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 6 Sep 2024 08:45:01 +0530 Subject: [PATCH 07/14] Fix injection --- .../msq/guice/MSQDurableStorageModule.java | 31 +------------------ .../cleaner/DurableStorageCleaner.java | 5 +-- .../sql/resources/SqlStatementResource.java | 5 +-- .../indexing/DurableStorageCleanerTest.java | 2 +- .../SqlMSQStatementResourcePostTest.java | 5 ++- .../resources/SqlStatementResourceTest.java | 3 +- 6 files changed, 12 insertions(+), 39 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 89b452125808..bc758f0a3994 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -24,18 +24,15 @@ import com.google.inject.Binder; import com.google.inject.Inject; import 
com.google.inject.Key; -import com.google.inject.Provider; import com.google.inject.multibindings.Multibinder; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.initialization.DruidModule; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig; import org.apache.druid.storage.NilStorageConnector; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorProvider; import java.util.List; @@ -86,10 +83,6 @@ public void configure(Binder binder) ); if (nodeRoles.contains(NodeRole.OVERLORD)) { - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(NoTempDirectoryStorageConnector.class) - .in(LazySingleton.class); - JsonConfigProvider.bind( binder, String.join(".", MSQ_INTERMEDIATE_STORAGE_PREFIX, "cleaner"), @@ -100,36 +93,14 @@ public void configure(Binder binder) .addBinding() .to(DurableStorageCleaner.class); } - if (nodeRoles.contains(NodeRole.BROKER)) { - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(NoTempDirectoryStorageConnector.class) - .in(LazySingleton.class); - } } else if (nodeRoles.contains(NodeRole.BROKER)) { // bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it. 
- binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance()); + binder.bind(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)).toInstance(tempDir -> NilStorageConnector.getInstance()); } else { // do nothing } } - private static class NoTempDirectoryStorageConnector implements Provider - { - private StorageConnectorProvider storageConnectorProvider; - - @Inject - public void inject(@MultiStageQuery StorageConnectorProvider storageConnectorProvider) - { - this.storageConnectorProvider = storageConnectorProvider; - } - - @Override - public StorageConnector get() - { - return storageConnectorProvider.createStorageConnector(null); - } - } - private boolean isDurableShuffleStorageEnabled() { return Boolean.parseBoolean(properties.getProperty(MSQ_INTERMEDIATE_STORAGE_ENABLED, "false")); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java index 630499bdd876..6b45273fa72f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import java.util.HashSet; import java.util.Iterator; @@ -55,12 +56,12 @@ public class DurableStorageCleaner implements OverlordDuty @Inject public DurableStorageCleaner( final DurableStorageCleanerConfig config, - final @MultiStageQuery StorageConnector storageConnector, + final @MultiStageQuery StorageConnectorProvider storageConnectorProvider, @JacksonInject final Provider 
taskMasterProvider ) { this.config = config; - this.storageConnector = storageConnector; + this.storageConnector = storageConnectorProvider.createStorageConnector(null); this.taskMasterProvider = taskMasterProvider; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index f56622804786..c92bfa955fb6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -91,6 +91,7 @@ import org.apache.druid.sql.http.SqlResource; import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import javax.servlet.http.HttpServletRequest; @@ -134,14 +135,14 @@ public SqlStatementResource( final @MultiStageQuery SqlStatementFactory msqSqlStatementFactory, final ObjectMapper jsonMapper, final OverlordClient overlordClient, - final @MultiStageQuery StorageConnector storageConnector, + final @MultiStageQuery StorageConnectorProvider storageConnectorProvider, final AuthorizerMapper authorizerMapper ) { this.msqSqlStatementFactory = msqSqlStatementFactory; this.jsonMapper = jsonMapper; this.overlordClient = overlordClient; - this.storageConnector = storageConnector; + this.storageConnector = storageConnectorProvider.createStorageConnector(null); this.authorizerMapper = authorizerMapper; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java index 3da4adcbc3f5..b83beadeccda 100644 --- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java @@ -59,7 +59,7 @@ public void setUp() durableStorageCleanerConfig.enabled = true; durableStorageCleaner = new DurableStorageCleaner( durableStorageCleanerConfig, - STORAGE_CONNECTOR, + s -> STORAGE_CONNECTOR, () -> TASK_MASTER ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index cef3e00daa2d..656086cef434 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -54,7 +54,6 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -76,7 +75,7 @@ public void init() sqlStatementFactory, objectMapper, indexingServiceClient, - localFileStorageConnector, + s -> localFileStorageConnector, authorizerMapper ); } @@ -330,7 +329,7 @@ public void durableStorageDisabledTest() sqlStatementFactory, objectMapper, indexingServiceClient, - NilStorageConnector.getInstance(), + s -> NilStorageConnector.getInstance(), authorizerMapper ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index a79edee53b8f..477575441209 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -693,7 +693,8 @@ public void init() throws Exception sqlStatementFactory, objectMapper, overlordClient, - new LocalFileStorageConnector(newTempFolder("local")), + tempDir -> localFileStorageConnector, + authorizerMapper ); } From 982db62571ce68efea8334141a4c036ae1191faf Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 6 Sep 2024 09:06:51 +0530 Subject: [PATCH 08/14] Fix tests --- .../storage/azure/output/AzureOutputConfig.java | 2 +- .../azure/output/AzureOutputConfigTest.java | 3 +-- .../output/AzureStorageConnectorProviderTest.java | 14 +++++++++++++- .../org/apache/druid/msq/exec/ControllerImpl.java | 2 +- .../sql/resources/SqlStatementResourceTest.java | 1 - 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java index 9b02b5d1ce27..004e79a66a60 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java @@ -124,7 +124,7 @@ private void validateFields() public void validateTempDirectory() { if (tempDir == null) { - throw DruidException.defensive("'tempDir' is required."); + throw DruidException.defensive("The runtime property `druid.msq.intermediate.storage.tempDir` must be configured."); } try { diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java index 058887316ec9..fca486a4246f 100644 --- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java @@ -57,10 +57,9 @@ public void testTempDirectoryNotWritable(@TempDir File tempDir) throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName()); } - //noinspection ResultOfObjectAllocationIgnored assertThrows( DruidException.class, - () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT) + () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT).validateTempDirectory() ); } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java index f2c923a3c0d8..74141519bf96 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java @@ -108,10 +108,22 @@ public void createAzureStorageFactoryWithMissingTempDir() assertThrows( DruidException.class, () -> getStorageConnectorProvider(properties).createStorageConnector(null), - "'tempDir' is required." + "The runtime property `druid.msq.intermediate.storage.tempDir` must be configured." 
); } + @Test + public void createAzureStorageFactoryWithMissingTempDirButProvidedDuringRuntime() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure"); + properties.setProperty(CUSTOM_NAMESPACE + ".container", "container"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + + getStorageConnectorProvider(properties).createStorageConnector(new File("/tmp")); + } + private StorageConnectorProvider getStorageConnectorProvider(Properties properties) { StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index d2a0d63c28ef..5bbe9a9bf37b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1700,7 +1700,7 @@ private static QueryDefinition makeQueryDefinition( final ResultsContext resultsContext ) { - final ObjectMapper jsonMapper = controllerContext.jsonMapper(); + final ObjectMapper jsonMapper = controllerContext.jsonMapper(); final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final Query queryToPlan; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index 477575441209..5fd689357d4b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -81,7 +81,6 @@ 
import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlResourceTest; -import org.apache.druid.storage.local.LocalFileStorageConnector; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpVersion; From 11ca65e59656df012d03b49cd89cde55e366f602 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 6 Sep 2024 10:19:48 +0530 Subject: [PATCH 09/14] Update docs --- docs/multi-stage-query/reference.md | 14 +++++++------- .../sql/resources/SqlStatementResourceTest.java | 3 +-- .../druid/storage/ExportStorageProvider.java | 1 - 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 50f07ff80b48..6facbaedcb3d 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -155,8 +155,8 @@ The following runtime parameters must be configured to export into an S3 destina | Runtime Parameter | Required | Description | Default | |----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----| -| `druid.export.storage.s3.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a | | `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. 
Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a | +| `druid.export.storage.s3.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a | | `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | | `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB | @@ -186,12 +186,12 @@ Supported arguments for the function: The following runtime parameters must be configured to export into a GCS destination: -| Runtime Parameter | Required | Description | Default | -|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a | +| Runtime Parameter | Required | Description | Default | +|--------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| | `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. 
Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a | -| `druid.export.storage.google.maxRetry` | No | Defines the max number times to attempt GS API calls to avoid failures due to transient errors. | 10 | -| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | +| `druid.export.storage.google.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a | +| `druid.export.storage.google.maxRetry` | No | Defines the max number times to attempt GS API calls to avoid failures due to transient errors. | 10 | +| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | ##### LOCAL @@ -531,7 +531,7 @@ Common properties to configure the behavior of durable storage |--|--|--| |`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false | |`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure and `google` for Google | n/a | -|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a | +|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data. 
If the property is not configured on the indexer or middle manager, it defaults to using the task temporary directory. | n/a | |`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | |`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.| 100MiB | diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index 5fd689357d4b..40dcb303b1dc 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -684,7 +684,7 @@ private static AuthenticationResult makeAuthResultForUser(String user) } @BeforeEach - public void init() throws Exception + public void init() { overlordClient = Mockito.mock(OverlordClient.class); setupMocks(overlordClient); @@ -693,7 +693,6 @@ public void init() throws Exception objectMapper, overlordClient, tempDir -> localFileStorageConnector, - authorizerMapper ); } diff --git a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java index 48621263e9dd..d19b2f5f7e1b 100644 --- a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java @@ -20,7 +20,6 @@ package 
org.apache.druid.storage; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; import java.io.File; From 0e287bf4dc72221a70f714a48e354b1cd5172a9c Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 6 Sep 2024 10:42:02 +0530 Subject: [PATCH 10/14] Fix tests --- .../druid/storage/s3/output/S3OutputSerdeTest.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java index 72ea888615fc..3e2476537136 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java @@ -97,20 +97,6 @@ public void noBucket() throws JsonProcessingException MAPPER.readValue(json, S3OutputConfig.class); } - @Test - public void noTempDir() throws JsonProcessingException - { - String json = jsonStringReadyForAssert("{\n" - + " \"prefix\": \"abc\",\n" - + " \"bucket\": \"TEST\",\n" - + " \"chunkSize\":104857600,\n" - + " \"maxRetry\": 2\n" - + "}\n"); - expectedException.expect(MismatchedInputException.class); - expectedException.expectMessage("Missing required creator property 'tempDir'"); - MAPPER.readValue(json, S3OutputConfig.class); - } - @Test public void leastArguments() throws JsonProcessingException { From 1085f58f5f38e7642af0c1802b98ce8e0d612ede Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 16 Oct 2024 14:01:00 +0530 Subject: [PATCH 11/14] Add javadoc --- .../azure/output/AzureStorageConnectorProvider.java | 4 ++-- .../google/output/GoogleStorageConnectorProvider.java | 4 ++-- .../org/apache/druid/msq/exec/ControllerContext.java | 5 ++++- .../storage/s3/output/S3StorageConnectorProvider.java | 4 ++-- .../druid/storage/StorageConnectorProvider.java | 11 ++++++++++- 
.../local/LocalFileStorageConnectorProvider.java | 2 +- 6 files changed, 21 insertions(+), 9 deletions(-) diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java index 1a3a4dab9cf0..0f2cdb47d441 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java @@ -54,9 +54,9 @@ public AzureStorageConnectorProvider( } @Override - public StorageConnector createStorageConnector(final File tempDir) + public StorageConnector createStorageConnector(final File defaultTempDir) { - AzureOutputConfig config = this.getTempDir() == null ? this.withTempDir(tempDir) : this; + AzureOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this; config.validateTempDirectory(); return new AzureStorageConnector(config, azureStorage); } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index 18ed8219a886..ac5c6a3c9642 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -56,9 +56,9 @@ public GoogleStorageConnectorProvider( } @Override - public StorageConnector createStorageConnector(File tempDir) + public StorageConnector createStorageConnector(File defaultTempDir) { - GoogleOutputConfig config = this.getTempDir() == null ? 
this.withTempDir(tempDir) : this; + GoogleOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this; return new GoogleStorageConnector(config, googleStorage, googleInputDataConfig); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 181ceabb91e4..7f5e4a0a2b09 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -109,7 +109,10 @@ WorkerManager newWorkerManager( /** * Fetch a directory for temporary outputs */ - File taskTempDir(); + default File taskTempDir() + { + throw new UnsupportedOperationException(); + } /** * Client for communicating with workers. diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java index 5a53c60cc1b0..57f131746f0d 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java @@ -55,9 +55,9 @@ public S3StorageConnectorProvider( } @Override - public StorageConnector createStorageConnector(File tempDir) + public StorageConnector createStorageConnector(File defaultTempDir) { - S3OutputConfig config = this.getTempDir() == null ? this.withTempDir(tempDir) : this; + S3OutputConfig config = this.getTempDir() == null ? 
this.withTempDir(defaultTempDir) : this; return new S3StorageConnector(config, s3, s3UploadManager); } } diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java index 2568d89802d6..a2023e6aa946 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java @@ -26,5 +26,14 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") public interface StorageConnectorProvider { - StorageConnector createStorageConnector(File tempDir); + /** + * Returns the storage connector. Takes a parameter defaultTempDir to be possibly used as the temporary directory, if the + * storage connector requires one. This StorageConnectorProvider is not guaranteed to use this value, even if the + * StorageConnectorProvider requires one, as it gives priority to a tempDir value configured as a runtime + * configuration. + *
+ * This value needs to be passed instead of injected by Jackson as the default temporary directory is dependent on the + * task id, and such dynamic task-specific bindings are not possible on indexers. + */ + StorageConnector createStorageConnector(File defaultTempDir); } diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java index 8434af07d11e..7c66f7906275 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java @@ -45,7 +45,7 @@ public LocalFileStorageConnectorProvider(@JsonProperty(value = "basePath", requi } @Override - public StorageConnector createStorageConnector(File tempDir) + public StorageConnector createStorageConnector(File defaultTempDir) { try { return new LocalFileStorageConnector(basePath); From 2ab1d7b3505beccfa0747d1f8800ce70cd956655 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 21 Oct 2024 14:30:12 +0530 Subject: [PATCH 12/14] Fix bindings --- .../storage/azure/output/AzureOutputSerdeTest.java | 12 ------------ .../druid/msq/guice/MSQDurableStorageModule.java | 6 ++---- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java index aea5232217a7..1ece63ccfcc1 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java @@ -86,18 +86,6 @@ public void noContainer() assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json,
AzureOutputConfig.class)); } - @Test - public void noTempDir() - { - String json = jsonStringReadyForAssert("{\n" - + " \"prefix\": \"abc\",\n" - + " \"container\": \"TEST\",\n" - + " \"chunkSize\":104857600,\n" - + " \"maxRetry\": 2\n" - + "}\n"); - assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class)); - } - @Test public void leastArguments() throws JsonProcessingException { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index bc758f0a3994..55d60a6473e1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -93,11 +93,9 @@ public void configure(Binder binder) .addBinding() .to(DurableStorageCleaner.class); } - } else if (nodeRoles.contains(NodeRole.BROKER)) { - // bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it. - binder.bind(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)).toInstance(tempDir -> NilStorageConnector.getInstance()); } else { - // do nothing + // bind with nil implementation so that configs are not required during service startups. 
+ binder.bind(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)).toInstance(tempDir -> NilStorageConnector.getInstance()); } } From 055b0f651d1b22e21ffc274c9f9c2b8d8277f03c Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 22 Oct 2024 09:12:33 +0530 Subject: [PATCH 13/14] Fix test --- .../apache/druid/msq/indexing/DurableStorageCleanerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java index b83beadeccda..0ddf807d7075 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.duty.DutySchedule; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig; +import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; import org.easymock.Capture; import org.easymock.EasyMock; @@ -126,7 +127,7 @@ public void testGetSchedule() DurableStorageCleanerConfig cleanerConfig = new DurableStorageCleanerConfig(); cleanerConfig.delaySeconds = 10L; cleanerConfig.enabled = true; - DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, null, null); + DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, (temp) -> NilStorageConnector.getInstance(), null); DutySchedule schedule = durableStorageCleaner.getSchedule(); Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getPeriodMillis()); From f3ca7d333a31e411d2fcc77cae26654d44f6140c Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 22 Oct 
2024 10:39:09 +0530 Subject: [PATCH 14/14] Fix test --- .../apache/druid/msq/indexing/IndexerWorkerContextTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java index 8de80cf109f8..7ac389b98037 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java @@ -22,12 +22,16 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.inject.Injector; +import com.google.inject.Key; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.msq.exec.Worker; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.rpc.ServiceLocation; import org.apache.druid.rpc.ServiceLocations; import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.storage.NilStorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -45,6 +49,8 @@ public void setup() final Injector injectorMock = Mockito.mock(Injector.class); Mockito.when(injectorMock.getInstance(SegmentCacheManagerFactory.class)) .thenReturn(Mockito.mock(SegmentCacheManagerFactory.class)); + Mockito.when(injectorMock.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))) + .thenReturn(defaultTempDir -> NilStorageConnector.getInstance()); final MSQWorkerTask task = Mockito.mock(MSQWorkerTask.class, Mockito.withSettings().strictness(Strictness.STRICT_STUBS));