Merged
14 changes: 7 additions & 7 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ The following runtime parameters must be configured to export into an S3 destina

| Runtime Parameter | Required | Description | Default |
|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
| `druid.export.storage.s3.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a |
| `druid.export.storage.s3.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a |
| `druid.export.storage.s3.maxRetry` | No | Defines the maximum number of times to attempt S3 API calls, to avoid failures due to transient errors. | 10 |
| `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3; however, it requires more disk space to store the temporary chunks. | 100MiB |
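
For reference, a minimal sketch of these settings in a worker's runtime properties (the bucket name and local path are illustrative, not from this PR):

```properties
# Allow exports only to these S3 prefixes (required)
druid.export.storage.s3.allowedExportPaths=["s3://example-bucket/export/"]
# Optional: stage chunks here instead of the task temporary directory
druid.export.storage.s3.tempLocalDir=/mnt/druid/export-tmp
# Optional: retry transient S3 errors up to 10 times (the default)
druid.export.storage.s3.maxRetry=10
# Optional: chunk size between 5 MiB and 5 GiB
druid.export.storage.s3.chunkSize=100MiB
```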

Expand Down Expand Up @@ -186,12 +186,12 @@ Supported arguments for the function:

The following runtime parameters must be configured to export into a GCS destination:

| Runtime Parameter | Required | Description | Default |
|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| Runtime Parameter | Required | Description | Default |
|--------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a |
| `druid.export.storage.google.maxRetry` | No | Defines the maximum number of times to attempt GS API calls, to avoid failures due to transient errors. | 10 |
| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB |
| `druid.export.storage.google.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a |
| `druid.export.storage.google.maxRetry` | No | Defines the maximum number of times to attempt GS API calls, to avoid failures due to transient errors. | 10 |
| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB |
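
An equivalent sketch for a GCS destination (bucket name and local path are illustrative):

```properties
# Allow exports only to these GCS prefixes (required)
druid.export.storage.google.allowedExportPaths=["gs://example-bucket/export/"]
# Optional: stage chunks here instead of the task temporary directory
druid.export.storage.google.tempLocalDir=/mnt/druid/export-tmp
# Optional: retry transient GS errors up to 10 times (the default)
druid.export.storage.google.maxRetry=10
# Optional: 4MiB is the default chunk size for GCS
druid.export.storage.google.chunkSize=4MiB
```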

##### LOCAL

Expand Down Expand Up @@ -531,7 +531,7 @@ Common properties to configure the behavior of durable storage
|--|--|--|
|`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false |
|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure, or `google` for Google. | n/a |
|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a |
|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data. If the property is not configured on the indexer or middle manager, it defaults to using the task temporary directory. | n/a |
|`druid.msq.intermediate.storage.maxRetry` | No | Defines the maximum number of times to attempt S3 API calls, to avoid failures due to transient errors. | 10 |
|`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage; however, it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided. | 100MiB |
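
A minimal durable-storage configuration might look like the following (paths are illustrative; with this PR, `tempDir` may be omitted on indexers and middle managers, in which case the task temporary directory is used):

```properties
druid.msq.intermediate.storage.enable=true
druid.msq.intermediate.storage.type=s3
# Optional on indexers/middle managers after this change
druid.msq.intermediate.storage.tempDir=/mnt/druid/msq-tmp
druid.msq.intermediate.storage.maxRetry=10
druid.msq.intermediate.storage.chunkSize=100MiB
```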

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class AzureOutputConfig
@JsonProperty
private final String prefix;

@Nullable
@JsonProperty
private final File tempDir;

Expand All @@ -64,7 +65,7 @@ public class AzureOutputConfig
public AzureOutputConfig(
@JsonProperty(value = "container", required = true) String container,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
@JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
)
Expand All @@ -77,7 +78,6 @@ public AzureOutputConfig(
validateFields();
}


public String getContainer()
{
return container;
Expand All @@ -88,6 +88,7 @@ public String getPrefix()
return prefix;
}

@Nullable
public File getTempDir()
{
return tempDir;
Expand All @@ -103,6 +104,11 @@ public int getMaxRetry()
return maxRetry;
}

public AzureOutputConfig withTempDir(File tempDir)
{
return new AzureOutputConfig(container, prefix, tempDir, chunkSize, maxRetry);
}

private void validateFields()
{
if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) {
Expand All @@ -113,6 +119,13 @@ private void validateFields()
AZURE_MAX_CHUNK_SIZE_BYTES
);
}
}

public void validateTempDirectory()
{
if (tempDir == null) {
throw DruidException.defensive("The runtime property `druid.msq.intermediate.storage.tempDir` must be configured.");
}

try {
FileUtils.mkdirp(tempDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements
public AzureStorageConnectorProvider(
@JsonProperty(value = "container", required = true) String container,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
@JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
)
Expand All @@ -54,8 +54,10 @@ public AzureStorageConnectorProvider(
}

@Override
public StorageConnector get()
public StorageConnector createStorageConnector(final File defaultTempDir)
{
return new AzureStorageConnector(this, azureStorage);
AzureOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this;
config.validateTempDirectory();
return new AzureStorageConnector(config, azureStorage);
}
}
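
The connector providers above all follow the same pattern: an immutable config whose `withTempDir` copy-constructor fills in the task's default directory only when the user did not configure one. A minimal, Druid-independent sketch of that pattern (class and field names here are illustrative, not the PR's actual types):

```java
import java.io.File;

// Sketch of the "copy-with-default" pattern used by the storage
// connector providers in this PR: the config is immutable, and an
// unset tempDir is resolved to a caller-supplied default at
// connector-creation time rather than at deserialization time.
final class OutputConfigSketch
{
  private final String prefix;
  private final File tempDir; // null when not configured by the user

  OutputConfigSketch(String prefix, File tempDir)
  {
    this.prefix = prefix;
    this.tempDir = tempDir;
  }

  File getTempDir()
  {
    return tempDir;
  }

  // Returns a copy with the given tempDir; the original is unchanged.
  OutputConfigSketch withTempDir(File tempDir)
  {
    return new OutputConfigSketch(prefix, tempDir);
  }

  // Prefer the configured directory; otherwise fall back to the
  // default supplied by the caller (e.g. the task temp directory).
  OutputConfigSketch resolve(File defaultTempDir)
  {
    return tempDir == null ? withTempDir(defaultTempDir) : this;
  }
}
```

Resolving at creation time keeps JSON deserialization side-effect free and lets each task supply its own default directory.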
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ public void testTempDirectoryNotWritable(@TempDir File tempDir)
throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName());
}

//noinspection ResultOfObjectAllocationIgnored
assertThrows(
DruidException.class,
() -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT)
() -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT).validateTempDirectory()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,6 @@ public void noContainer()
assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
}

@Test
public void noTempDir()
{
String json = jsonStringReadyForAssert("{\n"
+ " \"prefix\": \"abc\",\n"
+ " \"container\": \"TEST\",\n"
+ " \"chunkSize\":104857600,\n"
+ " \"maxRetry\": 2\n"
+ "}\n");
assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
}

@Test
public void leastArguments() throws JsonProcessingException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import com.google.inject.Key;
import com.google.inject.ProvisionException;
import com.google.inject.name.Names;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.azure.AzureStorage;
Expand Down Expand Up @@ -59,7 +58,7 @@ public void createAzureStorageFactoryWithRequiredProperties()
StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties);

assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider);
assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get());
assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.createStorageConnector(new File("/tmp")));
assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer());
assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix());
assertEquals(new File("/tmp"),
Expand Down Expand Up @@ -107,30 +106,36 @@ public void createAzureStorageFactoryWithMissingTempDir()
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");

assertThrows(
ProvisionException.class,
() -> getStorageConnectorProvider(properties),
"Missing required creator property 'tempDir'"
DruidException.class,
() -> getStorageConnectorProvider(properties).createStorageConnector(null),
"The runtime property `druid.msq.intermediate.storage.tempDir` must be configured."
);
}

@Test
public void createAzureStorageFactoryWithMissingTempDirButProvidedDuringRuntime()
{

final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");

getStorageConnectorProvider(properties).createStorageConnector(new File("/tmp"));
}

private StorageConnectorProvider getStorageConnectorProvider(Properties properties)
{
StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add(
new AzureStorageDruidModule(),
new StorageConnectorModule(),
new AzureStorageConnectorModule(),
binder -> {
JsonConfigProvider.bind(
binder,
CUSTOM_NAMESPACE,
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
);

binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
.toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
.in(LazySingleton.class);
}
binder -> JsonConfigProvider.bind(
binder,
CUSTOM_NAMESPACE,
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
)
).withProperties(properties);

Injector injector = startupInjectorBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ public GoogleExportStorageProvider(
}

@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File taskTempDir)
{
final String tempDir = googleExportConfig.getTempLocalDir();
if (tempDir == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export.");
final String exportConfigTempDir = googleExportConfig.getTempLocalDir();
final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir;
if (tempDirFile == null) {
throw DruidException.defensive("Couldn't find temporary directory for export.");
}
final List<String> allowedExportPaths = googleExportConfig.getAllowedExportPaths();
if (allowedExportPaths == null) {
Expand All @@ -89,7 +88,7 @@ public StorageConnector get()
final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig(
bucket,
prefix,
new File(tempDir),
tempDirFile,
googleExportConfig.getChunkSize(),
googleExportConfig.getMaxRetry()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class GoogleOutputConfig
@JsonProperty
private final String prefix;

@Nullable
@JsonProperty
private final File tempDir;

Expand All @@ -58,7 +59,7 @@ public class GoogleOutputConfig
public GoogleOutputConfig(
final String bucket,
final String prefix,
final File tempDir,
@Nullable final File tempDir,
@Nullable final HumanReadableBytes chunkSize,
@Nullable final Integer maxRetry
)
Expand All @@ -82,6 +83,7 @@ public String getPrefix()
return prefix;
}

@Nullable
public File getTempDir()
{
return tempDir;
Expand All @@ -97,6 +99,11 @@ public Integer getMaxRetry()
return maxRetry;
}

public GoogleOutputConfig withTempDir(File tempDir)
{
return new GoogleOutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry);
}

private void validateFields()
{
if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GoogleStorageConnectorProvider extends GoogleOutputConfig implement
public GoogleStorageConnectorProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
@JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
)
Expand All @@ -56,8 +56,9 @@ public GoogleStorageConnectorProvider(
}

@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File defaultTempDir)
{
return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig);
GoogleOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this;
return new GoogleStorageConnector(config, googleStorage, googleInputDataConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@

import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.storage.StorageConnector;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.util.List;

public class GoogleExportStorageProviderTest
{
private final File tempDir = FileUtils.createTempDir();

private final List<String> validPrefixes = ImmutableList.of(
"gs://bucket-name/validPath1",
"gs://bucket-name/validPath2"
Expand All @@ -39,7 +43,7 @@ public void testGoogleExportStorageProvider()
{
GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1");
googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes);
StorageConnector storageConnector = googleExportStorageProvider.get();
StorageConnector storageConnector = googleExportStorageProvider.createStorageConnector(tempDir);
Assert.assertNotNull(storageConnector);
Assert.assertTrue(storageConnector instanceof GoogleStorageConnector);

Expand Down