@@ -343,6 +343,7 @@ object FileInfo {
 // IndexLogEntry-specific CoveringIndex that represents derived dataset.
 case class CoveringIndex(properties: CoveringIndex.Properties) {
   val kind = "CoveringIndex"
+  val kindAbbr = "CI"
 }
 object CoveringIndex {
   case class Properties(columns: Properties.Columns,
@@ -0,0 +1,50 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.plans.logical
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation}
+import org.apache.spark.sql.types.StructType
+
+import com.microsoft.hyperspace.index.IndexLogEntry
+
+/**
+ * Wrapper class for HadoopFsRelation to indicate index application more explicitly in the plan string.
+ */
+class IndexHadoopFsRelation(
Contributor:

    Is there a reason for choosing a plain class over a case class, same as HadoopFsRelation? Since it's a wrapper, I would think both should be of the same type.

Collaborator (Author):

    Because case-to-case inheritance is not allowed in Scala.
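
    A minimal sketch of the restriction (hypothetical Base/Derived names, not from this PR):

        case class Base(x: Int)

        // Does not compile; scalac reports roughly: "case class Derived has
        // case ancestor Base, but case-to-case inheritance is prohibited."
        case class Derived(y: Int) extends Base(y)

        // A plain class extending a case class is allowed:
        class Wrapper(y: Int) extends Base(y)

    Since HadoopFsRelation is a case class, the wrapper must be a plain class.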

+    location: FileIndex,
+    partitionSchema: StructType,
+    dataSchema: StructType,
+    bucketSpec: Option[BucketSpec],
+    fileFormat: FileFormat,
+    options: Map[String, String])(spark: SparkSession, index: IndexLogEntry)
+  extends HadoopFsRelation(
+    location,
+    partitionSchema,
+    dataSchema,
+    bucketSpec,
+    fileFormat,
+    options)(spark) {
+
+  val indexPlanStr: String = {
+    s"Hyperspace(Type: ${index.derivedDataset.kindAbbr}, " +
+      s"Name: ${index.name}, LogVersion: ${index.id})"
+  }
+  override def toString(): String = indexPlanStr
+
+}
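
With this override in place, wherever Spark prints the relation it shows the index's identity instead of the default HadoopFsRelation description. For an index whose derivedDataset is the CoveringIndex above (kindAbbr = "CI"), named e.g. "deptIndex" at log version 1 (illustrative values), toString yields:

    Hyperspace(Type: CI, Name: deptIndex, LogVersion: 1)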
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{LongType, StructType}
 
 import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index._
-import com.microsoft.hyperspace.index.plans.logical.BucketUnion
+import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
 import com.microsoft.hyperspace.util.HyperspaceConf
 
 object RuleUtils {
@@ -249,13 +249,13 @@ object RuleUtils {
       case baseRelation @ LogicalRelation(_: HadoopFsRelation, baseOutput, _, _) =>
         val location =
           new InMemoryFileIndex(spark, index.content.files, Map(), None)
-        val relation = HadoopFsRelation(
+        val relation = new IndexHadoopFsRelation(
           location,
           new StructType(),
           StructType(index.schema.filter(baseRelation.schema.contains(_))),
           if (useBucketSpec) Some(index.bucketSpec) else None,
           new ParquetFileFormat,
-          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)
+          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
         val updatedOutput =
           baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name))
@@ -357,13 +357,13 @@ object RuleUtils {
             IndexConstants.DATA_FILE_NAME_ID))))
 
         val newLocation = new InMemoryFileIndex(spark, filesToRead, Map(), None)
-        val relation = HadoopFsRelation(
+        val relation = new IndexHadoopFsRelation(
           newLocation,
           new StructType(),
           newSchema,
           if (useBucketSpec) Some(index.bucketSpec) else None,
           new ParquetFileFormat,
-          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)
+          Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index)
 
         val updatedOutput =
           baseOutput.filter(attr => relation.schema.fieldNames.contains(attr.name))
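
With both rewrite paths now constructing an IndexHadoopFsRelation, a query that Hyperspace rewrites identifies the applied index in its explained plan. A rough sketch of the effect (hypothetical data path, query, and index name; exact FileScan formatting varies across Spark versions):

    // Assuming an index "deptIndex" exists on the employees dataset:
    val df = spark.read.parquet("/data/employees")
    val query = df.filter(df("deptId") === 1).select("name")
    query.explain()
    // The relation's toString surfaces in the scan node, e.g.:
    //   +- FileScan Hyperspace(Type: CI, Name: deptIndex, LogVersion: 1) [...]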