diff --git a/.travis.yml b/.travis.yml index 4211f1897b7b..15b38d8fc016 100644 --- a/.travis.yml +++ b/.travis.yml @@ -451,6 +451,14 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with Indexer" env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - <<: *integration_perfect_rollup_parallel_batch_index + name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with deep storage as intermediate store" + env: TESTNG_GROUPS='-Dgroups=shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' + + - <<: *integration_perfect_rollup_parallel_batch_index + name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with deep storage as intermediate store with indexer" + env: TESTNG_GROUPS='-Dgroups=shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_kafka_index name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test" stage: Tests - phase 2 @@ -597,13 +605,13 @@ jobs: stage: Tests - phase 2 jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer" - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' 
JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests" diff --git a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java index 98c75505c05f..ce93bb3662b3 100644 --- a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java +++ b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java @@ -68,6 +68,11 @@ public interface DataSegmentPusher */ DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException; + default DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException + { + throw new UnsupportedOperationException("not supported"); + } + //use map instead of LoadSpec class to avoid dependency pollution. Map makeLoadSpec(URI finalIndexZipFilePath); diff --git a/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java index 1af11acd2849..44ae9686a911 100644 --- a/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java +++ b/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java @@ -50,6 +50,12 @@ public DataSegment push(File file, DataSegment segment, boolean replaceExisting) return segment; } + @Override + public DataSegment pushToPath(File file, DataSegment segment, String storageDirSuffix) + { + return segment; + } + @Override public Map makeLoadSpec(URI uri) { diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b1869ac51784..ed96e2a8862c 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1298,7 +1298,7 @@ Processing properties set on the Middlemanager will be passed through to Peons. |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. 
If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
-|`druid.processing.intermediaryData.storage.type`|Storage type for storing intermediary segments of data shuffle between native parallel index tasks. Current choice are only "local" which stores segment files in local storage of Middle Managers (or Indexer).|local|
+|`druid.processing.intermediaryData.storage.type`|Storage type for storing intermediary segments of data shuffle between native parallel index tasks. Current choices are "local", which stores segment files in the local storage of Middle Managers (or Indexers), and "deepstore", which uses the configured deep storage. Note: with the "deepstore" type, data is stored in the `shuffle-data` directory under the configured deep storage path. Automatic cleanup of this directory is not supported yet; you can set up cloud storage lifecycle rules to clean up data under the `shuffle-data` prefix.|local|
 
 The amount of direct memory needed by Druid is at least `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
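For concreteness, the new option maps to `runtime.properties` on a Middle Manager or Indexer roughly as follows. This is a sketch assuming an S3 deep storage setup; only `druid.processing.intermediaryData.storage.type` is the setting documented above, and the `druid.storage.*` values are illustrative:

```properties
# Store shuffle data in deep storage instead of the local disk of the
# Middle Manager (or Indexer).
druid.processing.intermediaryData.storage.type=deepstore

# Regular deep storage settings (illustrative S3 example). Shuffle data is
# written under the shuffle-data directory at this location and is not
# cleaned up automatically, so a lifecycle rule on the shuffle-data prefix
# is advisable.
druid.storage.type=s3
druid.storage.bucket=your-bucket
druid.storage.baseKey=druid/segments
```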
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
index 3f2fd2fde62d..b824232c6538 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
@@ -77,8 +77,13 @@ public List<String> getAllowedPropertyPrefixesForHadoop()
 
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
       throws IOException
   {
-    final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
+    return pushToPath(indexFilesDir, inSegment, getStorageDir(inSegment, useUniquePath));
+  }
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix) throws IOException
+  {
+    final String path = OssUtils.constructSegmentPath(config.getPrefix(), storageDirSuffix);
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
 
     final File zipOutFile = File.createTempFile("druid", "index.zip");
diff --git a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
index c61443f924a9..b96902c33650 100644
--- a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
+++ b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
@@ -77,11 +77,16 @@ public String getPathForHadoop(String dataSource)
   public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean useUniquePath) throws IOException
   {
     log.info("Writing [%s] to C*", indexFilesDir);
+    return pushToPath(indexFilesDir, segment, this.getStorageDir(segment, useUniquePath));
+  }
+
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
     String key = JOINER.join(
         config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
-        this.getStorageDir(segment, useUniquePath)
-    );
-
+        storageDirSuffix
+    );
     // Create index
     final File compressedIndexFile = File.createTempFile("druid", "index.zip");
     long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
index 42fe23f94318..22ca44473b3a 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
@@ -70,12 +70,17 @@ public String getPathForHadoop(final String dataSource)
 
   @Override
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
+  {
+    return pushToPath(indexFilesDir, inSegment, getStorageDir(inSegment, useUniquePath));
+  }
+
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix)
   {
     final String segmentPath = CloudFilesUtils.buildCloudFilesPath(
         this.config.getBasePath(),
-        getStorageDir(inSegment, useUniquePath)
-    );
-
+        storageDirSuffix
+    );
     File descriptorFile = null;
     File zipOutFile = null;
 
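The pushers in this patch all follow the same mechanical refactor: `push()` keeps its contract but now merely computes the conventional storage directory and delegates to the new `pushToPath()`, which accepts an arbitrary directory suffix so that shuffle code can target, for example, a `shuffle-data` location. Below is a minimal, self-contained sketch of that shape; `ExamplePusher` is hypothetical, copies to a local directory instead of cloud storage, and mirrors only the pattern, not the real `DataSegmentPusher` interface:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

// Hypothetical pusher illustrating the push() -> pushToPath() delegation
// pattern applied throughout this patch.
public class ExamplePusher
{
  private final Path storageRoot;

  public ExamplePusher(Path storageRoot)
  {
    this.storageRoot = storageRoot;
  }

  // Normal segment publishing: compute the conventional segment directory...
  public Path push(File segmentZip, String dataSource, boolean useUniquePath) throws IOException
  {
    final String suffix = useUniquePath ? dataSource + "/" + UUID.randomUUID() : dataSource;
    return pushToPath(segmentZip, suffix);
  }

  // ...while shuffle code can call this method directly with its own suffix,
  // e.g. "shuffle-data/<supervisorTaskId>/<subTaskId>", and reuse the same
  // upload mechanics.
  public Path pushToPath(File segmentZip, String storageDirSuffix) throws IOException
  {
    final Path target = storageRoot.resolve(storageDirSuffix).resolve("index.zip");
    Files.createDirectories(target.getParent());
    return Files.copy(segmentZip.toPath(), target, StandardCopyOption.REPLACE_EXISTING);
  }
}
```

The value of the split is that the caller decides the suffix (unique segment path vs. shuffle location) while the upload mechanics stay in one place per storage backend.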
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 461b107b160a..1e46239fd21a 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -91,10 +91,7 @@ public String getPathForHadoop()
   @Override
   public String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
   {
-    String prefix = segmentConfig.getPrefix();
-    boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix);
     String seg = JOINER.join(
-        prefixIsNullOrEmpty ? null : StringUtils.maybeRemoveTrailingSlash(prefix),
         dataSegment.getDataSource(),
         StringUtils.format(
             "%s_%s",
@@ -107,7 +104,7 @@ public String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
         useUniquePath ? DataSegmentPusher.generateUniquePath() : null
     );
 
-    log.info("DataSegment: [%s]", seg);
+    log.info("DataSegment Suffix: [%s]", seg);
 
     // Replace colons with underscores, since they are not supported through wasb:// prefix
     return seg;
@@ -124,6 +121,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin
       throws IOException
   {
     log.info("Uploading [%s] to Azure.", indexFilesDir);
+    final String azurePathSuffix = getAzurePath(segment, useUniquePath);
+    return pushToPath(indexFilesDir, segment, azurePathSuffix);
+  }
+
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
+    String prefix = segmentConfig.getPrefix();
+    boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix);
+    final String azurePath = JOINER.join(
+        prefixIsNullOrEmpty ? null : StringUtils.maybeRemoveTrailingSlash(prefix),
+        storageDirSuffix
+    );
     final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
     File zipOutFile = null;
@@ -132,8 +142,6 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin
       final File outFile = zipOutFile = File.createTempFile("index", ".zip");
       final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
-      final String azurePath = getAzurePath(segment, useUniquePath);
-
       return AzureUtils.retryAzureOperation(
           () -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath),
           accountConfig.getMaxTries()
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
index 8024d6763ff5..f3e65c7923f5 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -145,7 +145,11 @@ public void test_push_nonUniquePathWithPrefix_succeeds() throws Exception
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    azureStorage.uploadBlob(
+        EasyMock.anyObject(File.class),
+        EasyMock.eq(CONTAINER_NAME),
+        EasyMock.eq(PREFIX + "/" + azurePath)
+    );
     EasyMock.expectLastCall();
     replayAll();
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java
index 35c763f2be3a..e352926de7d4 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java
@@ -104,15 +104,20 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin
       throws IOException
   {
     log.debug("Uploading [%s] to Google.", indexFilesDir);
+    final String storageDir = this.getStorageDir(segment, useUniquePath);
+    return pushToPath(indexFilesDir, segment, storageDir);
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
     final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
     File indexFile = null;
 
     try {
       indexFile = File.createTempFile("index", ".zip");
       final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
-      final String storageDir = this.getStorageDir(segment, useUniquePath);
-      final String indexPath = buildPath(storageDir + "/" + "index.zip");
+      final String indexPath = buildPath(storageDirSuffix + "/" + "index.zip");
 
       final DataSegment outSegment = segment
           .withSize(indexSize)
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 0354e3c34688..e262b40da707 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -105,13 +105,30 @@ public
DataSegment push(final File inDir, final DataSegment segment, final boole // '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths. final String storageDir = this.getStorageDir(segment, false); + + final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : ""; + final String outIndexFilePathSuffix = StringUtils.format( + "%s/%d_%sindex.zip", + storageDir, + segment.getShardSpec().getPartitionNum(), + uniquePrefix + ); + + return pushToPath(inDir, segment, outIndexFilePathSuffix); + } + + @Override + public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException + { log.debug( "Copying segment[%s] to HDFS at location[%s/%s]", segment.getId(), fullyQualifiedStorageDirectory.get(), - storageDir + storageDirSuffix ); + final String storageDir = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix); + Path tmpIndexFile = new Path(StringUtils.format( "%s/%s/%s/%s_index.zip", fullyQualifiedStorageDirectory.get(), @@ -130,16 +147,7 @@ public DataSegment push(final File inDir, final DataSegment segment, final boole try (FSDataOutputStream out = fs.create(tmpIndexFile)) { size = CompressionUtils.zip(inDir, out); } - - final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : ""; - final Path outIndexFile = new Path(StringUtils.format( - "%s/%s/%d_%sindex.zip", - fullyQualifiedStorageDirectory.get(), - storageDir, - segment.getShardSpec().getPartitionNum(), - uniquePrefix - )); - + final Path outIndexFile = new Path(storageDir); dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java index 17dac631a117..da4b27ef7056 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java @@ -79,8 +79,13 @@ public List getAllowedPropertyPrefixesForHadoop() public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath) throws IOException { - final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment, useUniquePath)); + return pushToPath(indexFilesDir, inSegment, getStorageDir(inSegment, useUniquePath)); + } + @Override + public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix) throws IOException + { + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), storageDirSuffix); log.debug("Copying segment[%s] to S3 at location[%s]", inSegment.getId(), s3Path); final File zipOutFile = File.createTempFile("druid", "index.zip"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java new file mode 100644 index 000000000000..f845b74f6e42 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.timeline.partition.BuildingShardSpec; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents the intermediary deep storage location where the partition of {@code interval} and {@code shardSpec} + * is stored. + */ +public class DeepStoragePartitionLocation implements PartitionLocation +{ + private final String subTaskId; + private final Interval interval; + private final BuildingShardSpec shardSpec; + private final Map loadSpec; + + @JsonCreator + public DeepStoragePartitionLocation( + @JsonProperty("subTaskId") String subTaskId, + @JsonProperty("interval") Interval interval, + @JsonProperty("shardSpec") BuildingShardSpec shardSpec, + @JsonProperty("loadSpec") Map loadSpec + ) + { + this.subTaskId = subTaskId; + this.interval = interval; + this.shardSpec = shardSpec; + this.loadSpec = loadSpec; + } + + @JsonIgnore + @Override + public int getBucketId() + { + return shardSpec.getBucketId(); + } + + @JsonProperty + @Override + public Interval getInterval() + { + return interval; + } + + @JsonProperty + @Override + public BuildingShardSpec getShardSpec() + { + return shardSpec; + } + + @JsonProperty + @Override + public String getSubTaskId() + { + return subTaskId; + } + + @JsonProperty + public Map getLoadSpec() + { + return loadSpec; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeepStoragePartitionLocation that = (DeepStoragePartitionLocation) o; + return subTaskId.equals(that.subTaskId) + && interval.equals(that.interval) + && shardSpec.equals(that.shardSpec) + && loadSpec.equals(that.loadSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(subTaskId, interval, shardSpec, loadSpec); + } + + @Override + public String toString() + { + return "DeepStoragePartitionLocation{" + + "subTaskId='" + subTaskId + '\'' + + ", interval=" + interval + + ", shardSpec=" + shardSpec + + ", loadSpec=" + loadSpec + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java new file mode 100644 index 000000000000..7d780437a8ac --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.timeline.partition.BucketNumberedShardSpec; +import org.apache.druid.timeline.partition.BuildingShardSpec; +import org.joda.time.Interval; + +import java.util.Map; +import java.util.Objects; + +/** + * Similar to {@link GenericPartitionStat} but contains information about deep storage location where it is stored + */ +public class DeepStoragePartitionStat implements PartitionStat +{ + public static final String TYPE = "deepstore"; + static final String PROP_SHARD_SPEC = "shardSpec"; + private final Map loadSpec; + // Primary partition key + private final Interval interval; + // Secondary partition key + private final BucketNumberedShardSpec shardSpec; + + @JsonCreator + public DeepStoragePartitionStat( + @JsonProperty("interval") Interval interval, + @JsonProperty(PROP_SHARD_SPEC) BucketNumberedShardSpec shardSpec, + @JsonProperty("loadSpec") Map loadSpec + ) + { + this.interval = interval; + this.shardSpec = shardSpec; + this.loadSpec = loadSpec; + } + + @JsonProperty + public Map getLoadSpec() + { + return loadSpec; + } + + @JsonProperty + @Override + public Interval getInterval() + { + return interval; + } + + @JsonProperty(PROP_SHARD_SPEC) + @Override + public BucketNumberedShardSpec getSecondaryPartition() + { + return shardSpec; + } + + @Override + public int getBucketId() + { + return shardSpec.getBucketId(); + } + + @Override + public DeepStoragePartitionLocation toPartitionLocation(String subtaskId, BuildingShardSpec secondaryParition) + { + return new DeepStoragePartitionLocation(subtaskId, interval, secondaryParition, loadSpec); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeepStoragePartitionStat that = (DeepStoragePartitionStat) o; + return loadSpec.equals(that.loadSpec) && interval.equals(that.interval) && shardSpec.equals(that.shardSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(loadSpec, interval, shardSpec); + } + + @Override + public String toString() + { + return "DeepStoragePartitionStat{" + + "loadSpec=" + loadSpec + + ", interval=" + interval + + ", shardSpec=" + shardSpec + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClient.java new file mode 100644 index 000000000000..ee906badbdcc --- /dev/null +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClient.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.commons.io.FileUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.loading.LoadSpec; +import org.apache.druid.segment.loading.SegmentLoadingException; + +import java.io.File; +import java.io.IOException; + +public class DeepStorageShuffleClient implements ShuffleClient +{ + private static final Logger LOG = new Logger(DeepStorageShuffleClient.class); + private final ObjectMapper objectMapper; + + @Inject + public DeepStorageShuffleClient(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public File fetchSegmentFile(File partitionDir, String supervisorTaskId, DeepStoragePartitionLocation location) + throws IOException + { + final LoadSpec loadSpec = objectMapper.convertValue(location.getLoadSpec(), LoadSpec.class); + final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId())); + FileUtils.forceMkdir(unzippedDir); + try { + loadSpec.loadSegment(unzippedDir); + } + catch (SegmentLoadingException e) { + LOG.error(e, "Failed to load segment"); + throw new IOException(e); + } + return unzippedDir; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index b19a5fc723b4..553cc1d6e8e8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -26,16 +26,16 @@ /** * Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is - * collected by {@link ParallelIndexSupervisorTask} and used to generate {@link PartialGenericSegmentMergeIOConfig}. + * collected by {@link ParallelIndexSupervisorTask} and used to generate {@link PartialSegmentMergeIOConfig}. 
*/ -class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport +class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport { public static final String TYPE = "generated_partitions_metadata"; @JsonCreator GeneratedPartitionsMetadataReport( @JsonProperty("taskId") String taskId, - @JsonProperty("partitionStats") List partitionStats + @JsonProperty("partitionStats") List partitionStats ) { super(taskId, partitionStats); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index bfe8cef79c3f..d05d4ddd9436 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -29,12 +29,12 @@ * This report is collected by {@link ParallelIndexSupervisorTask} and * used to generate {@link PartialSegmentMergeIOConfig}. */ -abstract class GeneratedPartitionsReport implements SubTaskReport +public class GeneratedPartitionsReport implements SubTaskReport { private final String taskId; - private final List partitionStats; + private final List partitionStats; - GeneratedPartitionsReport(String taskId, List partitionStats) + GeneratedPartitionsReport(String taskId, List partitionStats) { this.taskId = taskId; this.partitionStats = partitionStats; @@ -48,7 +48,7 @@ public String getTaskId() } @JsonProperty - public List getPartitionStats() + public List getPartitionStats() { return partitionStats; } @@ -72,4 +72,13 @@ public int hashCode() { return Objects.hash(taskId, partitionStats); } + + @Override + public String toString() + { + return "GeneratedPartitionsReport{" + + "taskId='" + taskId + '\'' + + ", partitionStats=" + partitionStats + + '}'; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java index 74c4c1738b0d..c921efef9e4e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java @@ -22,15 +22,26 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.timeline.partition.BuildingShardSpec; import org.joda.time.Interval; +import java.net.URI; +import java.util.Objects; + /** * This class represents the intermediary data server where the partition of {@code interval} and {@code shardSpec} * is stored. 
*/ -public class GenericPartitionLocation extends PartitionLocation +public class GenericPartitionLocation implements PartitionLocation { + private final String host; + private final int port; + private final boolean useHttps; + private final String subTaskId; + private final Interval interval; + private final BuildingShardSpec shardSpec; + @JsonCreator public GenericPartitionLocation( @JsonProperty("host") String host, @@ -41,19 +52,111 @@ public GenericPartitionLocation( @JsonProperty("shardSpec") BuildingShardSpec shardSpec ) { - super(host, port, useHttps, subTaskId, interval, shardSpec); + this.host = host; + this.port = port; + this.useHttps = useHttps; + this.subTaskId = subTaskId; + this.interval = interval; + this.shardSpec = shardSpec; + } + + @JsonProperty + public String getHost() + { + return host; + } + + @JsonProperty + public int getPort() + { + return port; + } + + @JsonProperty + public boolean isUseHttps() + { + return useHttps; + } + + @JsonProperty + @Override + public String getSubTaskId() + { + return subTaskId; + } + + @JsonProperty + @Override + public Interval getInterval() + { + return interval; } @JsonIgnore @Override public int getBucketId() { - return getSecondaryPartition().getBucketId(); + return shardSpec.getBucketId(); } @JsonProperty - BuildingShardSpec getShardSpec() + @Override + public BuildingShardSpec getShardSpec() + { + return shardSpec; + } + + final URI toIntermediaryDataServerURI(String supervisorTaskId) + { + return URI.create( + StringUtils.format( + "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&bucketId=%d", + useHttps ? "https" : "http", + host, + port, + StringUtils.urlEncode(supervisorTaskId), + StringUtils.urlEncode(subTaskId), + interval.getStart(), + interval.getEnd(), + getBucketId() + ) + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericPartitionLocation that = (GenericPartitionLocation) o; + return port == that.port + && useHttps == that.useHttps + && host.equals(that.host) + && subTaskId.equals(that.subTaskId) + && interval.equals(that.interval) + && shardSpec.equals(that.shardSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(host, port, useHttps, subTaskId, interval, shardSpec); + } + + @Override + public String toString() { - return getSecondaryPartition(); + return "GenericPartitionLocation{" + + "host='" + host + '\'' + + ", port=" + port + + ", useHttps=" + useHttps + + ", subTaskId='" + subTaskId + '\'' + + ", interval=" + interval + + ", shardSpec=" + shardSpec + + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java index a4ac80bdec04..aada0a250ca4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; +import org.apache.druid.timeline.partition.BuildingShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; @@ -34,10 +35,22 @@ * partition key). 
The {@link ShardSpec} is later used by {@link PartialGenericSegmentMergeTask} to merge the partial * segments. */ -public class GenericPartitionStat extends PartitionStat +public class GenericPartitionStat implements PartitionStat { - private static final String PROP_SHARD_SPEC = "shardSpec"; + public static final String TYPE = "local"; + static final String PROP_SHARD_SPEC = "shardSpec"; + // Host and port of the task executor + private final String taskExecutorHost; + private final int taskExecutorPort; + private final boolean useHttps; + // Primary partition key + private final Interval interval; + // numRows and sizeBytes are always null currently and will be filled properly in the future. + @Nullable + private final Integer numRows; + @Nullable + private final Long sizeBytes; // Secondary partition key private final BucketNumberedShardSpec shardSpec; @@ -52,10 +65,54 @@ public GenericPartitionStat( @JsonProperty("sizeBytes") @Nullable Long sizeBytes ) { - super(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes); + this.taskExecutorHost = taskExecutorHost; + this.taskExecutorPort = taskExecutorPort; + this.useHttps = useHttps; + this.interval = interval; + this.numRows = numRows == null ? 0 : numRows; + this.sizeBytes = sizeBytes == null ? 0 : sizeBytes; this.shardSpec = shardSpec; } + @JsonProperty + public final String getTaskExecutorHost() + { + return taskExecutorHost; + } + + @JsonProperty + public final int getTaskExecutorPort() + { + return taskExecutorPort; + } + + @JsonProperty + public final boolean isUseHttps() + { + return useHttps; + } + + @JsonProperty + @Override + public final Interval getInterval() + { + return interval; + } + + @Nullable + @JsonProperty + public final Integer getNumRows() + { + return numRows; + } + + @Nullable + @JsonProperty + public final Long getSizeBytes() + { + return sizeBytes; + } + @Override public int getBucketId() { @@ -64,11 +121,24 @@ public int getBucketId() @JsonProperty(PROP_SHARD_SPEC) @Override - BucketNumberedShardSpec getSecondaryPartition() + public BucketNumberedShardSpec getSecondaryPartition() { return shardSpec; } + @Override + public GenericPartitionLocation toPartitionLocation(String subtaskId, BuildingShardSpec secondaryParition) + { + return new GenericPartitionLocation( + getTaskExecutorHost(), + getTaskExecutorPort(), + isUseHttps(), + subtaskId, + getInterval(), + secondaryParition + ); + } + @Override public boolean equals(Object o) { @@ -78,16 +148,33 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - if (!super.equals(o)) { - return false; - } GenericPartitionStat that = (GenericPartitionStat) o; - return Objects.equals(shardSpec, that.shardSpec); + return taskExecutorPort == that.taskExecutorPort + && useHttps == that.useHttps + && taskExecutorHost.equals(that.taskExecutorHost) + && interval.equals(that.interval) + && Objects.equals(numRows, that.numRows) + && Objects.equals(sizeBytes, that.sizeBytes) + && shardSpec.equals(that.shardSpec); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), shardSpec); + return Objects.hash(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes, shardSpec); + } + + @Override + public String toString() + { + return "GenericPartitionStat{" + + "taskExecutorHost='" + taskExecutorHost + '\'' + + ", taskExecutorPort=" + taskExecutorPort + + ", useHttps=" + useHttps + + ", interval=" + interval + + ", numRows=" + numRows + + ", sizeBytes=" + sizeBytes + + ", shardSpec=" + 
shardSpec + + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java index 58c41e61cd0e..13abcaeb79fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java @@ -24,9 +24,11 @@ import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.utils.CompressionUtils; import org.jboss.netty.handler.codec.http.HttpMethod; import java.io.File; @@ -38,8 +40,10 @@ * HTTP-based ShuffleClient. * This class is injected as a lazy singleton instance and thus must be stateless. */ -public class HttpShuffleClient implements ShuffleClient +public class HttpShuffleClient implements ShuffleClient { + private static final Logger LOG = new Logger(HttpShuffleClient.class); + @VisibleForTesting static final int NUM_FETCH_RETRIES = 3; @@ -54,10 +58,10 @@ public HttpShuffleClient(@EscalatedClient HttpClient httpClient) } @Override - public > File fetchSegmentFile( + public File fetchSegmentFile( File partitionDir, String supervisorTaskId, - P location + GenericPartitionLocation location ) throws IOException { // Create a local buffer since this class is not thread-safe. @@ -82,6 +86,16 @@ public > File fetchSegmentFile( NUM_FETCH_RETRIES, StringUtils.format("Failed to fetch file[%s]", uri) ); - return zippedFile; + final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId())); + try { + org.apache.commons.io.FileUtils.forceMkdir(unzippedDir); + CompressionUtils.unzip(zippedFile, unzippedDir); + } + finally { + if (!zippedFile.delete()) { + LOG.warn("Failed to delete temp file[%s]", zippedFile); + } + } + return unzippedDir; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 91cdcc08225f..47e2bd46551e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -351,7 +351,7 @@ PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGene @VisibleForTesting PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner( TaskToolbox toolbox, - List ioConfigs, + List ioConfigs, ParallelIndexIngestionSpec ingestionSchema ) { @@ -675,7 +675,7 @@ private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throw // 1. 
Partial segment generation phase final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse; - ParallelIndexTaskRunner> indexingRunner = + ParallelIndexTaskRunner indexingRunner = createRunner( toolbox, f -> createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards) @@ -688,9 +688,9 @@ private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throw // 2. Partial segment merge phase // partition (interval, partitionId) -> partition locations - Map, List> partitionToLocations = + Map, List> partitionToLocations = groupGenericPartitionLocationsPerPartition(indexingRunner.getReports()); - final List ioConfigs = createGenericMergeIOConfigs( + final List ioConfigs = createGenericMergeIOConfigs( ingestionSchema.getTuningConfig().getTotalNumMergeTasks(), partitionToLocations ); @@ -747,7 +747,7 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro ); final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse; - ParallelIndexTaskRunner> indexingRunner = + ParallelIndexTaskRunner indexingRunner = createRunner( toolbox, tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions, segmentCreateIngestionSpec) @@ -759,9 +759,9 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro } // partition (interval, partitionId) -> partition locations - Map, List> partitionToLocations = + Map, List> partitionToLocations = groupGenericPartitionLocationsPerPartition(indexingRunner.getReports()); - final List ioConfigs = createGenericMergeIOConfigs( + final List ioConfigs = createGenericMergeIOConfigs( ingestionSchema.getTuningConfig().getTotalNumMergeTasks(), partitionToLocations ); @@ -859,13 +859,13 @@ private PartitionBoundaries determineRangePartition(Collection, List> groupGenericPartitionLocationsPerPartition( - Map> subTaskIdToReport + private static Map, List> groupGenericPartitionLocationsPerPartition( + Map subTaskIdToReport ) { final Map, BuildingShardSpec> intervalAndIntegerToShardSpec = new HashMap<>(); final Object2IntMap intervalToNextPartitionId = new Object2IntOpenHashMap<>(); - final BiFunction createPartitionLocationFunction = + final BiFunction createPartitionLocationFunction = (subtaskId, partitionStat) -> { final BuildingShardSpec shardSpec = intervalAndIntegerToShardSpec.computeIfAbsent( Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()), @@ -879,31 +879,24 @@ private static Map, List> grou return partitionStat.getSecondaryPartition().convert(partitionId); } ); - return new GenericPartitionLocation( - partitionStat.getTaskExecutorHost(), - partitionStat.getTaskExecutorPort(), - partitionStat.isUseHttps(), - subtaskId, - partitionStat.getInterval(), - shardSpec - ); + return partitionStat.toPartitionLocation(subtaskId, shardSpec); }; return groupPartitionLocationsPerPartition(subTaskIdToReport, createPartitionLocationFunction); } - private static + private static Map, List> groupPartitionLocationsPerPartition( - Map> subTaskIdToReport, - BiFunction createPartitionLocationFunction + Map subTaskIdToReport, + BiFunction createPartitionLocationFunction ) { // partition (interval, partitionId) -> partition locations final Map, List> partitionToLocations = new HashMap<>(); - for (Entry> entry : subTaskIdToReport.entrySet()) { + for (Entry entry : subTaskIdToReport.entrySet()) { final String subTaskId = entry.getKey(); - final GeneratedPartitionsReport report = entry.getValue(); - for (S partitionStat : 
report.getPartitionStats()) { + final GeneratedPartitionsReport report = entry.getValue(); + for (PartitionStat partitionStat : report.getPartitionStats()) { final List locationsOfSamePartition = partitionToLocations.computeIfAbsent( Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()), k -> new ArrayList<>() @@ -915,15 +908,15 @@ Map, List> groupPartitionLocationsPerPartition( return partitionToLocations; } - private static List createGenericMergeIOConfigs( + private static List createGenericMergeIOConfigs( int totalNumMergeTasks, - Map, List> partitionToLocations + Map, List> partitionToLocations ) { return createMergeIOConfigs( totalNumMergeTasks, partitionToLocations, - PartialGenericSegmentMergeIOConfig::new + PartialSegmentMergeIOConfig::new ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfig.java deleted file mode 100644 index bbec73f9a446..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task.batch.parallel; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.segment.indexing.IOConfig; - -import java.util.List; - -@JsonTypeName(PartialGenericSegmentMergeTask.TYPE) -class PartialGenericSegmentMergeIOConfig extends PartialSegmentMergeIOConfig - implements IOConfig -{ - @JsonCreator - PartialGenericSegmentMergeIOConfig( - @JsonProperty("partitionLocations") List partitionLocations - ) - { - super(partitionLocations); - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpec.java deleted file mode 100644 index 52edad6e1e91..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpec.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.task.batch.parallel; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.segment.indexing.DataSchema; - -class PartialGenericSegmentMergeIngestionSpec - extends PartialSegmentMergeIngestionSpec -{ - @JsonCreator - PartialGenericSegmentMergeIngestionSpec( - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("ioConfig") PartialGenericSegmentMergeIOConfig ioConfig, - @JsonProperty("tuningConfig") ParallelIndexTuningConfig tuningConfig - ) - { - super(dataSchema, ioConfig, tuningConfig); - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java index 4050a011071c..8babf50d8265 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java @@ -37,7 +37,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner private static final String PHASE_NAME = "partial segment merge"; private final DataSchema dataSchema; - private final List mergeIOConfigs; + private final List mergeIOConfigs; PartialGenericSegmentMergeParallelIndexTaskRunner( TaskToolbox toolbox, @@ -45,7 +45,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner String groupId, String baseSubtaskSpecName, DataSchema dataSchema, - List mergeIOConfigs, + List mergeIOConfigs, ParallelIndexTuningConfig tuningConfig, Map context ) @@ -75,9 +75,9 @@ int estimateTotalNumSubTasks() } @VisibleForTesting - SubTaskSpec newTaskSpec(PartialGenericSegmentMergeIOConfig ioConfig) + SubTaskSpec newTaskSpec(PartialSegmentMergeIOConfig ioConfig) { - final PartialGenericSegmentMergeIngestionSpec ingestionSpec = new PartialGenericSegmentMergeIngestionSpec( + final PartialSegmentMergeIngestionSpec ingestionSpec = new PartialSegmentMergeIngestionSpec( dataSchema, ioConfig, getTuningConfig() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java index 5dafc6ee51a5..72f902087857 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java @@ -38,11 +38,11 @@ /** * {@link ParallelIndexTaskRunner} for the phase to merge generic partitioned segments in multi-phase parallel 
indexing. */ -public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask +public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask { public static final String TYPE = "partial_index_generic_merge"; - private final PartialGenericSegmentMergeIngestionSpec ingestionSchema; + private final PartialSegmentMergeIngestionSpec ingestionSchema; private final Table> intervalAndIntegerToShardSpec; @JsonCreator @@ -55,7 +55,7 @@ public PartialGenericSegmentMergeTask( // subtaskSpecId can be null only for old task versions. @JsonProperty("subtaskSpecId") @Nullable final String subtaskSpecId, @JsonProperty("numAttempts") final int numAttempts, // zero-based counting - @JsonProperty("spec") final PartialGenericSegmentMergeIngestionSpec ingestionSchema, + @JsonProperty("spec") final PartialSegmentMergeIngestionSpec ingestionSchema, @JsonProperty("context") final Map context ) { @@ -79,7 +79,7 @@ public PartialGenericSegmentMergeTask( } private static Table> createIntervalAndIntegerToShardSpec( - List partitionLocations + List partitionLocations ) { final Table> intervalAndIntegerToShardSpec = HashBasedTable.create(); @@ -107,7 +107,7 @@ private static Table> createIntervalAndI } @JsonProperty("spec") - private PartialGenericSegmentMergeIngestionSpec getIngestionSchema() + private PartialSegmentMergeIngestionSpec getIngestionSchema() { return ingestionSchema; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java index 2f4cea95c531..3bb9a1544f5d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java @@ -30,7 +30,7 @@ * {@link ParallelIndexTaskRunner} for the phase to create hash partitioned segments in multi-phase parallel indexing. 
*/ class PartialHashSegmentGenerateParallelIndexTaskRunner - extends InputSourceSplitParallelIndexTaskRunner> + extends InputSourceSplitParallelIndexTaskRunner { private static final String PHASE_NAME = "partial segment generation"; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 9770653bcd73..ea320a4703df 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -33,7 +33,6 @@ import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.BucketNumberedShardSpec; import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.Interval; @@ -164,25 +163,12 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd @Override GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments) { - List partitionStats = segments.stream() - .map(segment -> createPartitionStat(toolbox, segment)) + List partitionStats = segments.stream() + .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); return new GeneratedPartitionsMetadataReport(getId(), partitionStats); } - private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegment segment) - { - return new GenericPartitionStat( - toolbox.getTaskExecutorNode().getHost(), - toolbox.getTaskExecutorNode().getPortToUse(), - toolbox.getTaskExecutorNode().isEnableTlsPort(), - segment.getInterval(), - (BucketNumberedShardSpec) segment.getShardSpec(), - null, // numRows is not supported yet - null // sizeBytes is not supported yet - ); - } - /** * Creates shard specs based on the given configurations. The return value is a map between intervals created * based on the segment granularity and the shard specs to be created. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java index cec1f7aca395..d37b432965b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java @@ -30,7 +30,7 @@ * {@link ParallelIndexTaskRunner} for the phase to create range partitioned segments in multi-phase parallel indexing. 
*/ class PartialRangeSegmentGenerateParallelIndexTaskRunner - extends InputSourceSplitParallelIndexTaskRunner> + extends InputSourceSplitParallelIndexTaskRunner { private static final String PHASE_NAME = "partial segment generation"; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index a67a3df81a72..61287e04d2bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -34,7 +34,6 @@ import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.BucketNumberedShardSpec; import org.apache.druid.timeline.partition.PartitionBoundaries; import org.joda.time.Interval; @@ -176,22 +175,9 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd @Override GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments) { - List partitionStats = segments.stream() - .map(segment -> createPartitionStat(toolbox, segment)) + List partitionStats = segments.stream() + .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); return new GeneratedPartitionsMetadataReport(getId(), partitionStats); } - - private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegment segment) - { - return new GenericPartitionStat( - toolbox.getTaskExecutorNode().getHost(), - toolbox.getTaskExecutorNode().getPortToUse(), - toolbox.getTaskExecutorNode().isEnableTlsPort(), - segment.getInterval(), - (BucketNumberedShardSpec) segment.getShardSpec(), - null, // numRows is not supported yet - null // sizeBytes is not supported yet - ); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java index 50e3a09c2238..90990a552c61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java @@ -19,17 +19,21 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.segment.indexing.IOConfig; import java.util.List; -abstract class PartialSegmentMergeIOConfig implements IOConfig +@JsonTypeName(PartialGenericSegmentMergeTask.TYPE) +public class PartialSegmentMergeIOConfig implements IOConfig { - private final List partitionLocations; + private final List partitionLocations; - PartialSegmentMergeIOConfig(List partitionLocations) + @JsonCreator + PartialSegmentMergeIOConfig(@JsonProperty("partitionLocations") List partitionLocations) { Preconditions.checkState( partitionLocations != null && 
!partitionLocations.isEmpty(), @@ -39,7 +43,7 @@ abstract class PartialSegmentMergeIOConfig implemen } @JsonProperty - public List getPartitionLocations() + public List getPartitionLocations() { return partitionLocations; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java index b0ea81d8fd31..2683844c4e1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java @@ -19,16 +19,16 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.IngestionSpec; -abstract class PartialSegmentMergeIngestionSpec - extends IngestionSpec +public class PartialSegmentMergeIngestionSpec extends IngestionSpec { PartialSegmentMergeIngestionSpec( - DataSchema dataSchema, - T ioConfig, - ParallelIndexTuningConfig tuningConfig + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") PartialSegmentMergeIOConfig ioConfig, + @JsonProperty("tuningConfig") ParallelIndexTuningConfig tuningConfig ) { super(dataSchema, ioConfig, tuningConfig); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index 81cc66ecc2c8..38a7261b786c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -49,7 +49,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.ShardSpec; -import org.apache.druid.utils.CompressionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -70,12 +69,12 @@ /** * Base class for creating task that merges partial segments created by {@link PartialSegmentGenerateTask}. */ -abstract class PartialSegmentMergeTask extends PerfectRollupWorkerTask +abstract class PartialSegmentMergeTask extends PerfectRollupWorkerTask { private static final Logger LOG = new Logger(PartialSegmentMergeTask.class); - private final PartialSegmentMergeIOConfig
<P>
ioConfig; + private final PartialSegmentMergeIOConfig ioConfig; private final int numAttempts; private final String supervisorTaskId; private final String subtaskSpecId; @@ -88,7 +87,7 @@ abstract class PartialSegmentMergeTask ioConfig, + PartialSegmentMergeIOConfig ioConfig, ParallelIndexTuningConfig tuningConfig, final int numAttempts, // zero-based counting final Map context @@ -142,8 +141,8 @@ public boolean isReady(TaskActionClient taskActionClient) public TaskStatus runTask(TaskToolbox toolbox) throws Exception { // Group partitionLocations by interval and partitionId - final Map>> intervalToBuckets = new HashMap<>(); - for (P location : ioConfig.getPartitionLocations()) { + final Map>> intervalToBuckets = new HashMap<>(); + for (PartitionLocation location : ioConfig.getPartitionLocations()) { intervalToBuckets.computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap<>()) .computeIfAbsent(location.getBucketId(), k -> new ArrayList<>()) .add(location); @@ -205,7 +204,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception private Map>> fetchSegmentFiles( TaskToolbox toolbox, - Map>> intervalToBuckets + Map>> intervalToBuckets ) throws IOException { final File tempDir = toolbox.getIndexingTmpDir(); @@ -214,9 +213,9 @@ private Map>> fetchSegmentFiles( final Map>> intervalToUnzippedFiles = new HashMap<>(); // Fetch partition files - for (Entry>> entryPerInterval : intervalToBuckets.entrySet()) { + for (Entry>> entryPerInterval : intervalToBuckets.entrySet()) { final Interval interval = entryPerInterval.getKey(); - for (Int2ObjectMap.Entry> entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) { + for (Int2ObjectMap.Entry> entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) { final int bucketId = entryPerBucketId.getIntKey(); final File partitionDir = FileUtils.getFile( tempDir, @@ -225,21 +224,11 @@ private Map>> fetchSegmentFiles( Integer.toString(bucketId) ); FileUtils.forceMkdir(partitionDir); - for (P location : entryPerBucketId.getValue()) { - final File zippedFile = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, location); - try { - final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId())); - FileUtils.forceMkdir(unzippedDir); - CompressionUtils.unzip(zippedFile, unzippedDir); - intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>()) - .computeIfAbsent(bucketId, k -> new ArrayList<>()) - .add(unzippedDir); - } - finally { - if (!zippedFile.delete()) { - LOG.warn("Failed to delete temp file[%s]", zippedFile); - } - } + for (PartitionLocation location : entryPerBucketId.getValue()) { + final File unzippedDir = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, location); + intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>()) + .computeIfAbsent(bucketId, k -> new ArrayList<>()) + .add(unzippedDir); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java index da382cec0604..fc62eda31932 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java @@ -19,133 +19,24 @@ package 
org.apache.druid.indexing.common.task.batch.parallel; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.java.util.common.StringUtils; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.timeline.partition.BuildingShardSpec; import org.joda.time.Interval; -import java.net.URI; -import java.util.Objects; - /** - * This class represents the intermediary data server where the partition of {@link #interval} and + * This class represents the intermediary data server where the partition of {@link #getInterval()} and * {@link #getBucketId()} is stored. */ -abstract class PartitionLocation +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GenericPartitionLocation.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = GenericPartitionStat.TYPE, value = GenericPartitionLocation.class), + @JsonSubTypes.Type(name = DeepStoragePartitionStat.TYPE, value = DeepStoragePartitionLocation.class) +}) +public interface PartitionLocation { - private final String host; - private final int port; - private final boolean useHttps; - private final String subTaskId; - private final Interval interval; - private final T secondaryPartition; - - PartitionLocation( - String host, - int port, - boolean useHttps, - String subTaskId, - Interval interval, - T secondaryPartition - ) - { - this.host = host; - this.port = port; - this.useHttps = useHttps; - this.subTaskId = subTaskId; - this.interval = interval; - this.secondaryPartition = secondaryPartition; - } - - @JsonProperty - public String getHost() - { - return host; - } - - @JsonProperty - public int getPort() - { - return port; - } - - @JsonProperty - public boolean isUseHttps() - { - return useHttps; - } - - @JsonProperty - public String getSubTaskId() - { - return subTaskId; - } - - @JsonProperty - public Interval getInterval() - { - return interval; - } - - @JsonIgnore - public T getSecondaryPartition() - { - return secondaryPartition; - } - - abstract int getBucketId(); - - final URI toIntermediaryDataServerURI(String supervisorTaskId) - { - return URI.create( - StringUtils.format( - "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&bucketId=%d", - useHttps ? 
"https" : "http", - host, - port, - StringUtils.urlEncode(supervisorTaskId), - StringUtils.urlEncode(subTaskId), - interval.getStart(), - interval.getEnd(), - getBucketId() - ) - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PartitionLocation that = (PartitionLocation) o; - return port == that.port && - useHttps == that.useHttps && - Objects.equals(host, that.host) && - Objects.equals(subTaskId, that.subTaskId) && - Objects.equals(interval, that.interval) && - Objects.equals(secondaryPartition, that.secondaryPartition); - } - - @Override - public int hashCode() - { - return Objects.hash(host, port, useHttps, subTaskId, interval, secondaryPartition); - } - - @Override - public String toString() - { - return "PartitionLocation{" + - "host='" + host + '\'' + - ", port=" + port + - ", useHttps=" + useHttps + - ", subTaskId='" + subTaskId + '\'' + - ", interval=" + interval + - ", secondaryPartition=" + secondaryPartition + - '}'; - } + int getBucketId(); + Interval getInterval(); + BuildingShardSpec getShardSpec(); + String getSubTaskId(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java index c7f1a55c2a2e..4ef58871a44d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java @@ -19,119 +19,41 @@ package org.apache.druid.indexing.common.task.batch.parallel; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.timeline.partition.BucketNumberedShardSpec; +import org.apache.druid.timeline.partition.BuildingShardSpec; import org.joda.time.Interval; -import javax.annotation.Nullable; -import java.util.Objects; - /** * Statistics about a partition created by {@link PartialSegmentGenerateTask}. Each partition is a * set of data of the same time chunk (primary partition key) and the same secondary partition key - * ({@link T}). This class holds the statistics of a single partition created by a task. + * ({@link BucketNumberedShardSpec}). This class holds the statistics of a single partition created by a task. */ -abstract class PartitionStat +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GenericPartitionStat.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = GenericPartitionStat.TYPE, value = GenericPartitionStat.class), + @JsonSubTypes.Type(name = DeepStoragePartitionStat.TYPE, value = DeepStoragePartitionStat.class) +}) +public interface PartitionStat { - // Host and port of the task executor - private final String taskExecutorHost; - private final int taskExecutorPort; - private final boolean useHttps; - - // Primary partition key - private final Interval interval; - - // numRows and sizeBytes are always null currently and will be filled properly in the future. 
- @Nullable - private final Integer numRows; - @Nullable - private final Long sizeBytes; - - PartitionStat( - String taskExecutorHost, - int taskExecutorPort, - boolean useHttps, - Interval interval, - @Nullable Integer numRows, - @Nullable Long sizeBytes - ) - { - this.taskExecutorHost = taskExecutorHost; - this.taskExecutorPort = taskExecutorPort; - this.useHttps = useHttps; - this.interval = interval; - this.numRows = numRows == null ? 0 : numRows; - this.sizeBytes = sizeBytes == null ? 0 : sizeBytes; - } - - @JsonProperty - public final String getTaskExecutorHost() - { - return taskExecutorHost; - } - - @JsonProperty - public final int getTaskExecutorPort() - { - return taskExecutorPort; - } - - @JsonProperty - public final boolean isUseHttps() - { - return useHttps; - } - - @JsonProperty - public final Interval getInterval() - { - return interval; - } - - @Nullable - @JsonProperty - public final Integer getNumRows() - { - return numRows; - } - - @Nullable - @JsonProperty - public final Long getSizeBytes() - { - return sizeBytes; - } - /** * @return Uniquely identifying index from 0..N-1 of the N partitions */ - abstract int getBucketId(); + int getBucketId(); /** * @return Definition of secondary partition. For example, for range partitioning, this should include the start/end. */ - abstract T getSecondaryPartition(); + BucketNumberedShardSpec getSecondaryPartition(); - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PartitionStat that = (PartitionStat) o; - return taskExecutorPort == that.taskExecutorPort && - useHttps == that.useHttps && - Objects.equals(taskExecutorHost, that.taskExecutorHost) && - Objects.equals(interval, that.interval) && - Objects.equals(numRows, that.numRows) && - Objects.equals(sizeBytes, that.sizeBytes); - } + /** + * @return interval for the partition + */ + Interval getInterval(); - @Override - public int hashCode() - { - return Objects.hash(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes); - } + /** + * Converts partition stat to PartitionLocation + * */ + PartitionLocation toPartitionLocation(String subtaskId, BuildingShardSpec shardSpec); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java index 2b33f0fb3371..2da097b02912 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java @@ -34,13 +34,14 @@ * @see PartialSegmentMergeTask */ @ExtensionPoint -public interface ShuffleClient +public interface ShuffleClient
<P extends PartitionLocation>
{ /** * Fetch the segment file into the local storage for the given supervisorTaskId and the location. * If the segment file should be fetched from a remote site, the returned file will be created under the given * partitionDir. Otherwise, the returned file can be located in any path. + * @return dir containing the unzipped segment files */ - > File fetchSegmentFile(File partitionDir, String supervisorTaskId, P location) + File fetchSegmentFile(File partitionDir, String supervisorTaskId, P location) throws IOException; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java new file mode 100644 index 000000000000..34199fc9e230 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker.shuffle; + +import com.google.common.io.ByteSource; +import com.google.inject.Inject; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.BucketNumberedShardSpec; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Optional; + +public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager +{ + public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data"; + private final DataSegmentPusher dataSegmentPusher; + + @Inject + public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher) + { + this.dataSegmentPusher = dataSegmentPusher; + } + + @Override + public void start() + { + // nothing + } + + @Override + public void stop() + { + // nothing + } + + @Override + public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) + throws IOException + { + if (!(segment.getShardSpec() instanceof BucketNumberedShardSpec)) { + throw new IAE( + "Invalid shardSpec type. 
Expected [%s] but got [%s]", + BucketNumberedShardSpec.class.getName(), + segment.getShardSpec().getClass().getName() + ); + } + final BucketNumberedShardSpec bucketNumberedShardSpec = (BucketNumberedShardSpec) segment.getShardSpec(); + final String partitionFilePath = getPartitionFilePath( + supervisorTaskId, + subTaskId, + segment.getInterval(), + bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID + ); + return dataSegmentPusher.pushToPath(segmentDir, segment, SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath); + } + + @Override + public DeepStoragePartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment) + { + return new DeepStoragePartitionStat( + segment.getInterval(), + (BucketNumberedShardSpec) segment.getShardSpec(), + segment.getLoadSpec() + ); + } + + @Nullable + @Override + public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId) + { + throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec"); + } + + @Override + public void deletePartitions(String supervisorTaskId) + { + throw new UnsupportedOperationException("Not supported"); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java index a903bfc3302a..17f898a6d32d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java @@ -21,12 +21,15 @@ import com.google.common.io.ByteSource; import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.batch.parallel.PartitionStat; import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; import java.io.File; import java.io.IOException; +import java.nio.file.Paths; import java.util.Optional; /** @@ -43,6 +46,10 @@ @ExtensionPoint public interface IntermediaryDataManager { + void start(); + + void stop(); + /** * Write a segment into one of configured locations * * @param supervisorTaskId - Id of the supervisor task writing the segment * @param subTaskId - Id of the sub task writing the segment * @param segment - Segment to write * @param segmentDir - Directory of the segment to write * - * @return size of the writen segment + * @return the written segment */ - long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException; + DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException; /** * Find the partition file. Note that the returned ByteSource method size() should be fast.
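
(Aside, for orientation: a minimal, self-contained sketch of the handshake these interfaces define. WorkerShuffleStat and DeepStoreShuffleStat below are hypothetical stand-ins for GenericPartitionStat and DeepStoragePartitionStat, not classes from this patch. The point is only the flow: the generate phase reports a stat per partition, the stat is converted into a location, and the merge phase fetches by location without caring whether the bytes live on the producing worker or in deep storage.)

// Editor's sketch only; simplified analogues of the patch's stat/location types.
import java.util.Collections;
import java.util.Map;

interface ShuffleLocation
{
  // Describes how the merge phase would fetch this partition.
  String describeFetch();
}

interface ShuffleStat
{
  // Analogue of PartitionStat.toPartitionLocation(...).
  ShuffleLocation toLocation(String subTaskId);
}

// Data stays on the producing worker, so the location must carry host and port
// for an HTTP fetch (the GenericPartitionStat case).
class WorkerShuffleStat implements ShuffleStat
{
  private final String host;
  private final int port;

  WorkerShuffleStat(String host, int port)
  {
    this.host = host;
    this.port = port;
  }

  @Override
  public ShuffleLocation toLocation(String subTaskId)
  {
    return () -> "GET http://" + host + ":" + port + "/partition/" + subTaskId;
  }
}

// The partition was already pushed to deep storage, so a load spec alone
// identifies it; no worker address is needed (the DeepStoragePartitionStat case).
class DeepStoreShuffleStat implements ShuffleStat
{
  private final Map<String, Object> loadSpec;

  DeepStoreShuffleStat(Map<String, Object> loadSpec)
  {
    this.loadSpec = loadSpec;
  }

  @Override
  public ShuffleLocation toLocation(String subTaskId)
  {
    return () -> "pull " + loadSpec.get("path");
  }
}

public class ShuffleHandshakeSketch
{
  public static void main(String[] args)
  {
    ShuffleStat[] stats = {
        new WorkerShuffleStat("middlemanager-1", 8091),
        new DeepStoreShuffleStat(Collections.singletonMap("path", "s3://bucket/shuffle-data/supervisor-id/bucket-0"))
    };
    // The merge phase resolves each stat to a location and fetches the same
    // way, regardless of which intermediary store produced the partition.
    for (ShuffleStat stat : stats) {
      System.out.println(stat.toLocation("subtask-0").describeFetch());
    }
  }
}

The deep-storage variant is what decouples shuffle from worker lifetime: nothing about the fetch depends on the producing task's host still being reachable.
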
@@ -74,4 +81,30 @@ public interface IntermediaryDataManager * */ void deletePartitions(String supervisorTaskId) throws IOException; + + PartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment); + + default String getPartitionFilePath( + String supervisorTaskId, + String subTaskId, + Interval interval, + int bucketId + ) + { + return Paths.get(getPartitionDirPath(supervisorTaskId, interval, bucketId), subTaskId).toString(); + } + + default String getPartitionDirPath( + String supervisorTaskId, + Interval interval, + int bucketId + ) + { + return Paths.get( + supervisorTaskId, + interval.getStart().toString(), + interval.getEnd().toString(), + String.valueOf(bucketId) + ).toString(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java index 7331cf6c6e52..feaa867c6f4e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.worker.shuffle; +import com.google.common.base.Throwables; import com.google.common.collect.Iterators; import com.google.common.io.ByteSource; import com.google.common.io.Files; @@ -29,7 +30,9 @@ import org.apache.druid.client.indexing.TaskStatus; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -40,6 +43,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; @@ -52,7 +56,6 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -124,6 +127,7 @@ public LocalIntermediaryDataManager( this.indexingServiceClient = indexingServiceClient; } + @Override @LifecycleStart public void start() { @@ -162,12 +166,18 @@ public void start() ); } + @Override @LifecycleStop - public void stop() throws InterruptedException + public void stop() { if (supervisorTaskChecker != null) { supervisorTaskChecker.shutdownNow(); - supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS); + try { + supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Throwables.propagate(e); + } } supervisorTaskCheckTimes.clear(); } @@ -268,7 +278,7 @@ private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws Interrup * supervisorTaskId. 
*/ @Override - public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) + public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException { // Get or create the location iterator for supervisorTask. @@ -341,7 +351,7 @@ public long addSegment(String supervisorTaskId, String subTaskId, DataSegment se subTaskId, destFile ); - return unzippedSizeBytes; + return segment.withSize(unzippedSizeBytes).withBinaryVersion(SegmentUtils.getVersionFromDir(segmentDir)); } catch (Exception e) { location.release(partitionFilePath, tempZippedFile.length()); @@ -364,7 +374,7 @@ public Optional findPartitionFile(String supervisorTaskId, String su { IdUtils.validateId("supervisorTaskId", supervisorTaskId); for (StorageLocation location : shuffleDataLocations) { - final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, bucketId)); + final File partitionDir = new File(location.getPath(), getPartitionDirPath(supervisorTaskId, interval, bucketId)); if (partitionDir.exists()) { supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow()); final File[] segmentFiles = partitionDir.listFiles(); @@ -384,6 +394,20 @@ public Optional findPartitionFile(String supervisorTaskId, String su return Optional.empty(); } + @Override + public GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment) + { + return new GenericPartitionStat( + toolbox.getTaskExecutorNode().getHost(), + toolbox.getTaskExecutorNode().getPortToUse(), + toolbox.getTaskExecutorNode().isEnableTlsPort(), + segment.getInterval(), + (BucketNumberedShardSpec) segment.getShardSpec(), + null, // numRows is not supported yet + null // sizeBytes is not supported yet + ); + } + private DateTime getExpiryTimeFromNow() { return DateTimes.nowUtc().plus(intermediaryPartitionTimeout); @@ -405,28 +429,4 @@ public void deletePartitions(String supervisorTaskId) throws IOException } supervisorTaskCheckTimes.remove(supervisorTaskId); } - - private static String getPartitionFilePath( - String supervisorTaskId, - String subTaskId, - Interval interval, - int bucketId - ) - { - return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString(); - } - - private static String getPartitionDir( - String supervisorTaskId, - Interval interval, - int bucketId - ) - { - return Paths.get( - supervisorTaskId, - interval.getStart().toString(), - interval.getEnd().toString(), - String.valueOf(bucketId) - ).toString(); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java index 6bc83ba17baf..e1726d130a2d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.worker.shuffle; -import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.timeline.DataSegment; @@ -64,9 +63,7 @@ public String getPathForHadoop() @Override public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException { - final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file); - 
return segment.withSize(unzippedSize) - .withBinaryVersion(SegmentUtils.getVersionFromDir(file)); + return intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index f1566bcef9f4..a4f776d130b0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -95,6 +95,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CompressionUtils; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -702,7 +703,7 @@ static class TestParallelIndexSupervisorTask extends ParallelIndexSupervisorTask } } - static class LocalShuffleClient implements ShuffleClient + static class LocalShuffleClient implements ShuffleClient<GenericPartitionLocation> { private final IntermediaryDataManager intermediaryDataManager; @@ -712,10 +713,10 @@ static class LocalShuffleClient implements ShuffleClient } @Override - public > File fetchSegmentFile( + public File fetchSegmentFile( File partitionDir, String supervisorTaskId, - P location + GenericPartitionLocation location ) throws IOException { final java.util.Optional<ByteSource> zippedFile = intermediaryDataManager.findPartitionFile( @@ -732,7 +733,17 @@ public > File fetchSegmentFile( fetchedFile, out -> zippedFile.get().copyTo(out) ); - return fetchedFile; + final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId())); + try { + org.apache.commons.io.FileUtils.forceMkdir(unzippedDir); + CompressionUtils.unzip(fetchedFile, unzippedDir); + } + finally { + if (!fetchedFile.delete()) { + LOG.warn("Failed to delete temp file[%s]", fetchedFile); + } + } + return unzippedDir; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocationTest.java similarity index 58% rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfigTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocationTest.java index c96adb89a755..de51d7d109af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocationTest.java @@ -20,30 +20,28 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.segment.TestHelper; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.Collections; - -public class
PartialGenericSegmentMergeIOConfigTest +public class DeepStoragePartitionLocationTest { private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper(); - private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation( - ParallelIndexTestingFactory.HOST, - ParallelIndexTestingFactory.PORT, - ParallelIndexTestingFactory.USE_HTTPS, - ParallelIndexTestingFactory.SUBTASK_ID, - ParallelIndexTestingFactory.INTERVAL, - ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC - ); - private PartialGenericSegmentMergeIOConfig target; + private DeepStoragePartitionLocation target; @Before public void setup() { - target = new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION)); + target = new DeepStoragePartitionLocation( + ParallelIndexTestingFactory.SUBTASK_ID, + ParallelIndexTestingFactory.INTERVAL, + ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC, + ImmutableMap.of("path", "/test/path") + ); } @Test @@ -51,4 +49,19 @@ public void serializesDeserializes() { TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); } + + @Test + public void hasPartitionIdThatMatchesShardSpec() + { + Assert.assertEquals(ParallelIndexTestingFactory.PARTITION_ID, target.getBucketId()); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(DeepStoragePartitionLocation.class) + .withNonnullFields("subTaskId", "interval", "shardSpec", "loadSpec") + .usingGetClass() + .verify(); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStatTest.java new file mode 100644 index 000000000000..0b9bdb348f4c --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStatTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.timeline.partition.HashBucketShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +public class DeepStoragePartitionStatTest +{ + private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper(); + + private DeepStoragePartitionStat target; + + @Before + public void setup() + { + target = new DeepStoragePartitionStat( + ParallelIndexTestingFactory.INTERVAL, + new HashBucketShardSpec( + ParallelIndexTestingFactory.PARTITION_ID, + ParallelIndexTestingFactory.PARTITION_ID + 1, + Collections.singletonList("dim"), + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ), + ImmutableMap.of("path", "/dummy/index.zip") + ); + } + + @Test + public void serializesDeserializes() + { + TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); + } + + @Test + public void hasPartitionIdThatMatchesSecondaryPartition() + { + Assert.assertEquals(target.getSecondaryPartition().getBucketId(), target.getBucketId()); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(DeepStoragePartitionStat.class) + .withNonnullFields("interval", "shardSpec", "loadSpec") + .usingGetClass() + .verify(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClientTest.java new file mode 100644 index 000000000000..4438677c7278 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClientTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; +import org.apache.druid.guice.GuiceAnnotationIntrospector; +import org.apache.druid.guice.GuiceInjectableValues; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.loading.LocalDataSegmentPuller; +import org.apache.druid.segment.loading.LocalLoadSpec; +import org.apache.druid.utils.CompressionUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; + +public class DeepStorageShuffleClientTest +{ + private DeepStorageShuffleClient deepStorageShuffleClient; + private ObjectMapper mapper; + private File segmentFile; + private String segmentFileName; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + + @Before + public void setUp() throws Exception + { + final Injector injector = GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + binder -> binder.bind(LocalDataSegmentPuller.class) + ) + ); + mapper = new DefaultObjectMapper(); + mapper.registerModule(new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class)); + mapper.setInjectableValues(new GuiceInjectableValues(injector)); + + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + mapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair(guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()), + new AnnotationIntrospectorPair(guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector()) + ); + deepStorageShuffleClient = new DeepStorageShuffleClient(mapper); + + File temp = temporaryFolder.newFile(); + segmentFileName = temp.getName(); + try (Writer writer = Files.newBufferedWriter(temp.toPath(), StandardCharsets.UTF_8)) { + for (int j = 0; j < 10; j++) { + writer.write(StringUtils.format("let's write some data.\n")); + } + } + segmentFile = new File(temp.getAbsolutePath() + ".zip"); + CompressionUtils.zip(segmentFile.getParentFile(), segmentFile); + } + + @Test + public void fetchSegmentFile() throws IOException + { + File partitionDir = temporaryFolder.newFolder(); + String subTaskId = "subTask"; + File unzippedDir = deepStorageShuffleClient.fetchSegmentFile( + partitionDir, + "testSupervisor", + new DeepStoragePartitionLocation( + subTaskId, + Intervals.of("2000/2099"), + null, + ImmutableMap.of("type", "local", "path", segmentFile.getAbsolutePath()) + ) + ); + Assert.assertEquals( + StringUtils.format("%s/unzipped_%s", partitionDir.getAbsolutePath(), subTaskId), + unzippedDir.getAbsolutePath() + ); + File fetchedSegmentFile = unzippedDir.listFiles((dir, name) -> name.endsWith(".tmp"))[0]; + Assert.assertEquals(segmentFileName, fetchedSegmentFile.getName()); + Assert.assertTrue(fetchedSegmentFile.length() > 0); + } +} diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java index 4e46e388f6c6..20643a2cc8f4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.segment.TestHelper; import org.junit.Assert; import org.junit.Before; @@ -55,4 +56,13 @@ public void hasPartitionIdThatMatchesShardSpec() { Assert.assertEquals(ParallelIndexTestingFactory.PARTITION_ID, target.getBucketId()); } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(GenericPartitionLocation.class) + .withNonnullFields("host", "port", "useHttps", "subTaskId", "interval", "shardSpec") + .usingGetClass() + .verify(); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java index 276b9addd8fa..a93adc1a767f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.segment.TestHelper; import org.apache.druid.timeline.partition.HashBucketShardSpec; import org.apache.druid.timeline.partition.HashPartitionFunction; @@ -66,4 +67,13 @@ public void hasPartitionIdThatMatchesSecondaryPartition() { Assert.assertEquals(target.getSecondaryPartition().getBucketId(), target.getBucketId()); } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(GenericPartitionStat.class) + .withNonnullFields("taskExecutorHost", "taskExecutorPort", "useHttps", "interval", "shardSpec") + .usingGetClass() + .verify(); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java index 3ddb63b65fbe..34e2b33b30c9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.utils.CompressionUtils; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; @@ -66,12 +67,14 @@ public class HttpShuffleClientTest @Before public void setup() throws IOException { - segmentFile = temporaryFolder.newFile(); - try (Writer writer = Files.newBufferedWriter(segmentFile.toPath(), 
StandardCharsets.UTF_8)) { + File temp = temporaryFolder.newFile(); + try (Writer writer = Files.newBufferedWriter(temp.toPath(), StandardCharsets.UTF_8)) { for (int j = 0; j < 10; j++) { writer.write(StringUtils.format("let's write some data.\n")); } } + segmentFile = new File(temp.getAbsolutePath() + ".zip"); + CompressionUtils.zip(segmentFile.getParentFile(), segmentFile); } @Test @@ -195,17 +198,17 @@ private HttpShuffleClient mockClient(int numFailures) throws FileNotFoundExcepti return new HttpShuffleClient(httpClient); } - private static class TestPartitionLocation extends PartitionLocation + private static class TestPartitionLocation extends GenericPartitionLocation { private TestPartitionLocation() { - super(HOST, PORT, false, SUBTASK_ID, INTERVAL, PARTITION_ID); + super(HOST, PORT, false, SUBTASK_ID, INTERVAL, null); } @Override - int getBucketId() + public int getBucketId() { - return getSecondaryPartition(); + return PARTITION_ID; } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index a28fb1efe1d0..8790da02d5c0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -69,35 +70,43 @@ public class ParallelIndexSupervisorTaskTest public static class CreateMergeIoConfigsTest { private static final int TOTAL_NUM_MERGE_TASKS = 10; - private static final Function, PartialGenericSegmentMergeIOConfig> - CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialGenericSegmentMergeIOConfig::new; + private static final Function, PartialSegmentMergeIOConfig> + CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialSegmentMergeIOConfig::new; - @Parameterized.Parameters(name = "count = {0}") - public static Iterable data() + @Parameterized.Parameters(name = "count = {0}, partitionLocationType = {1}") + public static Iterable data() { // different scenarios for last (index = 10 - 1 = 9) partition: return Arrays.asList( - 20, // even partitions per task: round(20 / 10) * (10 - 1) = 2 * 9 = 18 < 20 - 24, // round down: round(24 / 10) * (10 - 1) = 2 * 9 = 18 < 24 - 25, // round up to greater: round(25 / 10) * (10 - 1) = 3 * 9 = 27 > 25 (index out of bounds) - 27 // round up to equal: round(27 / 10) * (10 - 1) = 3 * 9 = 27 == 27 (empty partition) + new Object[][]{ + {20, GenericPartitionStat.TYPE}, // even partitions per task: round(20 / 10) * (10 - 1) = 2 * 9 = 18 < 20 + {24, DeepStoragePartitionStat.TYPE}, // round down: round(24 / 10) * (10 - 1) = 2 * 9 = 18 < 24 + {25, GenericPartitionStat.TYPE}, // round up to greater: round(25 / 10) * (10 - 1) = 3 * 9 = 27 > 25 (index out of bounds) + {27, DeepStoragePartitionStat.TYPE} // round up to equal: round(27 / 10) * (10 - 1) = 3 * 9 = 27 == 27 (empty partition) + } ); } - @Parameterized.Parameter + public CreateMergeIoConfigsTest(int count, String partitionLocationType) + { + this.count = count; + this.partitionLocationType = 
partitionLocationType; + } + public int count; + public String partitionLocationType; @Test public void handlesLastPartitionCorrectly() { - List assignedPartitionLocation = createMergeIOConfigs(); + List assignedPartitionLocation = createMergeIOConfigs(); assertNoMissingPartitions(count, assignedPartitionLocation); } @Test public void sizesPartitionsEvenly() { - List assignedPartitionLocation = createMergeIOConfigs(); + List assignedPartitionLocation = createMergeIOConfigs(); List actualPartitionSizes = assignedPartitionLocation.stream() .map(i -> i.getPartitionLocations().size()) .collect(Collectors.toList()); @@ -113,42 +122,56 @@ public void sizesPartitionsEvenly() ); } - private List createMergeIOConfigs() + private List createMergeIOConfigs() { return ParallelIndexSupervisorTask.createMergeIOConfigs( TOTAL_NUM_MERGE_TASKS, - createPartitionToLocations(count), + createPartitionToLocations(count, partitionLocationType), CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG ); } - private static Map, List> createPartitionToLocations(int count) + private static Map, List> createPartitionToLocations( + int count, + String partitionLocationType + ) { return IntStream.range(0, count).boxed().collect( Collectors.toMap( i -> Pair.of(createInterval(i), i), - i -> Collections.singletonList(createPartitionLocation(i)) + i -> Collections.singletonList(createPartitionLocation(i, partitionLocationType)) ) ); } - private static GenericPartitionLocation createPartitionLocation(int id) + private static PartitionLocation createPartitionLocation(int id, String partitionLocationType) { - return new GenericPartitionLocation( - "host", - 0, - false, - "subTaskId", - createInterval(id), - new BuildingHashBasedNumberedShardSpec( - id, - id, - id + 1, - null, - HashPartitionFunction.MURMUR3_32_ABS, - new ObjectMapper() - ) - ); + if (DeepStoragePartitionStat.TYPE.equals(partitionLocationType)) { + return new DeepStoragePartitionLocation("", Intervals.of("2000/2099"), new BuildingHashBasedNumberedShardSpec( + id, + id, + id + 1, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ), ImmutableMap.of()); + } else { + return new GenericPartitionLocation( + "host", + 0, + false, + "subTaskId", + createInterval(id), + new BuildingHashBasedNumberedShardSpec( + id, + id, + id + 1, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ) + ); + } } private static Interval createInterval(int id) @@ -158,7 +181,7 @@ private static Interval createInterval(int id) private static void assertNoMissingPartitions( int count, - List assignedPartitionLocation + List assignedPartitionLocation ) { List expectedIds = IntStream.range(0, count).boxed().collect(Collectors.toList()); @@ -167,7 +190,7 @@ private static void assertNoMissingPartitions( .flatMap( i -> i.getPartitionLocations() .stream() - .map(GenericPartitionLocation::getBucketId) + .map(PartitionLocation::getBucketId) ) .sorted() .collect(Collectors.toList()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 9e38195d9d5a..dc6537ff9601 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -77,10 +77,10 @@ class ParallelIndexTestingFactory static final 
ShuffleClient SHUFFLE_CLIENT = new ShuffleClient() { @Override - public <T, P extends PartitionLocation<T>> File fetchSegmentFile( + public File fetchSegmentFile( File partitionDir, String supervisorTaskId, - P location + PartitionLocation location ) { return null; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java index b98f09eead66..97930b07f984 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import com.google.common.collect.ImmutableMap; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.segment.TestHelper; import org.hamcrest.Matchers; @@ -27,11 +28,27 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; import java.util.Collections; +@RunWith(Parameterized.class) public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSupervisorTaskTest { + @Parameterized.Parameters(name = "partitionLocation = {0}") + public static Iterable<Object> data() + { + return Arrays.asList( + GENERIC_PARTITION_LOCATION, + DEEP_STORE_PARTITION_LOCATION + ); + } + + @Parameterized.Parameter + public PartitionLocation partitionLocation; + private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation( ParallelIndexTestingFactory.HOST, ParallelIndexTestingFactory.PORT, @@ -40,26 +57,21 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup ParallelIndexTestingFactory.INTERVAL, ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC ); - private static final PartialGenericSegmentMergeIOConfig IO_CONFIG = - new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION)); - private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec( - null, - 1, - Collections.emptyList() + + private static final DeepStoragePartitionLocation DEEP_STORE_PARTITION_LOCATION = new DeepStoragePartitionLocation( + ParallelIndexTestingFactory.SUBTASK_ID, + ParallelIndexTestingFactory.INTERVAL, + ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC, + ImmutableMap.of() ); - private static final PartialGenericSegmentMergeIngestionSpec INGESTION_SPEC = - new PartialGenericSegmentMergeIngestionSpec( - ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS), - IO_CONFIG, - new ParallelIndexTestingFactory.TuningConfigBuilder() - .partitionsSpec(PARTITIONS_SPEC) - .build() - ); @Rule public ExpectedException exception = ExpectedException.none(); private PartialGenericSegmentMergeTask target; + private PartialSegmentMergeIOConfig ioConfig; + private HashedPartitionsSpec partitionsSpec; + private PartialSegmentMergeIngestionSpec ingestionSpec; public PartialGenericSegmentMergeTaskTest() { @@ -70,6 +82,19 @@ public PartialGenericSegmentMergeTaskTest() @Before public void setup() { + ioConfig = new PartialSegmentMergeIOConfig(Collections.singletonList(partitionLocation)); + partitionsSpec = new HashedPartitionsSpec( + null, + 1, +
Collections.emptyList() + ); + ingestionSpec = new PartialSegmentMergeIngestionSpec( + ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS), + ioConfig, + new ParallelIndexTestingFactory.TuningConfigBuilder() + .partitionsSpec(partitionsSpec) + .build() + ); target = new PartialGenericSegmentMergeTask( ParallelIndexTestingFactory.AUTOMATIC_ID, ParallelIndexTestingFactory.GROUP_ID, @@ -77,7 +102,7 @@ public void setup() ParallelIndexTestingFactory.SUPERVISOR_TASK_ID, ParallelIndexTestingFactory.SUBTASK_SPEC_ID, ParallelIndexTestingFactory.NUM_ATTEMPTS, - INGESTION_SPEC, + ingestionSpec, ParallelIndexTestingFactory.CONTEXT ); } @@ -108,11 +133,11 @@ public void requiresGranularitySpecInputIntervals() ParallelIndexTestingFactory.SUPERVISOR_TASK_ID, ParallelIndexTestingFactory.SUBTASK_SPEC_ID, ParallelIndexTestingFactory.NUM_ATTEMPTS, - new PartialGenericSegmentMergeIngestionSpec( + new PartialSegmentMergeIngestionSpec( ParallelIndexTestingFactory.createDataSchema(null), - IO_CONFIG, + ioConfig, new ParallelIndexTestingFactory.TuningConfigBuilder() - .partitionsSpec(PARTITIONS_SPEC) + .partitionsSpec(partitionsSpec) .build() ), ParallelIndexTestingFactory.CONTEXT diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfigTest.java new file mode 100644 index 000000000000..a7901ab31646 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfigTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.segment.TestHelper; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.Collections; + +@RunWith(Parameterized.class) +public class PartialSegmentMergeIOConfigTest +{ + final PartitionLocation partitionLocation; + + public PartialSegmentMergeIOConfigTest(PartitionLocation partitionLocation) + { + this.partitionLocation = partitionLocation; + } + + @Parameterized.Parameters(name = "partitionLocation = {0}") + public static Collection<Object[]> data() + { + return ImmutableList.of(new Object[]{ + new GenericPartitionLocation( + ParallelIndexTestingFactory.HOST, + ParallelIndexTestingFactory.PORT, + ParallelIndexTestingFactory.USE_HTTPS, + ParallelIndexTestingFactory.SUBTASK_ID, + ParallelIndexTestingFactory.INTERVAL, + ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC + ) + }, new Object[]{ + new DeepStoragePartitionLocation( + ParallelIndexTestingFactory.SUBTASK_ID, + ParallelIndexTestingFactory.INTERVAL, + ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC, + ImmutableMap.of("path", "/test/path") + ) + }); + } + + private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper(); + private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation( + ParallelIndexTestingFactory.HOST, + ParallelIndexTestingFactory.PORT, + ParallelIndexTestingFactory.USE_HTTPS, + ParallelIndexTestingFactory.SUBTASK_ID, + ParallelIndexTestingFactory.INTERVAL, + ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC + ); + + private PartialSegmentMergeIOConfig target; + + @Before + public void setup() + { + target = new PartialSegmentMergeIOConfig(Collections.singletonList(partitionLocation)); + } + + @Test + public void serializesDeserializes() + { + TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpecTest.java similarity index 60% rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpecTest.java index c30cc9ee3b29..04c28190b865 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpecTest.java @@ -20,16 +20,34 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.segment.TestHelper; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; import java.util.Collections; -public class
PartialGenericSegmentMergeIngestionSpecTest +@RunWith(Parameterized.class) +public class PartialSegmentMergeIngestionSpecTest { private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper(); + + @Parameterized.Parameters(name = "partitionLocation = {0}") + public static Iterable<Object> data() + { + return Arrays.asList( + GENERIC_PARTITION_LOCATION, + DEEP_STORE_PARTITION_LOCATION + ); + } + + @Parameterized.Parameter + public PartitionLocation partitionLocation; + private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation( ParallelIndexTestingFactory.HOST, ParallelIndexTestingFactory.PORT, @@ -38,24 +56,32 @@ public class PartialGenericSegmentMergeIngestionSpecTest ParallelIndexTestingFactory.INTERVAL, ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC ); - private static final PartialGenericSegmentMergeIOConfig IO_CONFIG = - new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION)); - private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec( - null, - 1, - Collections.emptyList() + + private static final DeepStoragePartitionLocation DEEP_STORE_PARTITION_LOCATION = new DeepStoragePartitionLocation( + ParallelIndexTestingFactory.SUBTASK_ID, + ParallelIndexTestingFactory.INTERVAL, + ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC, + ImmutableMap.of() ); - private PartialGenericSegmentMergeIngestionSpec target; + private PartialSegmentMergeIngestionSpec target; + private PartialSegmentMergeIOConfig ioConfig; + private HashedPartitionsSpec partitionsSpec; @Before public void setup() { - target = new PartialGenericSegmentMergeIngestionSpec( + ioConfig = new PartialSegmentMergeIOConfig(Collections.singletonList(partitionLocation)); + partitionsSpec = new HashedPartitionsSpec( + null, + 1, + Collections.emptyList() + ); + target = new PartialSegmentMergeIngestionSpec( ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS), - IO_CONFIG, + ioConfig, new ParallelIndexTestingFactory.TuningConfigBuilder() - .partitionsSpec(PARTITIONS_SPEC) + .partitionsSpec(partitionsSpec) .build() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java index 632b1ef82cac..2031b190b53c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.worker.shuffle; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; @@ -112,7 +113,7 @@ public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) } @After - public void teardown() throws InterruptedException + public void teardown() { intermediaryDataManager.stop(); } @@ -136,6 +137,7 @@ private File generateSegmentDir(String fileName) throws IOException // Each file size is 138 bytes after compression final File segmentDir = tempDir.newFolder(); FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
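The writeByteArrayToFile call that follows (and its twins in the other shuffle test fixtures) is needed because pushing shuffle data through a DataSegmentPusher derives the segment's binaryVersion from the directory, and Druid keeps that version as a 4-byte big-endian integer in a file named version.bin. A minimal sketch of the convention, assuming nothing beyond that 4-byte layout:

import com.google.common.primitives.Ints;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class VersionBinSketch
{
  public static void main(String[] args) throws IOException
  {
    final File segmentDir = Files.createTempDirectory("segment").toFile();
    // Write binary version 9 as exactly 4 bytes -- hence the "10 bytes data
    // + 4 bytes version" size arithmetic asserted in ShuffleDataSegmentPusherTest.
    final File versionFile = new File(segmentDir, "version.bin");
    Files.write(versionFile.toPath(), Ints.toByteArray(9));
    final int version = Ints.fromByteArray(Files.readAllBytes(versionFile.toPath()));
    System.out.println(version); // prints 9
  }
}

+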
FileUtils.writeByteArrayToFile(new File(segmentDir, "version.bin"), Ints.toByteArray(9)); return segmentDir; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java index 5ad391a25384..9a5dc8fc8d35 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.ByteSource; +import com.google.common.primitives.Ints; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; @@ -76,7 +77,7 @@ public void setup() throws IOException false, null, null, - ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)), + ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 1200L, null)), false, false ); @@ -86,7 +87,7 @@ public void setup() throws IOException } @After - public void teardown() throws InterruptedException + public void teardown() { intermediaryDataManager.stop(); } @@ -232,6 +233,7 @@ private File generateSegmentDir(String fileName) throws IOException // Each file size is 138 bytes after compression final File segmentDir = tempDir.newFolder(); FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + FileUtils.writeByteArrayToFile(new File(segmentDir, "version.bin"), Ints.toByteArray(9)); return segmentDir; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java index f5fbb7ff493c..f05a1dc57f41 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java @@ -19,17 +19,30 @@ package org.apache.druid.indexing.worker.shuffle; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteSource; import com.google.common.io.Files; import com.google.common.primitives.Ints; +import com.google.inject.Injector; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.guice.GuiceAnnotationIntrospector; +import org.apache.druid.guice.GuiceInjectableValues; +import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.worker.config.WorkerConfig; -import org.apache.druid.java.util.common.FileUtils.FileCopyResult; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.LoadSpec; +import org.apache.druid.segment.loading.LocalDataSegmentPuller; +import 
org.apache.druid.segment.loading.LocalDataSegmentPusher; +import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; +import org.apache.druid.segment.loading.LocalLoadSpec; +import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; @@ -41,23 +54,45 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Optional; +@RunWith(Parameterized.class) public class ShuffleDataSegmentPusherTest { + private static final String LOCAL = "local"; + private static final String DEEPSTORE = "deepstore"; + + @Parameterized.Parameters(name = "intermediateDataManager={0}") + public static Collection<Object[]> data() + { + return ImmutableList.of(new Object[]{LOCAL}, new Object[]{DEEPSTORE}); + } + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - private LocalIntermediaryDataManager intermediaryDataManager; + private IntermediaryDataManager intermediaryDataManager; private ShuffleDataSegmentPusher segmentPusher; + private ObjectMapper mapper; + + private final String intermediateDataStore; + private File localDeepStore; + + public ShuffleDataSegmentPusherTest(String intermediateDataStore) + { + this.intermediateDataStore = intermediateDataStore; + } @Before public void setup() throws IOException @@ -77,19 +112,47 @@ public void setup() throws IOException false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); - intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + if (LOCAL.equals(intermediateDataStore)) { + intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + } else if (DEEPSTORE.equals(intermediateDataStore)) { + localDeepStore = temporaryFolder.newFolder("localStorage"); + intermediaryDataManager = new DeepStorageIntermediaryDataManager( + new LocalDataSegmentPusher( + new LocalDataSegmentPusherConfig() + { + @Override + public File getStorageDirectory() + { + return localDeepStore; + } + })); + } intermediaryDataManager.start(); segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager); + + final Injector injector = GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + binder -> binder.bind(LocalDataSegmentPuller.class) + ) + ); + mapper = new DefaultObjectMapper(); + mapper.registerModule(new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class)); + mapper.setInjectableValues(new GuiceInjectableValues(injector)); + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + mapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair(guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()), + new AnnotationIntrospectorPair(guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector()) + ); } @After - public void teardown() throws InterruptedException + public void teardown() { intermediaryDataManager.stop(); }
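The mapper wiring in setup() above exists because the deep-store variant of testPush deserializes the pushed segment's loadSpec: a loadSpec is just a type-tagged map, and convertValue(map, LoadSpec.class) only works once the concrete subtype (here LocalLoadSpec, whose segment puller is Guice-injected) has been registered. A toy, Druid-free illustration of that Jackson pattern (all types below are invented for the sketch):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class LoadSpecSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface LoadSpec
  {
  }

  @JsonTypeName("local")
  static class LocalSpec implements LoadSpec
  {
    final String path;

    @JsonCreator
    LocalSpec(@JsonProperty("path") String path)
    {
      this.path = path;
    }
  }

  public static void main(String[] args)
  {
    final ObjectMapper mapper = new ObjectMapper()
        .registerModule(new SimpleModule("sketch").registerSubtypes(LocalSpec.class));
    // What a pusher hands back in DataSegment.getLoadSpec(): a plain map.
    final Map<String, Object> spec = ImmutableMap.of("type", "local", "path", "/tmp/index.zip");
    final LocalSpec loadSpec = (LocalSpec) mapper.convertValue(spec, LoadSpec.class);
    System.out.println(loadSpec.path); // /tmp/index.zip
  }
}

@Test -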
public void testPush() throws IOException + public void testPush() throws IOException, SegmentLoadingException { final File segmentDir = generateSegmentDir(); final DataSegment segment = newSegment(Intervals.of("2018/2019")); @@ -98,21 +161,33 @@ public void testPush() throws IOException Assert.assertEquals(9, pushed.getBinaryVersion().intValue()); Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version - final Optional<ByteSource> zippedSegment = intermediaryDataManager.findPartitionFile( - "supervisorTaskId", - "subTaskId", - segment.getInterval(), - segment.getShardSpec().getPartitionNum() - ); - Assert.assertTrue(zippedSegment.isPresent()); final File tempDir = temporaryFolder.newFolder(); - final FileCopyResult result = CompressionUtils.unzip( - zippedSegment.get(), - tempDir, - org.apache.druid.java.util.common.FileUtils.IS_EXCEPTION, - false - ); - final List<File> unzippedFiles = new ArrayList<>(result.getFiles()); + if (intermediaryDataManager instanceof LocalIntermediaryDataManager) { + final Optional<ByteSource> zippedSegment = intermediaryDataManager.findPartitionFile( + "supervisorTaskId", + "subTaskId", + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ); + Assert.assertTrue(zippedSegment.isPresent()); + CompressionUtils.unzip( + zippedSegment.get(), + tempDir, + org.apache.druid.java.util.common.FileUtils.IS_EXCEPTION, + false + ); + } else if (intermediaryDataManager instanceof DeepStorageIntermediaryDataManager) { + final LoadSpec loadSpec = mapper.convertValue(pushed.getLoadSpec(), LoadSpec.class); + Assert.assertTrue(pushed.getLoadSpec() + .get("path") + .toString() + .startsWith(localDeepStore.getAbsolutePath() + + "/" + + DeepStorageIntermediaryDataManager.SHUFFLE_DATA_DIR_PREFIX)); + loadSpec.loadSegment(tempDir); + } + + final List<File> unzippedFiles = Arrays.asList(tempDir.listFiles()); unzippedFiles.sort(Comparator.comparing(File::getName)); final File dataFile = unzippedFiles.get(0); Assert.assertEquals("test", dataFile.getName()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java index 798b05f4e672..033403095285 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.worker.shuffle; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; @@ -153,7 +154,7 @@ public void testGetPartitionWithValidParamsReturnOk() throws IOException final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshotAndReset(); Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus()); Assert.assertEquals(1, snapshot.get(supervisorTaskId).getShuffleRequests()); - Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes()); + Assert.assertEquals(254, snapshot.get(supervisorTaskId).getShuffleBytes()); } @Test @@ -213,6 +214,7 @@ private File generateSegmentDir(String fileName) throws IOException // Each file size is 138 bytes after compression final File segmentDir = tempDir.newFolder(); FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + FileUtils.writeByteArrayToFile(new
File(segmentDir, "version.bin"), Ints.toByteArray(9)); return segmentDir; } } diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index 845bf216245a..1dc42c8c725a 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -123,8 +123,8 @@ ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \ # Some test groups require pre-existing data to be setup && setupData \ # Export the service config file path to use in supervisord conf file - && export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \ + && export DRUID_SERVICE_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \ # Export the common config file path to use in supervisord conf file - && export DRUID_SERVICE_CONF_DIR="$(. /druid.sh; getConfPath _common)" \ + && export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath _common)" \ # Run Druid service using supervisord && exec /usr/bin/supervisord -c /etc/supervisor/conf.d/supervisord.conf diff --git a/integration-tests/docker/docker-compose.shuffle-deep-store.yml b/integration-tests/docker/docker-compose.shuffle-deep-store.yml new file mode 100644 index 000000000000..cf2670ed1f2d --- /dev/null +++ b/integration-tests/docker/docker-compose.shuffle-deep-store.yml @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +version: "2.2" +services: + druid-zookeeper-kafka: + extends: + file: docker-compose.base.yml + service: druid-zookeeper-kafka + + druid-metadata-storage: + extends: + file: docker-compose.base.yml + service: druid-metadata-storage + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + + druid-coordinator: + extends: + file: docker-compose.base.yml + service: druid-coordinator + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-metadata-storage + - druid-zookeeper-kafka + + druid-overlord: + extends: + file: docker-compose.base.yml + service: druid-overlord + env_file: + - ./environment-configs/common-shuffle-deep-store + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-coordinator + - druid-metadata-storage + - druid-zookeeper-kafka + + druid-historical: + extends: + file: docker-compose.base.yml + service: druid-historical + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + + druid-middlemanager: + extends: + file: docker-compose.base.yml + service: druid-middlemanager + env_file: + - ./environment-configs/common-shuffle-deep-store + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + - druid-overlord + + druid-broker: + extends: + file: docker-compose.base.yml + service: druid-broker + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-coordinator + - druid-zookeeper-kafka + - druid-middlemanager + - druid-historical + + druid-router: + extends: + file: docker-compose.base.yml + service: druid-router + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + - druid-coordinator + - druid-broker + - druid-overlord + +networks: + druid-it-net: + name: druid-it-net + ipam: + config: + - subnet: 172.172.172.0/24 \ No newline at end of file diff --git a/integration-tests/docker/environment-configs/common-shuffle-deep-store b/integration-tests/docker/environment-configs/common-shuffle-deep-store new file mode 100644 index 000000000000..30117bf369eb --- /dev/null +++ b/integration-tests/docker/environment-configs/common-shuffle-deep-store @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +LANG=C.UTF-8 +LANGUAGE=C.UTF-8 +LC_ALL=C.UTF-8 + +# JAVA OPTS +COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml -XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp +DRUID_DEP_LIB_DIR=/shared/hadoop_xml:/shared/docker/lib/*:/usr/local/druid/lib/mysql-connector-java.jar + +# Druid configs +druid_extensions_loadList=[] +druid_extensions_directory=/shared/docker/extensions +druid_auth_authenticator_basic_authorizerName=basic +druid_auth_authenticator_basic_initialAdminPassword=priest +druid_auth_authenticator_basic_initialInternalClientPassword=warlock +druid_auth_authenticator_basic_type=basic +druid_auth_authenticatorChain=["basic"] +druid_auth_authorizer_basic_type=basic +druid_auth_authorizers=["basic"] +druid_client_https_certAlias=druid +druid_client_https_keyManagerPassword=druid123 +druid_client_https_keyStorePassword=druid123 +druid_client_https_keyStorePath=/tls/server.jks +druid_client_https_protocol=TLSv1.2 +druid_client_https_trustStoreAlgorithm=PKIX +druid_client_https_trustStorePassword=druid123 +druid_client_https_trustStorePath=/tls/truststore.jks +druid_enableTlsPort=true +druid_escalator_authorizerName=basic +druid_escalator_internalClientPassword=warlock +druid_escalator_internalClientUsername=druid_system +druid_escalator_type=basic +druid_lookup_numLookupLoadingThreads=1 +druid_server_http_numThreads=20 +# Allow OPTIONS method for ITBasicAuthConfigurationTest.testSystemSchemaAccess +druid_server_http_allowedHttpMethods=["OPTIONS"] +druid_server_https_certAlias=druid +druid_server_https_keyManagerPassword=druid123 +druid_server_https_keyStorePassword=druid123 +druid_server_https_keyStorePath=/tls/server.jks +druid_server_https_keyStoreType=jks +druid_server_https_requireClientCertificate=true +druid_server_https_trustStoreAlgorithm=PKIX +druid_server_https_trustStorePassword=druid123 +druid_server_https_trustStorePath=/tls/truststore.jks +druid_server_https_validateHostnames=true +druid_zk_service_host=druid-zookeeper-kafka +druid_auth_basic_common_maxSyncRetries=20 +druid_indexer_logs_directory=/shared/tasklogs +druid_sql_enable=true +druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies +druid_request_logging_type=slf4j +druid_coordinator_kill_supervisor_on=true +druid_coordinator_kill_supervisor_period=PT10S +druid_coordinator_kill_supervisor_durationToRetain=PT0M +druid_coordinator_period_metadataStoreManagementPeriod=PT10S + +# Testing the legacy config from https://github.com/apache/druid/pull/10267 +# Can remove this when the flag is no longer needed +druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true +# Test with deep storage as intermediate location to store shuffle data +# Local deep storage will be used here +druid_processing_intermediaryData_storage_type=deepstore diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh index db256963d774..ff84fd303e44 100644 --- a/integration-tests/script/docker_compose_args.sh +++ b/integration-tests/script/docker_compose_args.sh @@ -69,6 +69,10 @@ getComposeArgs() then # default + schema registry container echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.schema-registry.yml" + elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "shuffle-deep-store" ] + then + # compose file that uses deep storage as the intermediate store for shuffle + echo "-f ${DOCKERDIR}/docker-compose.shuffle-deep-store.yml" else # default echo "-f ${DOCKERDIR}/docker-compose.yml"
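The last property in the environment file above is the switch that activates this feature for the integration tests. In the integration-test images, each druid_-prefixed env entry is translated into the matching Druid runtime property (underscores becoming dots), so on a non-Docker deployment the equivalent setting in the runtime.properties of the services that produce and consume shuffle data would presumably be:

# Assumed runtime.properties form of the env entry above
druid.processing.intermediaryData.storage.type=deepstore

The pre-existing "local" binding keeps intermediary shuffle data on MiddleManager/Indexer disk; "deepstore" routes it through whatever deep storage is configured instead.

diff --git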
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index bd346ffa318d..f583be48f6c6 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -155,4 +155,6 @@ public class TestNGGroup public static final String KINESIS_DATA_FORMAT = "kinesis-data-format"; public static final String HIGH_AVAILABILTY = "high-availability"; + + public static final String SHUFFLE_DEEP_STORE = "shuffle-deep-store"; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java index 1bb1a79bd49c..1cd90f09ac06 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java @@ -36,7 +36,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX) +@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java index 5117ff07c8f4..fc8aeb8c710e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java @@ -64,9 +64,15 @@ public String getPathForHadoop(String dataSource) @Override public DataSegment push(final File dataSegmentFile, final DataSegment segment, final boolean useUniquePath) throws IOException + { + return pushToPath(dataSegmentFile, segment, this.getStorageDir(segment, useUniquePath)); + } + + @Override + public DataSegment pushToPath(File dataSegmentFile, DataSegment segment, String storageDirSuffix) throws IOException { final File baseStorageDir = config.getStorageDirectory(); - final File outDir = new File(baseStorageDir, this.getStorageDir(segment, useUniquePath)); + final File outDir = new File(baseStorageDir, storageDirSuffix); log.debug("Copying segment[%s] to local filesystem at location[%s]", segment.getId(), outDir.toString()); @@ -81,7 +87,7 @@ public DataSegment push(final File dataSegmentFile, final DataSegment segment, f .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)); } - final File tmpOutDir = new File(baseStorageDir, makeIntermediateDir()); + final File tmpOutDir = new File(config.getStorageDirectory(), makeIntermediateDir()); log.debug("Creating intermediate directory[%s] for segment[%s].", tmpOutDir.toString(), segment.getId()); org.apache.commons.io.FileUtils.forceMkdir(tmpOutDir); @@ -90,8 +96,8 @@ public DataSegment push(final File dataSegmentFile, final DataSegment segment, f final long size = compressSegment(dataSegmentFile, tmpIndexFile); final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI())) - .withSize(size) - .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)); + .withSize(size) + .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)); 
org.apache.commons.io.FileUtils.forceMkdir(outDir); final File indexFileTarget = new File(outDir, tmpIndexFile.getName()); diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 40a0a5ad369c..e98c95f8505c 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.TaskManagementResource; import org.apache.druid.indexing.worker.http.WorkerResource; +import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager; import org.apache.druid.indexing.worker.shuffle.ShuffleModule; @@ -185,6 +186,7 @@ private void configureIntermediaryData(Binder binder) Key.get(IntermediaryDataManager.class) ); biddy.addBinding("local").to(LocalIntermediaryDataManager.class); + biddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class); } @Provides diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 8c1fb005b61b..ab02a07e8df7 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -77,6 +77,7 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.task.IndexTaskClientFactory; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient; import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskClientFactory; @@ -88,6 +89,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.worker.executor.ExecutorLifecycle; import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig; +import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager; import org.apache.druid.java.util.common.lifecycle.Lifecycle; @@ -467,6 +469,7 @@ static void configureIntermediaryData(Binder binder) Key.get(IntermediaryDataManager.class) ); intermediaryDataManagerBiddy.addBinding("local").to(LocalIntermediaryDataManager.class).in(LazySingleton.class); + intermediaryDataManagerBiddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class); PolyBind.createChoice( binder, @@ -479,5 +482,6 @@ static void configureIntermediaryData(Binder binder) Key.get(ShuffleClient.class) ); shuffleClientBiddy.addBinding("local").to(HttpShuffleClient.class).in(LazySingleton.class); + shuffleClientBiddy.addBinding("deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class); } } diff --git a/website/.spelling b/website/.spelling index d4b568c8c037..1b2836020493 100644 --- a/website/.spelling +++ b/website/.spelling @@ -238,6 +238,7 @@ datasketches datasource datasources dbcp +deepstore 
denormalization denormalize denormalized @@ -425,6 +426,7 @@ unintuitive unioned unmergeable unmerged +UNNEST unparseable unparsed unsetting
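Read together with the common-shuffle-deep-store config earlier, the CliMiddleManager and CliPeon changes amount to a keyed registry of IntermediaryDataManager (and ShuffleClient) implementations, with a runtime property choosing the key. A toy model of that selection pattern in plain Java (Druid's actual PolyBind/Guice machinery is richer; only the class names and the property key below are taken from this patch):

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

public class BindingChoiceSketch
{
  interface IntermediaryDataManager
  {
  }

  static class LocalIntermediaryDataManager implements IntermediaryDataManager
  {
  }

  static class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
  {
  }

  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.setProperty("druid.processing.intermediaryData.storage.type", "deepstore");

    // PolyBind, in effect: a keyed registry plus a property that picks the key.
    final Map<String, Supplier<IntermediaryDataManager>> bindings = ImmutableMap.of(
        "local", LocalIntermediaryDataManager::new,
        "deepstore", DeepStorageIntermediaryDataManager::new
    );
    final String type = props.getProperty("druid.processing.intermediaryData.storage.type", "local");
    System.out.println(bindings.get(type).get().getClass().getSimpleName());
    // prints DeepStorageIntermediaryDataManager
  }
}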