diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 50f07ff80b48..6facbaedcb3d 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -155,8 +155,8 @@ The following runtime parameters must be configured to export into an S3 destina | Runtime Parameter | Required | Description | Default | |----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----| -| `druid.export.storage.s3.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a | | `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a | +| `druid.export.storage.s3.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a | | `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | | `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. 
| 100MiB | @@ -186,12 +186,12 @@ Supported arguments for the function: The following runtime parameters must be configured to export into a GCS destination: -| Runtime Parameter | Required | Description | Default | -|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a | +| Runtime Parameter | Required | Description | Default | +|--------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| | `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a | -| `druid.export.storage.google.maxRetry` | No | Defines the max number times to attempt GS API calls to avoid failures due to transient errors. | 10 | -| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | +| `druid.export.storage.google.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. 
| n/a | +| `druid.export.storage.google.maxRetry` | No | Defines the max number of times to attempt GS API calls to avoid failures due to transient errors. | 10 | +| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | ##### LOCAL @@ -531,7 +531,7 @@ Common properties to configure the behavior of durable storage |--|--|--| |`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false | |`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure and `google` for Google | n/a | -|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a | +|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data. If the property is not configured on the indexer or middle manager, it defaults to using the task temporary directory. | n/a | |`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | |`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. 
Druid uses a default of 100MiB if the value is not provided.| 100MiB | diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java index 7af9c856c5f5..004e79a66a60 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java @@ -42,6 +42,7 @@ public class AzureOutputConfig @JsonProperty private final String prefix; + @Nullable @JsonProperty private final File tempDir; @@ -64,7 +65,7 @@ public class AzureOutputConfig public AzureOutputConfig( @JsonProperty(value = "container", required = true) String container, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -77,7 +78,6 @@ public AzureOutputConfig( validateFields(); } - public String getContainer() { return container; @@ -88,6 +88,7 @@ public String getPrefix() return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -103,6 +104,11 @@ public int getMaxRetry() return maxRetry; } + public AzureOutputConfig withTempDir(File tempDir) + { + return new AzureOutputConfig(container, prefix, tempDir, chunkSize, maxRetry); + } + private void validateFields() { if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) { @@ -113,6 +119,13 @@ private void validateFields() AZURE_MAX_CHUNK_SIZE_BYTES ); } + } + + public void validateTempDirectory() + { + if (tempDir == null) { + throw DruidException.defensive("The runtime property 
`druid.msq.intermediate.storage.tempDir` must be configured."); + } try { FileUtils.mkdirp(tempDir); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java index 79be724c17f7..0f2cdb47d441 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java @@ -45,7 +45,7 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements public AzureStorageConnectorProvider( @JsonProperty(value = "container", required = true) String container, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -54,8 +54,10 @@ public AzureStorageConnectorProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(final File defaultTempDir) { - return new AzureStorageConnector(this, azureStorage); + AzureOutputConfig config = this.getTempDir() == null ? 
this.withTempDir(defaultTempDir) : this; + config.validateTempDirectory(); + return new AzureStorageConnector(config, azureStorage); } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java index 058887316ec9..fca486a4246f 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java @@ -57,10 +57,9 @@ public void testTempDirectoryNotWritable(@TempDir File tempDir) throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName()); } - //noinspection ResultOfObjectAllocationIgnored assertThrows( DruidException.class, - () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT) + () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT).validateTempDirectory() ); } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java index aea5232217a7..1ece63ccfcc1 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java @@ -86,18 +86,6 @@ public void noContainer() assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class)); } - @Test - public void noTempDir() - { - String json = jsonStringReadyForAssert("{\n" - + " \"prefix\": \"abc\",\n" - + " \"container\": \"TEST\",\n" - + " \"chunkSize\":104857600,\n" - + " \"maxRetry\": 2\n" - + "}\n"); - 
assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class)); - } - @Test public void leastArguments() throws JsonProcessingException { diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java index 0b76f02af29e..74141519bf96 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java @@ -25,10 +25,9 @@ import com.google.inject.Key; import com.google.inject.ProvisionException; import com.google.inject.name.Names; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.azure.AzureStorage; @@ -59,7 +58,7 @@ public void createAzureStorageFactoryWithRequiredProperties() StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties); assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider); - assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get()); + assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.createStorageConnector(new File("/tmp"))); assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer()); assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix()); assertEquals(new File("/tmp"), @@ -107,30 +106,36 @@ public void 
createAzureStorageFactoryWithMissingTempDir() properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); assertThrows( - ProvisionException.class, - () -> getStorageConnectorProvider(properties), - "Missing required creator property 'tempDir'" + DruidException.class, + () -> getStorageConnectorProvider(properties).createStorageConnector(null), + "The runtime property `druid.msq.intermediate.storage.tempDir` must be configured." ); } + @Test + public void createAzureStorageFactoryWithMissingTempDirButProvidedDuringRuntime() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure"); + properties.setProperty(CUSTOM_NAMESPACE + ".container", "container"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + + getStorageConnectorProvider(properties).createStorageConnector(new File("/tmp")); + } + private StorageConnectorProvider getStorageConnectorProvider(Properties properties) { StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add( new AzureStorageDruidModule(), new StorageConnectorModule(), new AzureStorageConnectorModule(), - binder -> { - JsonConfigProvider.bind( - binder, - CUSTOM_NAMESPACE, - StorageConnectorProvider.class, - Names.named(CUSTOM_NAMESPACE) - ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); - } + binder -> JsonConfigProvider.bind( + binder, + CUSTOM_NAMESPACE, + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + ) ).withProperties(properties); Injector injector = startupInjectorBuilder.build(); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java index 8d0c6b50b316..eba6545272d4 
100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java @@ -70,13 +70,12 @@ public GoogleExportStorageProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { - final String tempDir = googleExportConfig.getTempLocalDir(); - if (tempDir == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export."); + final String exportConfigTempDir = googleExportConfig.getTempLocalDir(); + final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir; + if (tempDirFile == null) { + throw DruidException.defensive("Couldn't find temporary directory for export."); } final List allowedExportPaths = googleExportConfig.getAllowedExportPaths(); if (allowedExportPaths == null) { @@ -89,7 +88,7 @@ public StorageConnector get() final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig( bucket, prefix, - new File(tempDir), + tempDirFile, googleExportConfig.getChunkSize(), googleExportConfig.getMaxRetry() ); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java index c9c78151ae99..4e42c6d274af 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -37,6 +37,7 @@ public class GoogleOutputConfig @JsonProperty private final String prefix; + 
@Nullable @JsonProperty private final File tempDir; @@ -58,7 +59,7 @@ public class GoogleOutputConfig public GoogleOutputConfig( final String bucket, final String prefix, - final File tempDir, + @Nullable final File tempDir, @Nullable final HumanReadableBytes chunkSize, @Nullable final Integer maxRetry ) @@ -82,6 +83,7 @@ public String getPrefix() return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -97,6 +99,11 @@ public Integer getMaxRetry() return maxRetry; } + public GoogleOutputConfig withTempDir(File tempDir) + { + return new GoogleOutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry); + } + private void validateFields() { if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index 49856a9c1eff..ac5c6a3c9642 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -47,7 +47,7 @@ public class GoogleStorageConnectorProvider extends GoogleOutputConfig implement public GoogleStorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -56,8 +56,9 @@ public GoogleStorageConnectorProvider( } @Override - public StorageConnector get() + public StorageConnector 
createStorageConnector(File defaultTempDir) { - return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig); + GoogleOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this; + return new GoogleStorageConnector(config, googleStorage, googleInputDataConfig); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java index e40846a848d7..8b9c8691849c 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java @@ -21,14 +21,18 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.storage.StorageConnector; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.List; public class GoogleExportStorageProviderTest { + private final File tempDir = FileUtils.createTempDir(); + private final List validPrefixes = ImmutableList.of( "gs://bucket-name/validPath1", "gs://bucket-name/validPath2" @@ -39,7 +43,7 @@ public void testGoogleExportStorageProvider() { GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1"); googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes); - StorageConnector storageConnector = googleExportStorageProvider.get(); + StorageConnector storageConnector = googleExportStorageProvider.createStorageConnector(tempDir); Assert.assertNotNull(storageConnector); Assert.assertTrue(storageConnector instanceof GoogleStorageConnector); diff 
--git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java index df6c66e84c3f..a264ead94a13 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java @@ -26,9 +26,8 @@ import com.google.inject.ProvisionException; import com.google.inject.name.Names; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.storage.StorageConnector; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.google.GoogleInputDataConfig; @@ -44,6 +43,7 @@ public class GoogleStorageConnectorProviderTest { private static final String CUSTOM_NAMESPACE = "custom"; + private final File tempDir = FileUtils.createTempDir(); @Test public void createGoogleStorageFactoryWithRequiredProperties() @@ -57,7 +57,7 @@ public void createGoogleStorageFactoryWithRequiredProperties() StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties); Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider); - Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector); + Assert.assertTrue(googleStorageConnectorProvider.createStorageConnector(tempDir) instanceof GoogleStorageConnector); Assert.assertEquals("bucket", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket()); Assert.assertEquals("prefix", 
((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix()); Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir()); @@ -124,10 +124,6 @@ private StorageConnectorProvider getStorageConnectorProvider(Properties properti StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE) ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); } ).withProperties(properties); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 687660ba750d..7f5e4a0a2b09 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -35,6 +35,8 @@ import org.apache.druid.query.Query; import org.apache.druid.server.DruidNode; +import java.io.File; + /** * Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own * implementations. @@ -104,6 +106,14 @@ WorkerManager newWorkerManager( WorkerFailureListener workerFailureListener ); + /** + * Fetch a directory for temporary outputs + */ + default File taskTempDir() + { + throw new UnsupportedOperationException(); + } + /** * Client for communicating with workers. 
*/ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 0043aaff294e..0615ba802bf2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -573,7 +573,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) final QueryDefinition queryDef = makeQueryDefinition( context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig), querySpec, - context.jsonMapper(), + context, resultsContext ); @@ -1566,26 +1566,7 @@ private void handleQueryResults( } else if (MSQControllerTask.isExport(querySpec)) { // Write manifest file. ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider()); - - final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); - //noinspection unchecked - - - Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId); - if (!(resultObjectForStage instanceof List)) { - // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case. - log.warn("Was unable to create manifest file due to "); - return; - } - @SuppressWarnings("unchecked") - List exportedFiles = (List) queryKernel.getResultObjectForStage(finalStageId); - log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size()); - exportMetadataManager.writeMetadata(exportedFiles); - } else if (MSQControllerTask.isExport(querySpec)) { - // Write manifest file. 
- ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider()); + ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider(), context.taskTempDir()); final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); //noinspection unchecked @@ -1734,10 +1715,11 @@ private void cleanUpDurableStorageIfNeeded() private static QueryDefinition makeQueryDefinition( final QueryKitSpec queryKitSpec, final MSQSpec querySpec, - final ObjectMapper jsonMapper, + final ControllerContext controllerContext, final ResultsContext resultsContext ) { + final ObjectMapper jsonMapper = controllerContext.jsonMapper(); final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final Query queryToPlan; @@ -1855,7 +1837,7 @@ private static QueryDefinition makeQueryDefinition( try { // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export. 
- Iterator filesIterator = exportStorageProvider.get().listDir(""); + Iterator filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()).listDir(""); if (filesIterator.hasNext()) { throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.RUNTIME_FAILURE) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java index 3b9d0296de5a..00c5d7799de3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java @@ -25,6 +25,7 @@ import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.storage.StorageConnector; +import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -42,15 +43,17 @@ public class ExportMetadataManager public static final int MANIFEST_FILE_VERSION = 1; private static final Logger log = new Logger(ExportMetadataManager.class); private final ExportStorageProvider exportStorageProvider; + private final File tmpDir; - public ExportMetadataManager(final ExportStorageProvider exportStorageProvider) + public ExportMetadataManager(final ExportStorageProvider exportStorageProvider, final File tmpDir) { this.exportStorageProvider = exportStorageProvider; + this.tmpDir = tmpDir; } public void writeMetadata(List exportedFiles) throws IOException { - final StorageConnector storageConnector = exportStorageProvider.get(); + final StorageConnector storageConnector = exportStorageProvider.createStorageConnector(tmpDir); log.info("Writing manifest file at location [%s]", exportStorageProvider.getBasePath()); if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) { diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 7139377495a8..55d60a6473e1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -27,14 +27,12 @@ import com.google.inject.multibindings.Multibinder; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.initialization.DruidModule; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig; import org.apache.druid.storage.NilStorageConnector; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorProvider; import java.util.List; @@ -84,10 +82,6 @@ public void configure(Binder binder) MultiStageQuery.class ); - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)) - .in(LazySingleton.class); - if (nodeRoles.contains(NodeRole.OVERLORD)) { JsonConfigProvider.bind( binder, @@ -99,11 +93,9 @@ public void configure(Binder binder) .addBinding() .to(DurableStorageCleaner.class); } - } else if (nodeRoles.contains(NodeRole.BROKER)) { - // bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it. 
- binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance()); } else { - // do nothing + // bind with nil implementation so that configs are not required during service startups. + binder.bind(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)).toInstance(tempDir -> NilStorageConnector.getInstance()); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index ca93c673a4b9..a4d7778a841a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -40,6 +40,7 @@ import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerFailureListener; import org.apache.druid.msq.exec.WorkerManager; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.error.MSQException; @@ -59,7 +60,10 @@ import org.apache.druid.segment.realtime.ChatHandler; import org.apache.druid.server.DruidNode; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; +import java.io.File; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -91,12 +95,16 @@ public IndexerControllerContext( { this.task = task; this.toolbox = toolbox; - this.injector = injector; this.clientFactory = clientFactory; this.overlordClient = overlordClient; this.metricBuilder = new ServiceMetricEvent.Builder(); this.memoryIntrospector = 
injector.getInstance(MemoryIntrospector.class); IndexTaskUtils.setTaskDimensions(metricBuilder, task); + final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); + final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + this.injector = injector.createChildInjector( + binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toInstance(storageConnector)); } @Override @@ -212,6 +220,12 @@ public WorkerManager newWorkerManager( ); } + @Override + public File taskTempDir() + { + return toolbox.getIndexingTmpDir(); + } + @Override public QueryKitSpec makeQueryKitSpec( final QueryKit> queryKit, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index fbb0bff95563..a26eded43221 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -43,6 +43,7 @@ import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.client.IndexerControllerClient; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.client.WorkerChatHandler; @@ -61,6 +62,8 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.server.DruidNode; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import java.io.File; import 
java.util.concurrent.ExecutorService; @@ -106,7 +109,6 @@ public IndexerWorkerContext( { this.task = task; this.toolbox = toolbox; - this.injector = injector; this.overlordClient = overlordClient; this.indexIO = indexIO; this.dataSegmentProvider = dataSegmentProvider; @@ -121,6 +123,11 @@ public IndexerWorkerContext( IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES ); this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); + final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); + final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + this.injector = injector.createChildInjector( + binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toInstance(storageConnector)); } public static IndexerWorkerContext createProductionInstance( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java index 630499bdd876..6b45273fa72f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import java.util.HashSet; import java.util.Iterator; @@ -55,12 +56,12 @@ public class DurableStorageCleaner implements OverlordDuty @Inject public DurableStorageCleaner( final DurableStorageCleanerConfig config, - final @MultiStageQuery StorageConnector storageConnector, + final 
@MultiStageQuery StorageConnectorProvider storageConnectorProvider, @JacksonInject final Provider taskMasterProvider ) { this.config = config; - this.storageConnector = storageConnector; + this.storageConnector = storageConnectorProvider.createStorageConnector(null); this.taskMasterProvider = taskMasterProvider; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java index fe2598a95141..d4ba2120c9c3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java @@ -160,7 +160,7 @@ public ProcessorsAndChannels makeProcessors( readableInput.getChannel(), exportFormat, readableInput.getChannelFrameReader(), - exportStorageProvider.get(), + exportStorageProvider.createStorageConnector(frameContext.tempDir()), frameContext.jsonMapper(), channelCounter, getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index f56622804786..c92bfa955fb6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -91,6 +91,7 @@ import org.apache.druid.sql.http.SqlResource; import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; +import 
org.apache.druid.storage.StorageConnectorProvider; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import javax.servlet.http.HttpServletRequest; @@ -134,14 +135,14 @@ public SqlStatementResource( final @MultiStageQuery SqlStatementFactory msqSqlStatementFactory, final ObjectMapper jsonMapper, final OverlordClient overlordClient, - final @MultiStageQuery StorageConnector storageConnector, + final @MultiStageQuery StorageConnectorProvider storageConnectorProvider, final AuthorizerMapper authorizerMapper ) { this.msqSqlStatementFactory = msqSqlStatementFactory; this.jsonMapper = jsonMapper; this.overlordClient = overlordClient; - this.storageConnector = storageConnector; + this.storageConnector = storageConnectorProvider.createStorageConnector(null); this.authorizerMapper = authorizerMapper; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java index 3da4adcbc3f5..0ddf807d7075 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.duty.DutySchedule; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig; +import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; import org.easymock.Capture; import org.easymock.EasyMock; @@ -59,7 +60,7 @@ public void setUp() durableStorageCleanerConfig.enabled = true; durableStorageCleaner = new DurableStorageCleaner( durableStorageCleanerConfig, - STORAGE_CONNECTOR, + s -> STORAGE_CONNECTOR, () -> TASK_MASTER ); } @@ -126,7 +127,7 @@ public void testGetSchedule() 
DurableStorageCleanerConfig cleanerConfig = new DurableStorageCleanerConfig(); cleanerConfig.delaySeconds = 10L; cleanerConfig.enabled = true; - DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, null, null); + DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, (temp) -> NilStorageConnector.getInstance(), null); DutySchedule schedule = durableStorageCleaner.getSchedule(); Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getPeriodMillis()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java index 8de80cf109f8..7ac389b98037 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java @@ -22,12 +22,16 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.inject.Injector; +import com.google.inject.Key; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.msq.exec.Worker; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.rpc.ServiceLocation; import org.apache.druid.rpc.ServiceLocations; import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.storage.NilStorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -45,6 +49,8 @@ public void setup() final Injector injectorMock = Mockito.mock(Injector.class); Mockito.when(injectorMock.getInstance(SegmentCacheManagerFactory.class)) .thenReturn(Mockito.mock(SegmentCacheManagerFactory.class)); + 
Mockito.when(injectorMock.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))) + .thenReturn(defaultTempDir -> NilStorageConnector.getInstance()); final MSQWorkerTask task = Mockito.mock(MSQWorkerTask.class, Mockito.withSettings().strictness(Strictness.STRICT_STUBS)); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index cef3e00daa2d..656086cef434 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -54,7 +54,6 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -76,7 +75,7 @@ public void init() sqlStatementFactory, objectMapper, indexingServiceClient, - localFileStorageConnector, + s -> localFileStorageConnector, authorizerMapper ); } @@ -330,7 +329,7 @@ public void durableStorageDisabledTest() sqlStatementFactory, objectMapper, indexingServiceClient, - NilStorageConnector.getInstance(), + s -> NilStorageConnector.getInstance(), authorizerMapper ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index a79edee53b8f..40dcb303b1dc 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -81,7 +81,6 @@ import 
org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlResourceTest; -import org.apache.druid.storage.local.LocalFileStorageConnector; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpVersion; @@ -685,7 +684,7 @@ private static AuthenticationResult makeAuthResultForUser(String user) } @BeforeEach - public void init() throws Exception + public void init() { overlordClient = Mockito.mock(OverlordClient.class); setupMocks(overlordClient); @@ -693,7 +692,7 @@ public void init() throws Exception sqlStatementFactory, objectMapper, overlordClient, - new LocalFileStorageConnector(newTempFolder("local")), + tempDir -> localFileStorageConnector, authorizerMapper ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 4dadeae5bc10..22c7cff88477 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; @@ -72,6 +73,7 @@ import org.mockito.Mockito; import javax.annotation.Nullable; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -92,6 +94,7 @@ public class MSQTestControllerContext implements 
ControllerContext NUM_WORKERS, "MultiStageQuery-test-controller-client" )); + private final File tempDir = FileUtils.createTempDir(); private final CoordinatorClient coordinatorClient; private final DruidNode node = new DruidNode( "controller", @@ -360,6 +363,12 @@ public WorkerManager newWorkerManager( ); } + @Override + public File taskTempDir() + { + return tempDir; + } + @Override public void registerController(Controller controller, Closer closer) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java index d5477c2998e0..fec5776aedc3 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java @@ -39,7 +39,7 @@ public class S3ExportConfig @JsonCreator public S3ExportConfig( - @JsonProperty("tempLocalDir") final String tempLocalDir, + @JsonProperty("tempLocalDir") @Nullable final String tempLocalDir, @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize, @JsonProperty("maxRetry") @Nullable final Integer maxRetry, @JsonProperty("allowedExportPaths") final List allowedExportPaths) @@ -50,6 +50,7 @@ public S3ExportConfig( this.allowedExportPaths = allowedExportPaths; } + @Nullable public String getTempLocalDir() { return tempLocalDir; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java index ca599fc9d495..129622dfbd77 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java @@ -69,14 
+69,14 @@ public S3ExportStorageProvider( this.prefix = prefix; } + @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { - final String tempDir = s3ExportConfig.getTempLocalDir(); - if (tempDir == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("The runtime property `druid.export.storage.s3.tempLocalDir` must be configured for S3 export."); + final String exportConfigTempDir = s3ExportConfig.getTempLocalDir(); + final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir; + if (tempDirFile == null) { + throw DruidException.defensive("Couldn't find temporary directory for export."); } final List allowedExportPaths = s3ExportConfig.getAllowedExportPaths(); if (allowedExportPaths == null) { @@ -89,7 +89,7 @@ public StorageConnector get() final S3OutputConfig s3OutputConfig = new S3OutputConfig( bucket, prefix, - new File(tempDir), + tempDirFile, s3ExportConfig.getChunkSize(), s3ExportConfig.getMaxRetry() ); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java index 35e228f7ef3e..cacbe4272e6d 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java @@ -40,6 +40,7 @@ public class S3OutputConfig @JsonProperty private String prefix; + @Nullable @JsonProperty private File tempDir; @@ -57,7 +58,7 @@ public class S3OutputConfig public S3OutputConfig( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") 
@Nullable File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, @JsonProperty("maxRetry") Integer maxRetry ) @@ -69,6 +70,7 @@ public S3OutputConfig( protected S3OutputConfig( String bucket, String prefix, + @Nullable File tempDir, @Nullable HumanReadableBytes chunkSize, @@ -120,6 +122,7 @@ public String getPrefix() return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -135,6 +138,11 @@ public int getMaxRetry() return maxRetry; } + public S3OutputConfig withTempDir(File tempDir) + { + return new S3OutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry); + } + private static void validateChunkSize(long chunkSize) { if (S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES < chunkSize) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java index f86aee9a1aaf..57f131746f0d 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java @@ -30,6 +30,7 @@ import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import javax.annotation.Nullable; import java.io.File; @JsonTypeName(S3StorageDruidModule.SCHEME) @@ -45,7 +46,7 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag public S3StorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, @JsonProperty("maxRetry") Integer maxRetry ) @@ -54,8 +55,9 @@ public 
S3StorageConnectorProvider( } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File defaultTempDir) { - return new S3StorageConnector(this, s3, s3UploadManager); + S3OutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this; + return new S3StorageConnector(config, s3, s3UploadManager); } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index a880d6f2efa5..3210a26cc584 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -29,12 +29,11 @@ import com.google.inject.name.Names; import org.apache.druid.common.aws.AWSModule; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.s3.output.S3ExportConfig; @@ -54,6 +53,7 @@ public class S3StorageConnectorProviderTest { private static final String CUSTOM_NAMESPACE = "custom"; + private final File tempDir = FileUtils.createTempDir(); @Test public void createS3StorageFactoryWithRequiredProperties() @@ -67,7 +67,7 @@ public void createS3StorageFactoryWithRequiredProperties() StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties); 
Assert.assertTrue(s3StorageConnectorProvider instanceof S3StorageConnectorProvider); - Assert.assertTrue(s3StorageConnectorProvider.get() instanceof S3StorageConnector); + Assert.assertTrue(s3StorageConnectorProvider.createStorageConnector(tempDir) instanceof S3StorageConnector); Assert.assertEquals("bucket", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getBucket()); Assert.assertEquals("prefix", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getPrefix()); Assert.assertEquals(new File("/tmp"), ((S3StorageConnectorProvider) s3StorageConnectorProvider).getTempDir()); @@ -115,9 +115,9 @@ public void createS3StorageFactoryWithMissingTempDir() properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); Assert.assertThrows( - "Missing required creator property 'tempDir'", - ProvisionException.class, - () -> getStorageConnectorProvider(properties) + "tempDir is null in s3 config", + NullPointerException.class, + () -> getStorageConnectorProvider(properties).createStorageConnector(null) ); } @@ -138,10 +138,6 @@ public void configure(Binder binder) StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE) ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); } } ).withProperties(properties); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java index 72ea888615fc..3e2476537136 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java @@ -97,20 +97,6 @@ public void noBucket() throws JsonProcessingException MAPPER.readValue(json, S3OutputConfig.class); } - @Test - public void 
noTempDir() throws JsonProcessingException - { - String json = jsonStringReadyForAssert("{\n" - + " \"prefix\": \"abc\",\n" - + " \"bucket\": \"TEST\",\n" - + " \"chunkSize\":104857600,\n" - + " \"maxRetry\": 2\n" - + "}\n"); - expectedException.expect(MismatchedInputException.class); - expectedException.expectMessage("Missing required creator property 'tempDir'"); - MAPPER.readValue(json, S3OutputConfig.class); - } - @Test public void leastArguments() throws JsonProcessingException { diff --git a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java index 173544e2f8db..d19b2f5f7e1b 100644 --- a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java @@ -20,10 +20,11 @@ package org.apache.druid.storage; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; + +import java.io.File; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -public interface ExportStorageProvider extends Provider +public interface ExportStorageProvider { String getResourceType(); @@ -33,4 +34,6 @@ public interface ExportStorageProvider extends Provider String getBasePath(); String getFilePathForManifest(String fileName); + + StorageConnector createStorageConnector(File taskTempDir); } diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java index 9fece71eab8e..a2023e6aa946 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java @@ -20,9 +20,20 @@ package org.apache.druid.storage; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; + +import java.io.File; @JsonTypeInfo(use 
= JsonTypeInfo.Id.NAME, property = "type") -public interface StorageConnectorProvider extends Provider +public interface StorageConnectorProvider { + /** + * Returns the storage connector. Takes a parameter defaultTempDir to be possibly used as the temporary directory, if the + * storage connector requires one. This StorageConnectorProvider is not guaranteed to use this value, even if the + * StorageConnectorProvider requires one, as it gives priority to a value of defaultTempDir configured as a runtime + * configuration. + *
+ * This value needs to be passed instead of being injected by Jackson as the default temporary directory is dependent on the + * task id, and such dynamic task-specific bindings are not possible on indexers. + */ + StorageConnector createStorageConnector(File defaultTempDir); } diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java index 74b099aef885..8755b9373298 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java @@ -54,7 +54,7 @@ public LocalFileExportStorageProvider(@JsonProperty(value = "exportPath", requir } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { final File exportDestination = validateAndGetPath(storageConfig.getBaseDir(), exportPath); try { diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java index 82d1623f8404..7c66f7906275 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java @@ -45,7 +45,7 @@ public LocalFileStorageConnectorProvider(@JsonProperty(value = "basePath", requi } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File defaultTempDir) { try { return new LocalFileStorageConnector(basePath); diff --git a/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java b/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java index df9a88d4813e..46cc01760eb3 100644 ---
a/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java +++ b/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java @@ -46,8 +46,9 @@ public class StorageConnectorModuleTest public void testJsonSerde() throws JsonProcessingException { StorageConnectorProvider storageConnectorProvider = objectMapper.readValue(JSON, StorageConnectorProvider.class); - Assert.assertTrue(storageConnectorProvider.get() instanceof LocalFileStorageConnector); - Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnectorProvider.get()).getBasePath()); + StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(new File("/tmp/tmpDir")); + Assert.assertTrue(storageConnector instanceof LocalFileStorageConnector); + Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnector).getBasePath()); } diff --git a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java index 4bc8886f9998..d6eec86723b4 100644 --- a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java +++ b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java @@ -49,14 +49,14 @@ public class LocalFileStorageConnectorTest @Rule public ExpectedException expectedException = ExpectedException.none(); - private File tempDir; + private File storageDir; private StorageConnector storageConnector; @Before public void init() throws IOException { - tempDir = temporaryFolder.newFolder(); - storageConnector = new LocalFileStorageConnectorProvider(tempDir).get(); + storageDir = temporaryFolder.newFolder(); + storageConnector = new LocalFileStorageConnectorProvider(storageDir).createStorageConnector(null); } @Test @@ -69,14 +69,14 @@ public void sanityCheck() throws IOException // check if file is created 
Assert.assertTrue(storageConnector.pathExists(uuid)); - Assert.assertTrue(new File(tempDir.getAbsolutePath(), uuid).exists()); + Assert.assertTrue(new File(storageDir.getAbsolutePath(), uuid).exists()); // check contents checkContents(uuid); // delete file storageConnector.deleteFile(uuid); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid).exists()); } @Test @@ -96,14 +96,14 @@ public void deleteRecursivelyTest() throws IOException checkContents(uuid1); checkContents(uuid2); - File baseFile = new File(tempDir.getAbsolutePath(), uuid_base); + File baseFile = new File(storageDir.getAbsolutePath(), uuid_base); Assert.assertTrue(baseFile.exists()); Assert.assertTrue(baseFile.isDirectory()); Assert.assertEquals(2, baseFile.listFiles().length); storageConnector.deleteRecursively(uuid_base); Assert.assertFalse(baseFile.exists()); - Assert.assertTrue(new File(tempDir.getAbsolutePath(), topLevelDir).exists()); + Assert.assertTrue(new File(storageDir.getAbsolutePath(), topLevelDir).exists()); } @Test @@ -118,8 +118,8 @@ public void batchDelete() throws IOException // delete file storageConnector.deleteFiles(ImmutableList.of(uuid1, uuid2)); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid1).exists()); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid2).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid1).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid2).exists()); } @Test @@ -128,7 +128,7 @@ public void incorrectBasePath() throws IOException File file = temporaryFolder.newFile(); expectedException.expect(IAE.class); StorageConnectorProvider storageConnectorProvider = new LocalFileStorageConnectorProvider(file); - storageConnectorProvider.get(); + storageConnectorProvider.createStorageConnector(null); } @Test