From fa80dbabb6fc65726efced12676fd8c66d96bf93 Mon Sep 17 00:00:00 2001 From: Yoonjae Park Date: Tue, 12 Oct 2021 15:51:43 +0900 Subject: [PATCH 1/6] add spark session extension add configure to control enabling hyperspace add dummy rule to avoid different behavior of Extensions / apply hyperspace add test for hyperspace extension --- .../HyperspaceSparkSessionExtension.scala | 68 ++++++ .../hyperspace/index/IndexConstants.scala | 4 + .../index/rules/ApplyHyperspace.scala | 6 +- .../com/microsoft/hyperspace/package.scala | 43 ++-- .../hyperspace/util/HyperspaceConf.scala | 16 ++ .../hyperspace/HyperspaceExtensionTest.scala | 198 ++++++++++++++++++ .../index/E2EHyperspaceRulesTest.scala | 10 +- 7 files changed, 327 insertions(+), 18 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala new file mode 100644 index 000000000..b362adcfc --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala @@ -0,0 +1,68 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace + +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * An extension for Spark SQL to activate Hyperspace. + * + * Example to run a `spark-submit` with Hyperspace enabled: + * {{{ + * spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension + * }}} + * + * Example to create a `SparkSession` with Hyperspace enabled: + * {{{ + * val spark = SparkSession + * .builder() + * .appName("...") + * .master("...") + * .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension") + * .getOrCreate() + * }}} + */ + +class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { + + /** + * If HyperspaceRule is injected directly to OptimizerRule with HyperspaceExtension, + * the order of applying rule is different from without HyperspaceExtension + * (i.e., explicitly calling enableHyperspace). To make behavior consistently, + * current implementation of HyperspaceExtension uses a trick to call enableHyperspace + * before rule is applied. Since the interface of injectOptimizerRule should return rule builder, + * it returns a dummy rule that does nothing. It may increase overhead slightly + * because enableHyperspace is called once for each evaluation of spark plan. + */ + private case class DummyRule() extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan + } + } + + type RuleBuilder = SparkSession => Rule[LogicalPlan] + override def apply(extensions: SparkSessionExtensions): Unit = { + extensions.injectOptimizerRule { sparkSession => + // Enable Hyperspace to leverage indexes. + sparkSession.enableHyperspace() + // Return a dummy rule to fit in interface of injectOptimizerRule + DummyRule() + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index b415fb548..c65c2e092 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -19,6 +19,10 @@ package com.microsoft.hyperspace.index import org.apache.spark.sql.internal.SQLConf object IndexConstants { + // If it is set as false, Hyperspace will not be applied. + val HYPERSPACE_APPLY_ENABLED = "spark.hyperspace.apply.enabled" + val HYPERSPACE_APPLY_ENABLED_DEFAULT = "true" + val INDEXES_DIR = "indexes" // Config used for setting the system path, which is considered as a "root" path for Hyperspace; diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala index a9ae97acd..8c3024dd9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging +import com.microsoft.hyperspace.util.HyperspaceConf /** * Transform the given plan to use Hyperspace indexes. @@ -42,10 +43,13 @@ object ApplyHyperspace private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean] override def apply(plan: LogicalPlan): LogicalPlan = { - if (disableForIndexMaintenance.get) { + if (!HyperspaceConf.hyperspaceApplyEnabled(spark)) { return plan } + if (disableForIndexMaintenance.get) { + return plan + } val indexManager = Hyperspace .getContext(spark) .indexCollectionManager diff --git a/src/main/scala/com/microsoft/hyperspace/package.scala b/src/main/scala/com/microsoft/hyperspace/package.scala index fb57bdb35..597b553f8 100644 --- a/src/main/scala/com/microsoft/hyperspace/package.scala +++ b/src/main/scala/com/microsoft/hyperspace/package.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.SparkSession import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.ApplyHyperspace +import com.microsoft.hyperspace.util.HyperspaceConf package object hyperspace { @@ -29,42 +30,56 @@ package object hyperspace { implicit class Implicits(sparkSession: SparkSession) { /** - * Plug in Hyperspace-specific rules. + * Enable Hyperspace indexes. + * + * Plug in Hyperspace-specific rules and set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as true. * * @return a spark session that contains Hyperspace-specific rules. */ def enableHyperspace(): SparkSession = { - disableHyperspace - sparkSession.sessionState.experimentalMethods.extraOptimizations ++= - ApplyHyperspace :: Nil - sparkSession.sessionState.experimentalMethods.extraStrategies ++= - BucketUnionStrategy :: Nil + HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true) + + if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( + ApplyHyperspace)) { + sparkSession.sessionState.experimentalMethods.extraOptimizations ++= + ApplyHyperspace :: Nil + } + if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains( + BucketUnionStrategy)) { + sparkSession.sessionState.experimentalMethods.extraStrategies ++= + BucketUnionStrategy :: Nil + } sparkSession } /** - * Plug out Hyperspace-specific rules. + * Disable Hyperspace indexes. + * + * Set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as false + * to stop applying Hyperspace indexes. * - * @return a spark session that does not contain Hyperspace-specific rules. + * @return a spark session that `IndexConstants.HYPERSPACE_APPLY_ENABLED` is set as false. */ def disableHyperspace(): SparkSession = { - val experimentalMethods = sparkSession.sessionState.experimentalMethods - experimentalMethods.extraOptimizations = - experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals) - experimentalMethods.extraStrategies = - experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals) + HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, false) sparkSession } /** * Checks if Hyperspace is enabled or not. * + * Note that Hyperspace is enabled when: + * 1) `ApplyHyperspace` exists in extraOptimization + * 2) `BucketUnionStrate` exists in BucketUnionStrategy and + * 3) `IndexConstants.HYPERSPACE_APPLY_ENABLED` is true. + * * @return true if Hyperspace is enabled or false otherwise. */ def isHyperspaceEnabled(): Boolean = { val experimentalMethods = sparkSession.sessionState.experimentalMethods experimentalMethods.extraOptimizations.contains(ApplyHyperspace) && - experimentalMethods.extraStrategies.contains(BucketUnionStrategy) + experimentalMethods.extraStrategies.contains(BucketUnionStrategy) && + HyperspaceConf.hyperspaceApplyEnabled(sparkSession) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 2fcc56519..8b071c439 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -25,6 +25,22 @@ import com.microsoft.hyperspace.index.IndexConstants * Helper class to extract Hyperspace-related configs from SparkSession. */ object HyperspaceConf { + + /** + * Returns the config value whether hyperspace is enabled or not. + */ + def hyperspaceApplyEnabled(spark: SparkSession): Boolean = { + spark.conf + .get( + IndexConstants.HYPERSPACE_APPLY_ENABLED, + IndexConstants.HYPERSPACE_APPLY_ENABLED_DEFAULT) + .toBoolean + } + + def setHyperspaceApplyEnabled(spark: SparkSession, apply: Boolean): Unit = { + spark.conf.set(IndexConstants.HYPERSPACE_APPLY_ENABLED, apply.toString) + } + def hybridScanEnabled(spark: SparkSession): Boolean = { spark.conf .get( diff --git a/src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala b/src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala new file mode 100644 index 000000000..f51564d3c --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala @@ -0,0 +1,198 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.microsoft.hyperspace + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation} + +import com.microsoft.hyperspace.index.{Content, FileIdTracker, HyperspaceSuite, IndexConfig, IndexConstants} +import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY +import com.microsoft.hyperspace.util.FileUtils + +class HyperspaceExtensionTest extends HyperspaceSuite { + private val sampleDeptDataLocation = inTempDir("dept") + private val sampleEmpDataLocation = inTempDir("emp") + + private val departments = Seq( + (10, "Accounting", "New York"), + (20, "Research", "Dallas"), + (30, "Sales", "Chicago"), + (40, "Operations", "Boston")) + + private val employees = Seq( + (7369, "SMITH", 20), + (7499, "ALLEN", 30), + (7521, "WARD", 30), + (7566, "JONES", 20), + (7698, "BLAKE", 30), + (7782, "CLARK", 10), + (7788, "SCOTT", 20), + (7839, "KING", 10), + (7844, "TURNER", 30), + (7876, "ADAMS", 20), + (7900, "JAMES", 30), + (7934, "MILLER", 10), + (7902, "FORD", 20), + (7654, "MARTIN", 30)) + + override protected lazy val spark: SparkSession = SparkSession + .builder() + .master(s"local[$numParallelism]") + .config(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") + .config("delta.log.cacheSize", "3") + .config("spark.databricks.delta.snapshotPartitions", "2") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .config( + "spark.sql.extensions", + "io.delta.sql.DeltaSparkSessionExtension," + + "com.microsoft.hyperspace.HyperspaceSparkSessionExtension") + .config("spark.sql.shuffle.partitions", "5") + .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5") + .config("spark.ui.enabled", "false") + .config("spark.ui.showConsoleProgress", "false") + .appName(suiteName) + .getOrCreate() + + override def beforeAll(): Unit = { + super.beforeAll() + + val sparkSession = spark + import sparkSession.implicits._ + FileUtils.delete(new Path(sampleDeptDataLocation)) + FileUtils.delete(new Path(sampleEmpDataLocation)) + + departments + .toDF("deptId", "deptName", "location") + .write + .mode("overwrite") + .parquet(sampleDeptDataLocation) + + employees + .toDF("empId", "empName", "deptId") + .write + .mode("overwrite") + .parquet(sampleEmpDataLocation) + } + + override def afterAll(): Unit = { + FileUtils.delete(new Path(sampleDeptDataLocation)) + FileUtils.delete(new Path(sampleEmpDataLocation)) + super.beforeAll() + } + + test("Verify ApplyHyperspace is used with hyperspace extension session") { + MockEventLogger.reset() + + val deptDF = spark.read.parquet(sampleDeptDataLocation) + val empDF = spark.read.parquet(sampleEmpDataLocation) + + val deptIndexConfig = IndexConfig("deptIndex", Seq("deptId"), Seq("deptName")) + val empIndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName")) + + // Create Hyperspace indexes. + val hs = new Hyperspace(spark) + hs.createIndex(deptDF, deptIndexConfig) + hs.createIndex(empDF, empIndexConfig) + + // Make sure new index is available to all. + assert(Hyperspace.getContext(spark).indexCollectionManager.indexes.count == 2) + + def filterQuery(): DataFrame = deptDF.filter("deptId == '30'").select("deptId", "deptName") + + verifyIndexUsage(filterQuery, getIndexFilesPath(deptIndexConfig.indexName)) + + def eqJoinQuery(): DataFrame = + empDF + .join(deptDF, empDF("deptId") === deptDF("deptId")) + .select(empDF("empName"), deptDF("deptName")) + + verifyIndexUsage( + eqJoinQuery, + getIndexFilesPath(deptIndexConfig.indexName) ++ getIndexFilesPath(empIndexConfig.indexName)) + } + + /** + * Verify that the query plan has the expected rootPaths. + * + * @param optimizedPlan the optimized query plan. + * @param expectedPaths the expected paths in the query plan. + */ + private def verifyQueryPlanHasExpectedRootPaths( + optimizedPlan: LogicalPlan, + expectedPaths: Seq[Path]): Unit = { + assert(getAllRootPaths(optimizedPlan).sortBy(_.getName) === expectedPaths.sortBy(_.getName)) + } + + /** + * Get all rootPaths from a query plan. + * + * @param optimizedPlan the optimized query plan. + * @return a sequence of [[Path]]. + */ + private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = { + optimizedPlan.collect { + case LogicalRelation( + HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), + _, + _, + _) => + location.rootPaths + }.flatten + } + + private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = { + versions.flatMap { v => + Content + .fromDirectory( + new Path(systemPath, s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$v"), + new FileIdTracker, + new Configuration) + .files + } + } + + /** + * Gets the sorted rows from the given dataframe to make it easy to compare with + * other dataframe. + * + * @param df dataframe to collect rows from. + * @return sorted rows. + */ + private def getSortedRows(df: DataFrame): Array[Row] = { + df.orderBy(df.columns.head, df.columns.tail: _*).collect() + } + + private def verifyIndexUsage(f: () => DataFrame, expectedRootPaths: Seq[Path]): Unit = { + spark.disableHyperspace() + val dfWithHyperspaceDisabled = f() + val schemaWithHyperspaceDisabled = dfWithHyperspaceDisabled.schema + val sortedRowsWithHyperspaceDisabled = getSortedRows(dfWithHyperspaceDisabled) + + spark.enableHyperspace() + val dfWithHyperspaceEnabled = f() + + verifyQueryPlanHasExpectedRootPaths( + dfWithHyperspaceEnabled.queryExecution.optimizedPlan, + expectedRootPaths) + + assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema)) + assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled))) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index ec1c2841f..ae66ccb15 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -33,7 +33,7 @@ import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig import com.microsoft.hyperspace.index.dataskipping.sketches.MinMaxSketch import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector} -import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils} class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { private val testDir = inTempDir("e2eTests") @@ -91,12 +91,16 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { spark.sessionState.experimentalMethods.extraStrategies .containsSlice(expectedOptimizationStrategy)) + // Since applyHyperspace is called before, extraOptimization contains ApplyHyperspace + // This behavior has changed according to following discussion: + // https://github.com/microsoft/hyperspace/pull/504/files#r740278070 spark.disableHyperspace() + assert(!HyperspaceConf.hyperspaceApplyEnabled(spark)) assert( - !spark.sessionState.experimentalMethods.extraOptimizations + spark.sessionState.experimentalMethods.extraOptimizations .containsSlice(expectedOptimizationRuleBatch)) assert( - !spark.sessionState.experimentalMethods.extraStrategies + spark.sessionState.experimentalMethods.extraStrategies .containsSlice(expectedOptimizationStrategy)) } From 1d96573489120d220570d6fc636edd42db14a5da Mon Sep 17 00:00:00 2001 From: Yoonjae Park Date: Wed, 10 Nov 2021 10:38:35 +0900 Subject: [PATCH 2/6] update according to the reviews --- .../hyperspace/HyperspaceSparkSessionExtension.scala | 5 ++--- .../microsoft/hyperspace/index/rules/ApplyHyperspace.scala | 5 +---- src/main/scala/com/microsoft/hyperspace/package.scala | 7 +++++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala index b362adcfc..3df19853e 100644 --- a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.rules.Rule * .getOrCreate() * }}} */ - class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { /** @@ -50,7 +49,7 @@ class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { * it returns a dummy rule that does nothing. It may increase overhead slightly * because enableHyperspace is called once for each evaluation of spark plan. */ - private case class DummyRule() extends Rule[LogicalPlan] { + private case object DummyRule extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan } @@ -62,7 +61,7 @@ class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { // Enable Hyperspace to leverage indexes. sparkSession.enableHyperspace() // Return a dummy rule to fit in interface of injectOptimizerRule - DummyRule() + DummyRule } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala index 8c3024dd9..60967c1ad 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -43,13 +43,10 @@ object ApplyHyperspace private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean] override def apply(plan: LogicalPlan): LogicalPlan = { - if (!HyperspaceConf.hyperspaceApplyEnabled(spark)) { + if (!HyperspaceConf.hyperspaceApplyEnabled(spark) || disableForIndexMaintenance.get) { return plan } - if (disableForIndexMaintenance.get) { - return plan - } val indexManager = Hyperspace .getContext(spark) .indexCollectionManager diff --git a/src/main/scala/com/microsoft/hyperspace/package.scala b/src/main/scala/com/microsoft/hyperspace/package.scala index 597b553f8..06e4a23a1 100644 --- a/src/main/scala/com/microsoft/hyperspace/package.scala +++ b/src/main/scala/com/microsoft/hyperspace/package.scala @@ -38,7 +38,11 @@ package object hyperspace { */ def enableHyperspace(): SparkSession = { HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true) + addOptimizationsIfNeeded() + sparkSession + } + private def addOptimizationsIfNeeded(): Unit = { if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( ApplyHyperspace)) { sparkSession.sessionState.experimentalMethods.extraOptimizations ++= @@ -49,7 +53,6 @@ package object hyperspace { sparkSession.sessionState.experimentalMethods.extraStrategies ++= BucketUnionStrategy :: Nil } - sparkSession } /** @@ -70,7 +73,7 @@ package object hyperspace { * * Note that Hyperspace is enabled when: * 1) `ApplyHyperspace` exists in extraOptimization - * 2) `BucketUnionStrate` exists in BucketUnionStrategy and + * 2) `BucketUnionStrate` exists in extraStrategies and * 3) `IndexConstants.HYPERSPACE_APPLY_ENABLED` is true. * * @return true if Hyperspace is enabled or false otherwise. From 4bb381dc546d52c4d696c4407018542337e0e1fd Mon Sep 17 00:00:00 2001 From: Yoonjae Park Date: Wed, 10 Nov 2021 14:00:20 +0900 Subject: [PATCH 3/6] refactor enableHyperspace --- .../HyperspaceSparkSessionExtension.scala | 21 +++++++++++++++++-- .../com/microsoft/hyperspace/package.scala | 16 ++------------ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala index 3df19853e..73ec56235 100644 --- a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala @@ -20,6 +20,9 @@ import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import com.microsoft.hyperspace.index.execution.BucketUnionStrategy +import com.microsoft.hyperspace.index.rules.ApplyHyperspace + /** * An extension for Spark SQL to activate Hyperspace. * @@ -55,13 +58,27 @@ class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { } } - type RuleBuilder = SparkSession => Rule[LogicalPlan] override def apply(extensions: SparkSessionExtensions): Unit = { extensions.injectOptimizerRule { sparkSession => // Enable Hyperspace to leverage indexes. - sparkSession.enableHyperspace() + HyperspaceSparkSessionExtension.addOptimizationsIfNeeded(sparkSession) // Return a dummy rule to fit in interface of injectOptimizerRule DummyRule } } } + +object HyperspaceSparkSessionExtension { + def addOptimizationsIfNeeded(sparkSession: SparkSession): Unit = { + if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( + ApplyHyperspace)) { + sparkSession.sessionState.experimentalMethods.extraOptimizations ++= + ApplyHyperspace :: Nil + } + if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains( + BucketUnionStrategy)) { + sparkSession.sessionState.experimentalMethods.extraStrategies ++= + BucketUnionStrategy :: Nil + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/package.scala b/src/main/scala/com/microsoft/hyperspace/package.scala index 06e4a23a1..671f32c20 100644 --- a/src/main/scala/com/microsoft/hyperspace/package.scala +++ b/src/main/scala/com/microsoft/hyperspace/package.scala @@ -18,6 +18,7 @@ package com.microsoft import org.apache.spark.sql.SparkSession +import com.microsoft.hyperspace.HyperspaceSparkSessionExtension import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.ApplyHyperspace import com.microsoft.hyperspace.util.HyperspaceConf @@ -38,23 +39,10 @@ package object hyperspace { */ def enableHyperspace(): SparkSession = { HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true) - addOptimizationsIfNeeded() + HyperspaceSparkSessionExtension.addOptimizationsIfNeeded(sparkSession) sparkSession } - private def addOptimizationsIfNeeded(): Unit = { - if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( - ApplyHyperspace)) { - sparkSession.sessionState.experimentalMethods.extraOptimizations ++= - ApplyHyperspace :: Nil - } - if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains( - BucketUnionStrategy)) { - sparkSession.sessionState.experimentalMethods.extraStrategies ++= - BucketUnionStrategy :: Nil - } - } - /** * Disable Hyperspace indexes. * From d8a9c3435ddf186a5ca8afee7a0c7f249682fc37 Mon Sep 17 00:00:00 2001 From: Yoonjae Park Date: Wed, 10 Nov 2021 14:24:35 +0900 Subject: [PATCH 4/6] add comments --- .../hyperspace/HyperspaceSparkSessionExtension.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala index 73ec56235..9fbd495fb 100644 --- a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala @@ -69,6 +69,13 @@ class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { } object HyperspaceSparkSessionExtension { + + /** + * Add ApplyHyperspace and BucketUnionStrategy into extraOptimization + * and extraStrategies, respectively, to make Spark can use Hyperspace. + * + * @param sparkSession Spark session that will use Hyperspace + */ def addOptimizationsIfNeeded(sparkSession: SparkSession): Unit = { if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( ApplyHyperspace)) { From 20b824d6ae25927ece35948cc5f93b4bbeec09a9 Mon Sep 17 00:00:00 2001 From: Yoonjae Park Date: Mon, 15 Nov 2021 11:31:01 +0900 Subject: [PATCH 5/6] move helper function to package.scala --- .../HyperspaceSparkSessionExtension.scala | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala index 9fbd495fb..0edee450b 100644 --- a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala @@ -61,31 +61,9 @@ class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { extensions.injectOptimizerRule { sparkSession => // Enable Hyperspace to leverage indexes. - HyperspaceSparkSessionExtension.addOptimizationsIfNeeded(sparkSession) + sparkSession.addOptimizationsIfNeeded() // Return a dummy rule to fit in interface of injectOptimizerRule DummyRule } } } - -object HyperspaceSparkSessionExtension { - - /** - * Add ApplyHyperspace and BucketUnionStrategy into extraOptimization - * and extraStrategies, respectively, to make Spark can use Hyperspace. - * - * @param sparkSession Spark session that will use Hyperspace - */ - def addOptimizationsIfNeeded(sparkSession: SparkSession): Unit = { - if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( - ApplyHyperspace)) { - sparkSession.sessionState.experimentalMethods.extraOptimizations ++= - ApplyHyperspace :: Nil - } - if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains( - BucketUnionStrategy)) { - sparkSession.sessionState.experimentalMethods.extraStrategies ++= - BucketUnionStrategy :: Nil - } - } -} From 4d2f1b85569af2cd589a1ea24a1a08aa52629d46 Mon Sep 17 00:00:00 2001 From: Yoonjae Park Date: Mon, 15 Nov 2021 11:31:21 +0900 Subject: [PATCH 6/6] move helper function to package.scala --- .../com/microsoft/hyperspace/package.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/package.scala b/src/main/scala/com/microsoft/hyperspace/package.scala index 671f32c20..7d7caa4e4 100644 --- a/src/main/scala/com/microsoft/hyperspace/package.scala +++ b/src/main/scala/com/microsoft/hyperspace/package.scala @@ -39,7 +39,7 @@ package object hyperspace { */ def enableHyperspace(): SparkSession = { HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true) - HyperspaceSparkSessionExtension.addOptimizationsIfNeeded(sparkSession) + addOptimizationsIfNeeded() sparkSession } @@ -72,5 +72,24 @@ package object hyperspace { experimentalMethods.extraStrategies.contains(BucketUnionStrategy) && HyperspaceConf.hyperspaceApplyEnabled(sparkSession) } + + /** + * Add ApplyHyperspace and BucketUnionStrategy into extraOptimization + * and extraStrategies, respectively, to make Spark can use Hyperspace. + * + * @param sparkSession Spark session that will use Hyperspace + */ + private[hyperspace] def addOptimizationsIfNeeded(): Unit = { + if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( + ApplyHyperspace)) { + sparkSession.sessionState.experimentalMethods.extraOptimizations ++= + ApplyHyperspace :: Nil + } + if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains( + BucketUnionStrategy)) { + sparkSession.sessionState.experimentalMethods.extraStrategies ++= + BucketUnionStrategy :: Nil + } + } } }