From e5e321e9438ff87b8b88954475fea0d34daabd4c Mon Sep 17 00:00:00 2001 From: sezruby Date: Mon, 18 Jan 2021 11:30:17 +0900 Subject: [PATCH 1/8] Introduce IndexHadoopFsRelation --- .../plans/logical/IndexHadoopFsRelation.scala | 43 +++++++++++ .../hyperspace/index/rules/RuleUtils.scala | 12 +-- .../index/plananalysis/ExplainTest.scala | 77 ++++++++++--------- 3 files changed, 91 insertions(+), 41 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala new file mode 100644 index 000000000..3d3caa9e7 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -0,0 +1,43 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.plans.logical + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation} +import org.apache.spark.sql.types.StructType + +class IndexHadoopFsRelation( + spark: SparkSession, + location: FileIndex, + partitionSchema: StructType, + dataSchema: StructType, + bucketSpec: Option[BucketSpec], + fileFormat: FileFormat, + options: Map[String, String]) + extends HadoopFsRelation( + location, + partitionSchema, + dataSchema, + bucketSpec, + fileFormat, + options)(spark) { + + override def toString(): String = { + "CoveringIndex" + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index e9caf53c1..ec3cf9b66 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{LongType, StructType} import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index._ -import com.microsoft.hyperspace.index.plans.logical.BucketUnion +import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation} import com.microsoft.hyperspace.util.HyperspaceConf object RuleUtils { @@ -251,13 +251,14 @@ object RuleUtils { case baseRelation @ LogicalRelation(_: HadoopFsRelation, baseOutput, _, _) => val location = new InMemoryFileIndex(spark, index.content.files, Map(), None) - val relation = HadoopFsRelation( + val relation = new IndexHadoopFsRelation( + spark, location, new StructType(), StructType(index.schema.filter(baseRelation.schema.contains(_))), if (useBucketSpec) Some(index.bucketSpec) else None, new ParquetFileFormat, - Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark) + Map(IndexConstants.INDEX_RELATION_IDENTIFIER)) val updatedOutput = baseOutput.filter(attr => 
relation.schema.fieldNames.contains(attr.name))
@@ -359,13 +360,14 @@ object RuleUtils {
                 IndexConstants.DATA_FILE_NAME_ID))))
 
         val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None)
-        val relation = HadoopFsRelation(
+        val relation = new IndexHadoopFsRelation(
+          spark,
           newLocation,
           new StructType(),
           newSchema,
           if (useBucketSpec) Some(index.bucketSpec) else None,
           new ParquetFileFormat,
-          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)
+          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))
 
         val updatedOutput = baseOutput.filter(attr =>
           relation.schema.fieldNames.contains(attr.name))
diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
index e3efc5f0e..32e486afe 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
@@ -81,10 +81,10 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
      * SortMergeJoin [Col1#11], [Col1#21], Inner
      * <----:- *(1) Project [Col1#11, Col2#12]---->
      * <----: +- *(1) Filter isnotnull(Col1#11)---->
-     * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
+     * <----: +- *(1) FileScan CoveringIndex [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
      * <----+- *(2) Project [Col1#21, Col2#22]---->
      * <----+- *(2) Filter isnotnull(Col1#21)---->
-     * <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
+     * <----+- *(2) FileScan CoveringIndex [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
      *
      *=============================================================
      *Plan without indexes:
@@ -113,7 +113,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
      * | *InputAdapter| 4| 2| -2|
      * | *Project| 1| 2| 1|
      * | *ReusedExchange| 1| 0| -1|
-     * | *Scan parquet| 1| 2| 1|
+     * |Scan CoveringIndex| 0| 2| 2|
+     * | *Scan parquet| 1| 0| -1|
      * | *ShuffleExchange| 1| 0| -1|
      * | *Sort| 2| 0| -2|
      * |*WholeStageCodegen| 4| 3| -1|
@@ -140,7 +141,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
       .append(defaultDisplayMode.newLine)
       .append("<----: +- *(1) Filter isnotnull(Col1#11)---->")
       .append(defaultDisplayMode.newLine)
-      .append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
+      .append(s"<----: +- *(1) FileScan CoveringIndex [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " +
         truncate(s"InMemoryFileIndex[$joinIndexFilePath]") +
         ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->")
       .append(defaultDisplayMode.newLine)
@@ -148,7 +149,7 @@ class ExplainTest
extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(s" <----+- *(2) FileScan CoveringIndex [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -193,31 +194,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("+-------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("+-------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| *Filter| 1| 2| 1|") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *InputAdapter| 4| 2| -2|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 2| 1|") + .append("|*Scan CoveringIndex| 0| 2| 2|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("| *Scan parquet| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("|*WholeStageCodegen| 4| 3| -1|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + .append("| *WholeStageCodegen| 4| 3| -1|") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("| SortMergeJoin| 1| 1| 0|") + .append(defaultDisplayMode.newLine) + .append("+-------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -309,7 +312,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") .append(displayMode.newLine) - .append(" <----: +- *(1) FileScan parquet [Col2#136,Col1#135]") + .append(" <----: +- *(1) FileScan CoveringIndex [Col2#136,Col1#135]") .append(" Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") @@ -325,7 +328,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 
1))") .append(displayMode.newLine) - .append(" <----+- *(1) FileScan parquet [Col2#136,Col1#135] " + + .append(" <----+- *(1) FileScan CoveringIndex [Col2#136,Col1#135] " + "Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") @@ -514,7 +517,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append(s"<----: +- *(1) FileScan CoveringIndex [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -522,7 +525,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(s" <----+- *(2) FileScan CoveringIndex [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -567,31 +570,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("+-------------------+-------------------+------------------+----------+") + .append(defaultDisplayMode.newLine) + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("+-------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *Filter| 1| 2| 1|") + .append("| *InputAdapter| 4| 2| -2|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("|*Scan CoveringIndex| 0| 2| 2|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 2| 1|") + .append("| *Scan parquet| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("|*WholeStageCodegen| 4| 3| -1|") + .append("| *WholeStageCodegen| 4| 3| -1|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + .append("| SortMergeJoin| 
1| 1| 0|") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("+-------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -651,7 +656,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("+- Filter (isnotnull(Col2#) && (Col2# = 2))") .append(displayMode.newLine) - .append(" " + displayMode.highlightTag.open ++ "+- FileScan parquet [Col2#,Col1#] ") + .append(" " + displayMode.highlightTag.open ++ "+- FileScan CoveringIndex [Col2#,Col1#] ") .append("Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]")) .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ") From f7ea5a7370462a5aafd0b13b8c0c02e881b367f4 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 20 Jan 2021 12:40:38 +0900 Subject: [PATCH 2/8] Add index name and log version --- .../microsoft/hyperspace/actions/Action.scala | 1 + .../hyperspace/actions/CreateAction.scala | 3 +- .../hyperspace/actions/CreateActionBase.scala | 5 +- .../hyperspace/actions/OptimizeAction.scala | 10 +- .../hyperspace/actions/RefreshAction.scala | 6 +- .../actions/RefreshIncrementalAction.scala | 4 +- .../hyperspace/index/IndexConstants.scala | 1 + .../hyperspace/index/IndexLogEntry.scala | 4 + .../plans/logical/IndexHadoopFsRelation.scala | 5 +- .../hyperspace/index/rules/RuleUtils.scala | 2 + .../index/plananalysis/ExplainTest.scala | 175 ++++-------------- 11 files changed, 68 insertions(+), 148 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala index eb1143cde..782481954 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala @@ -33,6 +33,7 @@ import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, HyperspaceE */ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession { protected val baseId: Int = logManager.getLatestId().getOrElse(-1) + protected def endId: Int = baseId + 2 def logEntry: LogEntry diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala index c48fca27e..d081f4fca 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala @@ -35,7 +35,8 @@ class CreateAction( dataManager: IndexDataManager) extends CreateActionBase(dataManager) with Action { - final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) + final override def logEntry: LogEntry = + getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId) final override val transientState: String = CREATING diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 7bffbf2b4..298af7e09 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -51,7 +51,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) spark: SparkSession, df: DataFrame, indexConfig: 
IndexConfig, - path: Path): IndexLogEntry = { + path: Path, + logVersionId: Int): IndexLogEntry = { val absolutePath = PathUtils.makeAbsolute(path) val numBuckets = numBucketsForIndex(spark) @@ -88,7 +89,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) coveringIndexProperties)), Content.fromDirectory(absolutePath, fileIdTracker), Source(SparkPlan(sourcePlanProperties)), - Map()) + Map(IndexConstants.INDEX_LOG_VERSION_PROPERTY -> logVersionId.toString)) case None => throw HyperspaceException("Invalid plan for creating an index.") } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala index f2fb8575b..c81c3df91 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala @@ -146,9 +146,15 @@ class OptimizeAction( Directory.fromLeafFiles(filesToIgnoreStatuses, fileIdTracker) } val mergedContent = Content(newContent.root.merge(filesToIgnoreDirectory)) - previousIndexLogEntry.copy(content = mergedContent) + previousIndexLogEntry.copy( + content = mergedContent, + properties = previousIndexLogEntry.properties + + (IndexConstants.INDEX_LOG_VERSION_PROPERTY -> super[Action].endId.toString)) } else { - previousIndexLogEntry.copy(content = newContent) + previousIndexLogEntry.copy( + content = newContent, + properties = previousIndexLogEntry.properties + + (IndexConstants.INDEX_LOG_VERSION_PROPERTY -> super[Action].endId.toString)) } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala index 9ea64012a..edf1f3961 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala @@ -34,9 +34,11 @@ class RefreshAction( spark: SparkSession, logManager: IndexLogManager, dataManager: IndexDataManager) - extends RefreshActionBase(spark, logManager, dataManager) { + extends RefreshActionBase(spark, logManager, dataManager) + with Action { - override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) + override def logEntry: LogEntry = + getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId) final override def op(): Unit = write(spark, df, indexConfig) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala index 7f0198473..ba5490a93 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala @@ -48,7 +48,7 @@ class RefreshIncrementalAction( spark: SparkSession, logManager: IndexLogManager, dataManager: IndexDataManager) - extends RefreshActionBase(spark, logManager, dataManager) { + extends RefreshActionBase(spark, logManager, dataManager) with Action { final override def op(): Unit = { logInfo( @@ -127,7 +127,7 @@ class RefreshIncrementalAction( * @return Refreshed index log entry. 
*/ override def logEntry: LogEntry = { - val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) + val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId) // If there is no deleted files, there are index data files only for appended data in this // version and we need to add the index data files of previous index version. diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 68b6fd98a..611746043 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -97,6 +97,7 @@ object IndexConstants { private[hyperspace] val LINEAGE_PROPERTY = "lineage" // Indicate whether the source file format is parquet. private[hyperspace] val HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY = "hasParquetAsSourceFormat" + private[hyperspace] val INDEX_LOG_VERSION_PROPERTY = "indexLogVersionProperty" // Hyperspace allows users to use globbing patterns to create indexes on. E.g. if user wants to // create an index on "/temp/*/*", they can do so by setting this key to "/temp/*/*". If not set, diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index c90281c0a..9cf07ed0d 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -542,6 +542,10 @@ case class IndexLogEntry( IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean } + def logVersion: String = { + properties.getOrElse(IndexConstants.INDEX_LOG_VERSION_PROPERTY, "undefined") + } + @JsonIgnore lazy val fileIdTracker: FileIdTracker = { val tracker = new FileIdTracker diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala index 3d3caa9e7..7824b07b6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -21,8 +21,11 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation} import org.apache.spark.sql.types.StructType +import com.microsoft.hyperspace.index.IndexLogEntry + class IndexHadoopFsRelation( spark: SparkSession, + index: IndexLogEntry, location: FileIndex, partitionSchema: StructType, dataSchema: StructType, @@ -38,6 +41,6 @@ class IndexHadoopFsRelation( options)(spark) { override def toString(): String = { - "CoveringIndex" + s"Index ${index.name}_${index.logVersion}" } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index ec3cf9b66..b854c634f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -253,6 +253,7 @@ object RuleUtils { new InMemoryFileIndex(spark, index.content.files, Map(), None) val relation = new IndexHadoopFsRelation( spark, + index, location, new StructType(), StructType(index.schema.filter(baseRelation.schema.contains(_))), @@ -362,6 +363,7 @@ object RuleUtils { val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), 
None)
         val relation = new IndexHadoopFsRelation(
           spark,
+          index,
           newLocation,
           new StructType(),
           newSchema,
diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
index 32e486afe..ad6c5723a 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala
@@ -72,61 +72,10 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
     // Constructing expected output for given query from explain API
     val expectedOutput = new StringBuilder
 
-    // The format of the explain output looks as follows:
-    // scalastyle:off filelinelengthchecker
-    /**
-      *=============================================================
-      *Plan with indexes:
-      *=============================================================
-      * SortMergeJoin [Col1#11], [Col1#21], Inner
-      * <----:- *(1) Project [Col1#11, Col2#12]---->
-      * <----: +- *(1) Filter isnotnull(Col1#11)---->
-      * <----: +- *(1) FileScan CoveringIndex [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
-      * <----+- *(2) Project [Col1#21, Col2#22]---->
-      * <----+- *(2) Filter isnotnull(Col1#21)---->
-      * <----+- *(2) FileScan CoveringIndex [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->
-      *
-      *=============================================================
-      *Plan without indexes:
-      *=============================================================
-      * SortMergeJoin [Col1#11], [Col1#21], Inner
-      * <----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0---->
-      * <----: +- Exchange hashpartitioning(Col1#11, 200)---->
-      * <----: +- *(1) Project [Col1#11, Col2#12]---->
-      * <----: +- *(1) Filter isnotnull(Col1#11)---->
-      * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct---->
-      * <----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->
-      * <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 200)---->
-      *
-      *=============================================================
-      *Indexes used:
-      *=============================================================
-      *joinIndex:src/test/resources/indexLocation/joinIndex/v__=0
-      *
-      * =============================================================
-      * Physical operator stats:
-      * =============================================================
-      * +------------------+-------------------+------------------+----------+
-      * | Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
-      * +------------------+-------------------+------------------+----------+
-      * | *Filter| 1| 2| 1|
-      * | *InputAdapter| 4| 2| -2|
-      * | *Project| 1| 2| 1|
-      * | *ReusedExchange| 1| 0| -1|
-      * |Scan CoveringIndex| 0| 2| 2|
-      * | *Scan parquet| 1| 0| -1|
-      * | *ShuffleExchange| 1| 0| -1|
-      * | *Sort| 2| 0| -2|
-      * |*WholeStageCodegen| 4| 3| -1|
-      * | SortMergeJoin| 1| 1| 0|
-      * +------------------+-------------------+------------------+----------+
-      */
-    // scalastyle:on 
filelinelengthchecker - val joinIndexFilePath = getIndexFilesPath("joinIndex") - val joinIndexPath = getIndexRootPath("joinIndex") + // The format of the explain output looks as follows: // scalastyle:off filelinelengthchecker expectedOutput .append("=============================================================") @@ -141,7 +90,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan CoveringIndex [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append(s"<----: +- *(1) FileScan Index joinIndex_1 [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -149,7 +98,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan CoveringIndex [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(s" <----+- *(2) FileScan Index joinIndex_1 [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -194,33 +143,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+-------------------+-------------------+------------------+----------+") + .append("+-----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(defaultDisplayMode.newLine) - .append("+-------------------+-------------------+------------------+----------+") + .append("+-----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| *Filter| 1| 2| 1|") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *InputAdapter| 4| 2| -2|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("|*Scan CoveringIndex| 0| 2| 2|") + .append("|*Scan Index joinIndex_1| 0| 2| 2|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 0| -1|") + .append("| *Scan parquet| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("| *WholeStageCodegen| 4| 3| -1|") + .append("| *WholeStageCodegen| 4| 3| -1|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + 
.append("| SortMergeJoin| 1| 1| 0|") .append(defaultDisplayMode.newLine) - .append("+-------------------+-------------------+------------------+----------+") + .append("+-----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -243,57 +192,6 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { val expectedOutput = new StringBuilder // The format of the explain output looks as follows: - // scalastyle:off filelinelengthchecker - /** - * ============================================================= - * Plan with indexes: - * ============================================================= - * Project [Col1#135] - * +- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145)) - * : +- Subquery subquery145 - * : +- *(1) Project [Col1#135] - * : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1)) - * <----: +- *(1) FileScan parquet [Col2#136,Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/indexes/filterIndex], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct----> - * +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct - * +- Subquery subquery145 - * +- *(1) Project [Col1#135] - * +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1)) - * <----+- *(1) FileScan parquet [Col2#136,Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/indexes/filterIndex], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct----> - * - * ============================================================= - * Plan without indexes: - * ============================================================= - * Project [Col1#135] - * +- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145)) - * : +- Subquery subquery145 - * : +- *(1) Project [Col1#135] - * : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1)) - * <----: +- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct----> - * +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct - * +- Subquery subquery145 - * +- *(1) Project [Col1#135] - * +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1)) - * <----+- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ReadSchema: struct----> - * - * ============================================================= - * Indexes used: - * ============================================================= - * filterIndex:src/test/resources/indexLocation/indexes/filterIndex - * - * ============================================================= - * Physical operator stats: - * ============================================================= - * +-----------------+-------------------+------------------+----------+ - * |Physical Operator|Hyperspace Disabled|Hyperspace 
Enabled|Difference| - * +-----------------+-------------------+------------------+----------+ - * | Filter| 1| 1| 0| - * | Project| 1| 1| 0| - * | Scan parquet| 1| 1| 0| - * |WholeStageCodegen| 1| 1| 0| - * +-----------------+-------------------+------------------+----------+ - */ - // scalastyle:on filelinelengthchecker - // scalastyle:off filelinelengthchecker expectedOutput .append("=============================================================") @@ -312,7 +210,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") .append(displayMode.newLine) - .append(" <----: +- *(1) FileScan CoveringIndex [Col2#136,Col1#135]") + .append(" <----: +- *(1) FileScan Index filterIndex_1 [Col2#136,Col1#135]") .append(" Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") @@ -328,7 +226,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") .append(displayMode.newLine) - .append(" <----+- *(1) FileScan CoveringIndex [Col2#136,Col1#135] " + + .append(" <----+- *(1) FileScan Index filterIndex_1 [Col2#136,Col1#135] " + "Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") @@ -517,7 +415,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan CoveringIndex [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append(s"<----: +- *(1) FileScan Index joinIndex_1 [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -525,7 +423,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan CoveringIndex [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(s" <----+- *(2) FileScan Index joinIndex_1 [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -570,33 +468,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+-------------------+-------------------+------------------+----------+") + .append("+-----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(defaultDisplayMode.newLine) - 
.append("+-------------------+-------------------+------------------+----------+") + .append("+-----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| *Filter| 1| 2| 1|") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *InputAdapter| 4| 2| -2|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("|*Scan CoveringIndex| 0| 2| 2|") + .append("|*Scan Index joinIndex_1| 0| 2| 2|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 0| -1|") + .append("| *Scan parquet| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("| *WholeStageCodegen| 4| 3| -1|") + .append("| *WholeStageCodegen| 4| 3| -1|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + .append("| SortMergeJoin| 1| 1| 0|") .append(defaultDisplayMode.newLine) - .append("+-------------------+-------------------+------------------+----------+") + .append("+-----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -656,7 +554,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("+- Filter (isnotnull(Col2#) && (Col2# = 2))") .append(displayMode.newLine) - .append(" " + displayMode.highlightTag.open ++ "+- FileScan CoveringIndex [Col2#,Col1#] ") + .append(" " + displayMode.highlightTag.open) + .append("+- FileScan Index filterIndex_1 [Col2#,Col1#] ") .append("Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]")) .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ") From 40666b7cba36c45926d6d9c2231db23adf16a122 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 20 Jan 2021 15:15:59 +0900 Subject: [PATCH 3/8] Add comment --- .../hyperspace/index/plans/logical/IndexHadoopFsRelation.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala index 7824b07b6..64e31764e 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -23,6 +23,9 @@ import org.apache.spark.sql.types.StructType import com.microsoft.hyperspace.index.IndexLogEntry +/** + * Wrapper class of HadoopFsRelation to indicate index application more explicitly in Plan string. 
+ */ class IndexHadoopFsRelation( spark: SparkSession, index: IndexLogEntry, From fae517a64e79efdf62d76618b764e5df3ddc80f3 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 20 Jan 2021 19:02:49 +0900 Subject: [PATCH 4/8] Add comment2 --- .../scala/com/microsoft/hyperspace/index/IndexConstants.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 611746043..a6f186986 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -97,6 +97,7 @@ object IndexConstants { private[hyperspace] val LINEAGE_PROPERTY = "lineage" // Indicate whether the source file format is parquet. private[hyperspace] val HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY = "hasParquetAsSourceFormat" + // Index log version info. private[hyperspace] val INDEX_LOG_VERSION_PROPERTY = "indexLogVersionProperty" // Hyperspace allows users to use globbing patterns to create indexes on. E.g. if user wants to From 7d9edab367e584e6b401e3ab078bf3c7f91d21dd Mon Sep 17 00:00:00 2001 From: sezruby Date: Thu, 21 Jan 2021 13:47:33 +0900 Subject: [PATCH 5/8] Change plan string --- .../hyperspace/index/IndexLogEntry.scala | 1 + .../plans/logical/IndexHadoopFsRelation.scala | 3 +- .../index/plananalysis/ExplainTest.scala | 70 +++++++++---------- 3 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 9cf07ed0d..4e2d2c5e2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -343,6 +343,7 @@ object FileInfo { // IndexLogEntry-specific CoveringIndex that represents derived dataset. 
case class CoveringIndex(properties: CoveringIndex.Properties) { val kind = "CoveringIndex" + val kindAbbr = "CI" } object CoveringIndex { case class Properties(columns: Properties.Columns, diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala index 64e31764e..0580310d4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -44,6 +44,7 @@ class IndexHadoopFsRelation( options)(spark) { override def toString(): String = { - s"Index ${index.name}_${index.logVersion}" + s"Hyperspace(Type: ${index.derivedDataset.kindAbbr}, " + + s"Name: ${index.name}, LogVersion: ${index.logVersion})" } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala index b73c63388..a62b78428 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala @@ -90,7 +90,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan Index joinIndex_1 [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -98,7 +98,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan Index joinIndex_1 [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -143,33 +143,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+-----------------------+-------------------+------------------+----------+") + .append("+----------------------------------------------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(defaultDisplayMode.newLine) - .append("+-----------------------+-------------------+------------------+----------+") + .append("+----------------------------------------------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) 
- .append("| *Filter| 1| 2| 1|") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *InputAdapter| 4| 2| -2|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("|*Scan Index joinIndex_1| 0| 2| 2|") + .append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 0| -1|") + .append("| *Scan parquet| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("| *WholeStageCodegen| 4| 3| -1|") + .append("| *WholeStageCodegen| 4| 3| -1|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + .append("| SortMergeJoin| 1| 1| 0|") .append(defaultDisplayMode.newLine) - .append("+-----------------------+-------------------+------------------+----------+") + .append("+----------------------------------------------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -210,7 +210,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") .append(displayMode.newLine) - .append(" <----: +- *(1) FileScan Index filterIndex_1 [Col2#136,Col1#135]") + .append(" <----: +- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#136,Col1#135]") .append(" Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") @@ -226,7 +226,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") .append(displayMode.newLine) - .append(" <----+- *(1) FileScan Index filterIndex_1 [Col2#136,Col1#135] " + + .append(" <----+- *(1) FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#136,Col1#135] " + "Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") @@ -366,7 +366,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan Index joinIndex_1 [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append(s"<----: +- *(1) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -374,7 +374,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) 
Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan Index joinIndex_1 [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(s" <----+- *(2) FileScan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1) [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexFilePath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -419,33 +419,33 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+-----------------------+-------------------+------------------+----------+") + .append("+----------------------------------------------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(defaultDisplayMode.newLine) - .append("+-----------------------+-------------------+------------------+----------+") + .append("+----------------------------------------------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) - .append("| *Filter| 1| 2| 1|") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *InputAdapter| 4| 2| -2|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("|*Scan Index joinIndex_1| 0| 2| 2|") + .append("|*Scan Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)| 0| 2| 2|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 0| -1|") + .append("| *Scan parquet| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("| *WholeStageCodegen| 4| 3| -1|") + .append("| *WholeStageCodegen| 4| 3| -1|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + .append("| SortMergeJoin| 1| 1| 0|") .append(defaultDisplayMode.newLine) - .append("+-----------------------+-------------------+------------------+----------+") + .append("+----------------------------------------------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -481,7 +481,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append("+- Filter (isnotnull(Col2#) && (Col2# = 2))") .append(displayMode.newLine) .append(" " + displayMode.highlightTag.open) - .append("+- FileScan Index filterIndex_1 [Col2#,Col1#] ") + .append("+- FileScan Hyperspace(Type: CI, Name: filterIndex, LogVersion: 1) [Col2#,Col1#] ") .append("Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]")) .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), 
EqualTo(Col2,2)], ") From 34875febfde6b414011c312fd9814cf4d8119f02 Mon Sep 17 00:00:00 2001 From: sezruby Date: Thu, 21 Jan 2021 15:15:47 +0900 Subject: [PATCH 6/8] review commit --- src/main/scala/com/microsoft/hyperspace/actions/Action.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala index 782481954..1ea6c8309 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala @@ -33,7 +33,7 @@ import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, HyperspaceE */ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession { protected val baseId: Int = logManager.getLatestId().getOrElse(-1) - protected def endId: Int = baseId + 2 + final protected def endId: Int = baseId + 2 def logEntry: LogEntry @@ -58,7 +58,7 @@ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession def op(): Unit private def end(): Unit = { - val newId = baseId + 2 + val newId = endId val entry = logEntry entry.state = finalState entry.id = newId From 66fba69a2bde8734dfa4cc5cc11958b8dcb352ce Mon Sep 17 00:00:00 2001 From: sezruby Date: Fri, 22 Jan 2021 08:58:58 +0900 Subject: [PATCH 7/8] Review commit --- .../com/microsoft/hyperspace/actions/CreateAction.scala | 3 +-- .../com/microsoft/hyperspace/actions/CreateActionBase.scala | 5 ++--- .../com/microsoft/hyperspace/actions/OptimizeAction.scala | 6 ++---- .../com/microsoft/hyperspace/actions/RefreshAction.scala | 6 ++---- .../hyperspace/actions/RefreshIncrementalAction.scala | 4 ++-- .../com/microsoft/hyperspace/index/IndexLogEntry.scala | 4 ---- .../index/plans/logical/IndexHadoopFsRelation.scala | 2 +- 7 files changed, 10 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala index d081f4fca..c48fca27e 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala @@ -35,8 +35,7 @@ class CreateAction( dataManager: IndexDataManager) extends CreateActionBase(dataManager) with Action { - final override def logEntry: LogEntry = - getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId) + final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) final override val transientState: String = CREATING diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 298af7e09..7bffbf2b4 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -51,8 +51,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) spark: SparkSession, df: DataFrame, indexConfig: IndexConfig, - path: Path, - logVersionId: Int): IndexLogEntry = { + path: Path): IndexLogEntry = { val absolutePath = PathUtils.makeAbsolute(path) val numBuckets = numBucketsForIndex(spark) @@ -89,7 +88,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) coveringIndexProperties)), Content.fromDirectory(absolutePath, fileIdTracker), Source(SparkPlan(sourcePlanProperties)), - 
Map(IndexConstants.INDEX_LOG_VERSION_PROPERTY -> logVersionId.toString)) + Map()) case None => throw HyperspaceException("Invalid plan for creating an index.") } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala index c81c3df91..2fca5ec1c 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala @@ -148,13 +148,11 @@ class OptimizeAction( val mergedContent = Content(newContent.root.merge(filesToIgnoreDirectory)) previousIndexLogEntry.copy( content = mergedContent, - properties = previousIndexLogEntry.properties + - (IndexConstants.INDEX_LOG_VERSION_PROPERTY -> super[Action].endId.toString)) + properties = previousIndexLogEntry.properties) } else { previousIndexLogEntry.copy( content = newContent, - properties = previousIndexLogEntry.properties + - (IndexConstants.INDEX_LOG_VERSION_PROPERTY -> super[Action].endId.toString)) + properties = previousIndexLogEntry.properties) } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala index edf1f3961..9ea64012a 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala @@ -34,11 +34,9 @@ class RefreshAction( spark: SparkSession, logManager: IndexLogManager, dataManager: IndexDataManager) - extends RefreshActionBase(spark, logManager, dataManager) - with Action { + extends RefreshActionBase(spark, logManager, dataManager) { - override def logEntry: LogEntry = - getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId) + override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) final override def op(): Unit = write(spark, df, indexConfig) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala index ba5490a93..7f0198473 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala @@ -48,7 +48,7 @@ class RefreshIncrementalAction( spark: SparkSession, logManager: IndexLogManager, dataManager: IndexDataManager) - extends RefreshActionBase(spark, logManager, dataManager) with Action { + extends RefreshActionBase(spark, logManager, dataManager) { final override def op(): Unit = { logInfo( @@ -127,7 +127,7 @@ class RefreshIncrementalAction( * @return Refreshed index log entry. */ override def logEntry: LogEntry = { - val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath, super[Action].endId) + val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) // If there are no deleted files, there are index data files only for appended data in this // version, and we need to add the index data files of the previous index version.
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 4e2d2c5e2..9274dbb40 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -543,10 +543,6 @@ case class IndexLogEntry( IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean } - def logVersion: String = { - properties.getOrElse(IndexConstants.INDEX_LOG_VERSION_PROPERTY, "undefined") - } - @JsonIgnore lazy val fileIdTracker: FileIdTracker = { val tracker = new FileIdTracker diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala index 0580310d4..0bfe84d71 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -45,6 +45,6 @@ class IndexHadoopFsRelation( override def toString(): String = { s"Hyperspace(Type: ${index.derivedDataset.kindAbbr}, " + - s"Name: ${index.name}, LogVersion: ${index.logVersion})" + s"Name: ${index.name}, LogVersion: ${index.id})" } } From bd9fa6330562a5d87d3afa8484ead06297a85ace Mon Sep 17 00:00:00 2001 From: sezruby Date: Fri, 22 Jan 2021 09:17:26 +0900 Subject: [PATCH 8/8] Review commit2 --- .../scala/com/microsoft/hyperspace/actions/Action.scala | 3 +-- .../com/microsoft/hyperspace/actions/OptimizeAction.scala | 8 ++------ .../com/microsoft/hyperspace/index/IndexConstants.scala | 2 -- .../index/plans/logical/IndexHadoopFsRelation.scala | 8 ++++---- .../com/microsoft/hyperspace/index/rules/RuleUtils.scala | 8 ++------ 5 files changed, 9 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala index 1ea6c8309..eb1143cde 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala @@ -33,7 +33,6 @@ import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, HyperspaceE */ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession { protected val baseId: Int = logManager.getLatestId().getOrElse(-1) - final protected def endId: Int = baseId + 2 def logEntry: LogEntry @@ -58,7 +57,7 @@ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession def op(): Unit private def end(): Unit = { - val newId = endId + val newId = baseId + 2 val entry = logEntry entry.state = finalState entry.id = newId diff --git a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala index 2fca5ec1c..f2fb8575b 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala @@ -146,13 +146,9 @@ class OptimizeAction( Directory.fromLeafFiles(filesToIgnoreStatuses, fileIdTracker) } val mergedContent = Content(newContent.root.merge(filesToIgnoreDirectory)) - previousIndexLogEntry.copy( - content = mergedContent, - properties = previousIndexLogEntry.properties) + previousIndexLogEntry.copy(content = mergedContent) } else { - previousIndexLogEntry.copy( - content = newContent, - properties = previousIndexLogEntry.properties) + 
previousIndexLogEntry.copy(content = newContent) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index a6f186986..68b6fd98a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -97,8 +97,6 @@ object IndexConstants { private[hyperspace] val LINEAGE_PROPERTY = "lineage" // Indicate whether the source file format is parquet. private[hyperspace] val HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY = "hasParquetAsSourceFormat" - // Index log version info. - private[hyperspace] val INDEX_LOG_VERSION_PROPERTY = "indexLogVersionProperty" // Hyperspace allows users to use globbing patterns to create indexes on. E.g. if a user wants to // create an index on "/temp/*/*", they can do so by setting this key to "/temp/*/*". If not set, diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala index 0bfe84d71..d46b25fb6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -27,14 +27,12 @@ import com.microsoft.hyperspace.index.IndexLogEntry * Wrapper class for HadoopFsRelation that indicates index application more explicitly in the plan string. */ class IndexHadoopFsRelation( - spark: SparkSession, - index: IndexLogEntry, location: FileIndex, partitionSchema: StructType, dataSchema: StructType, bucketSpec: Option[BucketSpec], fileFormat: FileFormat, - options: Map[String, String]) + options: Map[String, String])(spark: SparkSession, index: IndexLogEntry) extends HadoopFsRelation( location, partitionSchema, dataSchema, bucketSpec, fileFormat, options)(spark) { - override def toString(): String = { + val indexPlanStr: String = { s"Hyperspace(Type: ${index.derivedDataset.kindAbbr}, " + s"Name: ${index.name}, LogVersion: ${index.id})" } + override def toString(): String = indexPlanStr + } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index b854c634f..9a224a8ae 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -252,14 +252,12 @@ object RuleUtils { val location = new InMemoryFileIndex(spark, index.content.files, Map(), None) val relation = new IndexHadoopFsRelation( - spark, - index, location, new StructType(), StructType(index.schema.filter(baseRelation.schema.contains(_))), if (useBucketSpec) Some(index.bucketSpec) else None, new ParquetFileFormat, - Map(IndexConstants.INDEX_RELATION_IDENTIFIER)) + Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index) val updatedOutput = baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name)) @@ -362,14 +360,12 @@ object RuleUtils { val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None) val relation = new IndexHadoopFsRelation( - spark, - index, newLocation, new StructType(), newSchema, if (useBucketSpec) Some(index.bucketSpec) else None, new ParquetFileFormat, - Map(IndexConstants.INDEX_RELATION_IDENTIFIER)) + Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index) val updatedOutput = baseOutput.filter(attr =>
relation.schema.fieldNames.contains(attr.name))
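
Note: a minimal usage sketch of the final IndexHadoopFsRelation shape from PATCH 8/8, mirroring the RuleUtils hunks above. The values location, newSchema, useBucketSpec, index (an IndexLogEntry), and spark are assumed to be in scope exactly as in those hunks; this sketch is illustrative and not part of the patch.

    // Data-source arguments come first; (spark, index) sit in a second
    // parameter list, so the primary parameter list stays identical to
    // HadoopFsRelation's and call sites only append (spark, index).
    val relation = new IndexHadoopFsRelation(
      location,                                            // InMemoryFileIndex over the index data files
      new StructType(),                                    // index data has no partition columns
      newSchema,                                           // schema of the index columns to read
      if (useBucketSpec) Some(index.bucketSpec) else None, // bucketed scan only when requested
      new ParquetFileFormat,
      Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)

    // relation.toString now returns indexPlanStr, e.g.
    // "Hyperspace(Type: CI, Name: joinIndex, LogVersion: 1)",
    // which is the marker ExplainTest asserts on in the FileScan nodes.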