diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index c90281c0a..9274dbb40 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -343,6 +343,7 @@ object FileInfo {
 // IndexLogEntry-specific CoveringIndex that represents derived dataset.
 case class CoveringIndex(properties: CoveringIndex.Properties) {
   val kind = "CoveringIndex"
+  val kindAbbr = "CI"
 }
 object CoveringIndex {
   case class Properties(columns: Properties.Columns,
diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala
new file mode 100644
index 000000000..d46b25fb6
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.plans.logical
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation}
+import org.apache.spark.sql.types.StructType
+
+import com.microsoft.hyperspace.index.IndexLogEntry
+
+/**
+ * Wrapper class of HadoopFsRelation to indicate index application more explicitly in the plan string.
+ */
+class IndexHadoopFsRelation(
+    location: FileIndex,
+    partitionSchema: StructType,
+    dataSchema: StructType,
+    bucketSpec: Option[BucketSpec],
+    fileFormat: FileFormat,
+    options: Map[String, String])(spark: SparkSession, index: IndexLogEntry)
+    extends HadoopFsRelation(
+      location,
+      partitionSchema,
+      dataSchema,
+      bucketSpec,
+      fileFormat,
+      options)(spark) {
+
+  val indexPlanStr: String = {
+    s"Hyperspace(Type: ${index.derivedDataset.kindAbbr}, " +
+      s"Name: ${index.name}, LogVersion: ${index.id})"
+  }
+  override def toString(): String = indexPlanStr
+
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
index 8a167d406..88c51ab69 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{LongType, StructType}
 
 import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index._
-import com.microsoft.hyperspace.index.plans.logical.BucketUnion
+import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
 import com.microsoft.hyperspace.util.HyperspaceConf
 
 object RuleUtils {
@@ -249,13 +249,13 @@ object RuleUtils {
       case baseRelation @ LogicalRelation(_: HadoopFsRelation, baseOutput, _, _) =>
         val location = new InMemoryFileIndex(spark, index.content.files, Map(), None)
-        val relation = HadoopFsRelation(
+        val relation = new IndexHadoopFsRelation(
           location,
           new StructType(),
           StructType(index.schema.filter(baseRelation.schema.contains(_))),
           if (useBucketSpec) Some(index.bucketSpec) else None,
           new ParquetFileFormat,
-          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)
+          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
         val updatedOutput =
           baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name))
@@ -357,13 +357,13 @@ object RuleUtils {
               IndexConstants.DATA_FILE_NAME_ID))))
 
         val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None)
-        val relation = HadoopFsRelation(
+        val relation = new IndexHadoopFsRelation(
           newLocation,
           new StructType(),
           newSchema,
           if (useBucketSpec) Some(index.bucketSpec) else None,
           new ParquetFileFormat,
-          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)
+          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
         val updatedOutput =
           baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name))
diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
index 4234b0400..a62b78428 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
@@ -72,60 +72,10 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
     // Constructing expected output for given query from explain API
     val expectedOutput = new StringBuilder
 
-    // The format of the explain output looks as follows:
-    // scalastyle:off filelinelengthchecker
-    /**
-     *=============================================================
-     *Plan with indexes:
-     *=============================================================
-     * SortMergeJoin [Col1#11], [Col1#21], Inner
-     * <----:- *(1) Project [Col1#11, Col2#12]---->
-     * <----: +- *(1) Filter isnotnull(Col1#11)---->
-     * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
-     * <----+- *(2) Project [Col1#21, Col2#22]---->
-     * <----+- *(2) Filter isnotnull(Col1#21)---->
-     * <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
-     *
-     *=============================================================
-     *Plan without indexes:
-     *=============================================================
-     * SortMergeJoin [Col1#11], [Col1#21], Inner
-     * <----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0---->
-     * <----: +- Exchange hashpartitioning(Col1#11, 5)---->
-     * <----: +- *(1) Project [Col1#11, Col2#12]---->
-     * <----: +- *(1) Filter isnotnull(Col1#11)---->
-     * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct---->
-     * <----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->
-     * <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->
-     *
-     *=============================================================
-     *Indexes used:
-     *=============================================================
-     *joinIndex:src/test/resources/indexLocation/joinIndex/v__=0
-     *
-     * =============================================================
-     * Physical operator stats:
-     * =============================================================
-     * +------------------+-------------------+------------------+----------+
-     * | Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
-     * +------------------+-------------------+------------------+----------+
-     * |           *Filter|                  1|                 2|         1|
-     * |     *InputAdapter|                  4|                 2|        -2|
-     * |          *Project|                  1|                 2|         1|
-     * |   *ReusedExchange|                  1|                 0|        -1|
-     * |     *Scan parquet|                  1|                 2|         1|
-     * |  *ShuffleExchange|                  1|                 0|        -1|
-     * |             *Sort|                  2|                 0|        -2|
-     * |*WholeStageCodegen|                  4|                 3|        -1|
-     * |     SortMergeJoin|                  1|                 1|         0|
-     * +------------------+-------------------+------------------+----------+
-     */
-    // scalastyle:on filelinelengthchecker
-
     val joinIndexFilePath = getIndexFilesPath("joinIndex")
     val joinIndexPath = getIndexRootPath("joinIndex")
 
+    // The format of the explain output looks as follows:
     // scalastyle:off filelinelengthchecker
     expectedOutput
       .append("=============================================================")
@@ -140,7 +90,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
       .append(defaultDisplayMode.newLine)
-      .append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
+      .append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->")
       .append(defaultDisplayMode.newLine)
@@ -148,7 +98,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append(" <----+- *(2) Filter isnotnull(Col1#21)---->")
       .append(defaultDisplayMode.newLine)
-      .append(s" <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
+      .append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->")
       .append(defaultDisplayMode.newLine)
@@ -193,31 +143,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append("=============================================================")
       .append(defaultDisplayMode.newLine)
-      .append("+------------------+-------------------+------------------+----------+")
+      .append("+----------------------------------------------------------+-------------------+------------------+----------+")
       .append(defaultDisplayMode.newLine)
-      .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
+      .append("|                                         Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
       .append(defaultDisplayMode.newLine)
-      .append("+------------------+-------------------+------------------+----------+")
+      .append("+----------------------------------------------------------+-------------------+------------------+----------+")
       .append(defaultDisplayMode.newLine)
-      .append("|           *Filter|                  1|                 2|         1|")
+      .append("|                                                    *Filter|                  1|                 2|         1|")
       .append(defaultDisplayMode.newLine)
-      .append("|     *InputAdapter|                  4|                 2|        -2|")
+      .append("|                                              *InputAdapter|                  4|                 2|        -2|")
      .append(defaultDisplayMode.newLine)
-      .append("|          *Project|                  1|                 2|         1|")
+      .append("|                                                   *Project|                  1|                 2|         1|")
       .append(defaultDisplayMode.newLine)
-      .append("|   *ReusedExchange|                  1|                 0|        -1|")
+      .append("|                                            *ReusedExchange|                  1|                 0|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|     *Scan parquet|                  1|                 2|         1|")
+      .append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)|                  0|                 2|         2|")
       .append(defaultDisplayMode.newLine)
-      .append("|  *ShuffleExchange|                  1|                 0|        -1|")
+      .append("|                                              *Scan parquet|                  1|                 0|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|             *Sort|                  2|                 0|        -2|")
+      .append("|                                           *ShuffleExchange|                  1|                 0|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|*WholeStageCodegen|                  4|                 3|        -1|")
+      .append("|                                                      *Sort|                  2|                 0|        -2|")
       .append(defaultDisplayMode.newLine)
-      .append("|     SortMergeJoin|                  1|                 1|         0|")
+      .append("|                                         *WholeStageCodegen|                  4|                 3|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("+------------------+-------------------+------------------+----------+")
+      .append("|                                              SortMergeJoin|                  1|                 1|         0|")
+      .append(defaultDisplayMode.newLine)
+      .append("+----------------------------------------------------------+-------------------+------------------+----------+")
       .append(defaultDisplayMode.newLine)
       .append(defaultDisplayMode.newLine)
     // scalastyle:on filelinelengthchecker
@@ -240,57 +192,6 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
     val expectedOutput = new StringBuilder
 
     // The format of the explain output looks as follows:
-    // scalastyle:off filelinelengthchecker
-    /**
-     * =============================================================
-     * Plan with indexes:
-     * =============================================================
-     * Project [Col1#135]
-     * +- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145))
-     * : +- Subquery subquery145
-     * : +- *(1) Project [Col1#135]
-     * : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))
-     * <----: +- *(1) FileScan parquet [Col2#136,Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/indexes/filterIndex], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct---->
-     * +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct
-     * +- Subquery subquery145
-     * +- *(1) Project [Col1#135]
-     * +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))
-     * <----+- *(1) FileScan parquet [Col2#136,Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/indexes/filterIndex], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct---->
-     *
-     * =============================================================
-     * Plan without indexes:
-     * =============================================================
-     * Project [Col1#135]
-     * +- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145))
-     * : +- Subquery subquery145
-     * : +- *(1) Project [Col1#135]
-     * : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))
-     * <----: +- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct---->
-     * +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct
-     * +- Subquery subquery145
-     * +- *(1) Project [Col1#135]
-     * +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))
-     * <----+- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct---->
-     *
-     * =============================================================
-     * Indexes used:
-     * =============================================================
-     * filterIndex:src/test/resources/indexLocation/indexes/filterIndex
-     *
-     * =============================================================
-     * Physical operator stats:
-     * =============================================================
-     * +-----------------+-------------------+------------------+----------+
-     * |Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
-     * +-----------------+-------------------+------------------+----------+
-     * |           Filter|                  1|                 1|         0|
-     * |          Project|                  1|                 1|         0|
-     * |     Scan parquet|                  1|                 1|         0|
-     * |WholeStageCodegen|                  1|                 1|         0|
-     * +-----------------+-------------------+------------------+----------+
-     */
-    // scalastyle:on filelinelengthchecker
-
     // scalastyle:off filelinelengthchecker
     expectedOutput
@@ -309,7 +210,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(displayMode.newLine)
       .append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))")
       .append(displayMode.newLine)
-      .append(" <----: +- *(1) FileScan parquet [Col2#136,Col1#135]")
+      .append(" <----: +- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#136,Col1#135]")
       .append(" Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
@@ -325,7 +226,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(displayMode.newLine)
       .append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))")
       .append(displayMode.newLine)
-      .append(" <----+- *(1) FileScan parquet [Col2#136,Col1#135] " +
+      .append(" <----+- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#136,Col1#135] " +
         "Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
@@ -447,55 +348,6 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
     val expectedOutput = new StringBuilder
 
     // The format of the explain output looks as follows:
-    // scalastyle:off filelinelengthchecker
-    /**
-     *=============================================================
-     *Plan with indexes:
-     *=============================================================
-     * SortMergeJoin [Col1#11], [Col1#21], Inner
-     * <----:- *(1) Project [Col1#11, Col2#12]---->
-     * <----: +- *(1) Filter isnotnull(Col1#11)---->
-     * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
-     * <----+- *(2) Project [Col1#21, Col2#22]---->
-     * <----+- *(2) Filter isnotnull(Col1#21)---->
-     * <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
-     *
-     *=============================================================
-     *Plan without indexes:
-     *=============================================================
-     * SortMergeJoin [Col1#11], [Col1#21], Inner
-     * <----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0---->
-     * <----: +- Exchange hashpartitioning(Col1#11, 5)---->
-     * <----: +- *(1) Project [Col1#11, Col2#12]---->
-     * <----: +- *(1) Filter isnotnull(Col1#11)---->
-     * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct---->
-     * <----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->
-     * <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->
-     *
-     *=============================================================
-     *Indexes used:
-     *=============================================================
-     *joinIndex:src/test/resources/indexLocation/joinIndex/v__=0
-     *
-     * =============================================================
-     * Physical operator stats:
-     * =============================================================
-     * +------------------+-------------------+------------------+----------+
-     * | Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
-     * +------------------+-------------------+------------------+----------+
-     * |           *Filter|                  1|                 2|         1|
-     * |     *InputAdapter|                  4|                 2|        -2|
-     * |          *Project|                  1|                 2|         1|
-     * |   *ReusedExchange|                  1|                 0|        -1|
-     * |     *Scan parquet|                  1|                 2|         1|
-     * |  *ShuffleExchange|                  1|                 0|        -1|
-     * |             *Sort|                  2|                 0|        -2|
-     * |*WholeStageCodegen|                  4|                 3|        -1|
-     * |     SortMergeJoin|                  1|                 1|         0|
-     * +------------------+-------------------+------------------+----------+
-     */
-    // scalastyle:on filelinelengthchecker
-
     val joinIndexFilePath = getIndexFilesPath("joinIndex")
     val joinIndexPath = getIndexRootPath("joinIndex")
@@ -514,7 +366,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
       .append(defaultDisplayMode.newLine)
-      .append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
+      .append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->")
       .append(defaultDisplayMode.newLine)
@@ -522,7 +374,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append(" <----+- *(2) Filter isnotnull(Col1#21)---->")
       .append(defaultDisplayMode.newLine)
-      .append(s" <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
+      .append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->")
       .append(defaultDisplayMode.newLine)
@@ -567,31 +419,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append("=============================================================")
       .append(defaultDisplayMode.newLine)
-      .append("+------------------+-------------------+------------------+----------+")
+      .append("+----------------------------------------------------------+-------------------+------------------+----------+")
+      .append(defaultDisplayMode.newLine)
+      .append("|                                         Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
       .append(defaultDisplayMode.newLine)
-      .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|")
+      .append("+----------------------------------------------------------+-------------------+------------------+----------+")
       .append(defaultDisplayMode.newLine)
-      .append("+------------------+-------------------+------------------+----------+")
+      .append("|                                                    *Filter|                  1|                 2|         1|")
       .append(defaultDisplayMode.newLine)
-      .append("|           *Filter|                  1|                 2|         1|")
+      .append("|                                              *InputAdapter|                  4|                 2|        -2|")
       .append(defaultDisplayMode.newLine)
-      .append("|     *InputAdapter|                  4|                 2|        -2|")
+      .append("|                                                   *Project|                  1|                 2|         1|")
       .append(defaultDisplayMode.newLine)
-      .append("|          *Project|                  1|                 2|         1|")
+      .append("|                                            *ReusedExchange|                  1|                 0|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|   *ReusedExchange|                  1|                 0|        -1|")
+      .append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)|                  0|                 2|         2|")
       .append(defaultDisplayMode.newLine)
-      .append("|     *Scan parquet|                  1|                 2|         1|")
+      .append("|                                              *Scan parquet|                  1|                 0|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|  *ShuffleExchange|                  1|                 0|        -1|")
+      .append("|                                           *ShuffleExchange|                  1|                 0|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|             *Sort|                  2|                 0|        -2|")
+      .append("|                                                      *Sort|                  2|                 0|        -2|")
       .append(defaultDisplayMode.newLine)
-      .append("|*WholeStageCodegen|                  4|                 3|        -1|")
+      .append("|                                         *WholeStageCodegen|                  4|                 3|        -1|")
       .append(defaultDisplayMode.newLine)
-      .append("|     SortMergeJoin|                  1|                 1|         0|")
+      .append("|                                              SortMergeJoin|                  1|                 1|         0|")
       .append(defaultDisplayMode.newLine)
-      .append("+------------------+-------------------+------------------+----------+")
+      .append("+----------------------------------------------------------+-------------------+------------------+----------+")
       .append(defaultDisplayMode.newLine)
       .append(defaultDisplayMode.newLine)
     // scalastyle:on filelinelengthchecker
@@ -613,31 +467,6 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
     val indexConfig = IndexConfig("filterIndex", Seq("Col2"), Seq("Col1"))
     hyperspace.createIndex(df, indexConfig)
 
-    // scalastyle:off filelinelengthchecker
-    /**
-     * Expected output with displayMode-specific strings substituted (not shown below):
-     *
-     * =============================================================
-     * Plan with indexes:
-     * =============================================================
-     * Project [Col1#500]
-     * +- Filter (isnotnull(Col2#501) && (Col2#501 = 2))
-     * +- FileScan parquet [Col2#501,Col1#500] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/indexes/filterIndex], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct
-     *
-     * =============================================================
-     * Plan without indexes:
-     * =============================================================
-     * Project [Col1#500]
-     * +- Filter (isnotnull(Col2#501) && (Col2#501 = 2))
-     * +- FileScan parquet [Col1#500,Col2#501] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ReadSchema: struct
-     *
-     * =============================================================
-     * Indexes used:
-     * =============================================================
-     * filterIndex:src/test/resources/indexLocation/indexes/filterIndex
-     *
-     */
-    // scalastyle:on filelinelengthchecker
     val expectedOutput = new StringBuilder
     expectedOutput
       .append(displayMode.beginEndTag.open)
       .append(displayMode.newLine)
       .append("=============================================================")
       .append(displayMode.newLine)
       .append("Plan with indexes:")
       .append(displayMode.newLine)
       .append("=============================================================")
       .append(displayMode.newLine)
       .append("Project [Col1#]")
       .append(displayMode.newLine)
       .append("+- Filter (isnotnull(Col2#) && (Col2# = 2))")
       .append(displayMode.newLine)
-      .append(" " + displayMode.highlightTag.open ++ "+- FileScan parquet [Col2#,Col1#] ")
+      .append(" " + displayMode.highlightTag.open)
+      .append("+- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] ")
       .append("Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
       .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
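
Reviewer note (illustrative, not part of the patch): overriding toString is sufficient to change the explain output because, in Spark 2.x, DataSourceScanExec builds its node name roughly as "Scan $relation" (FileSourceScanExec adds the "File" prefix), and HadoopFsRelation.toString normally returns the file format's short name — hence the old "FileScan parquet". Wrapping the relation and overriding toString therefore changes only how the plan renders, not query semantics. Below is a minimal, self-contained Scala sketch of the pattern; Relation, Index, IndexRelation, and scanNodeName are hypothetical stand-ins, not the real Spark or Hyperspace APIs.

// Plain Scala, runnable as-is. Relation approximates HadoopFsRelation and
// Index approximates the IndexLogEntry fields used by indexPlanStr.
class Relation(format: String) {
  override def toString: String = format
}

case class Index(kindAbbr: String, name: String, id: Int)

class IndexRelation(format: String, index: Index) extends Relation(format) {
  // Mirrors indexPlanStr in IndexHadoopFsRelation above.
  val indexPlanStr: String =
    s"Hyperspace(Type: ${index.kindAbbr}, Name: ${index.name}, LogVersion: ${index.id})"
  override def toString: String = indexPlanStr
}

object PlanStringDemo extends App {
  // Approximates how a scan operator embeds relation.toString in the plan string.
  def scanNodeName(relation: Relation): String = s"FileScan $relation"

  println(scanNodeName(new Relation("parquet")))
  // prints: FileScan parquet
  println(scanNodeName(new IndexRelation("parquet", Index("CI", "joinIndex", 1))))
  // prints: FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)
}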