Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.ApplyHyperspace

/**
* An extension for Spark SQL to activate Hyperspace.
*
* Example to run a `spark-submit` with Hyperspace enabled:
* {{{
* spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension
* }}}
*
* Example to create a `SparkSession` with Hyperspace enabled:
* {{{
* val spark = SparkSession
* .builder()
* .appName("...")
* .master("...")
* .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
* .getOrCreate()
* }}}
*/
class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) {

/**
* If HyperspaceRule is injected directly to OptimizerRule with HyperspaceExtension,
* the order of applying rule is different from without HyperspaceExtension
* (i.e., explicitly calling enableHyperspace). To make behavior consistently,
* current implementation of HyperspaceExtension uses a trick to call enableHyperspace
* before rule is applied. Since the interface of injectOptimizerRule should return rule builder,
* it returns a dummy rule that does nothing. It may increase overhead slightly
* because enableHyperspace is called once for each evaluation of spark plan.
*/
private case object DummyRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan
}
}

override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectOptimizerRule { sparkSession =>
// Enable Hyperspace to leverage indexes.
sparkSession.addOptimizationsIfNeeded()
// Return a dummy rule to fit in interface of injectOptimizerRule
DummyRule
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.internal.SQLConf

object IndexConstants {
// If it is set as false, Hyperspace will not be applied.
val HYPERSPACE_APPLY_ENABLED = "spark.hyperspace.apply.enabled"
val HYPERSPACE_APPLY_ENABLED_DEFAULT = "true"

val INDEXES_DIR = "indexes"

// Config used for setting the system path, which is considered as a "root" path for Hyperspace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
import com.microsoft.hyperspace.util.HyperspaceConf

/**
* Transform the given plan to use Hyperspace indexes.
Expand All @@ -42,7 +43,7 @@ object ApplyHyperspace
private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean]

override def apply(plan: LogicalPlan): LogicalPlan = {
if (disableForIndexMaintenance.get) {
if (!HyperspaceConf.hyperspaceApplyEnabled(spark) || disableForIndexMaintenance.get) {
return plan
}

Expand Down
53 changes: 39 additions & 14 deletions src/main/scala/com/microsoft/hyperspace/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package com.microsoft

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.HyperspaceSparkSessionExtension
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.util.HyperspaceConf

package object hyperspace {

Expand All @@ -29,42 +31,65 @@ package object hyperspace {
implicit class Implicits(sparkSession: SparkSession) {

/**
* Plug in Hyperspace-specific rules.
* Enable Hyperspace indexes.
*
* Plug in Hyperspace-specific rules and set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as true.
*
* @return a spark session that contains Hyperspace-specific rules.
*/
def enableHyperspace(): SparkSession = {
disableHyperspace
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
ApplyHyperspace :: Nil
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true)
addOptimizationsIfNeeded()
sparkSession
}

/**
* Plug out Hyperspace-specific rules.
* Disable Hyperspace indexes.
*
* Set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as false
* to stop applying Hyperspace indexes.
*
* @return a spark session that does not contain Hyperspace-specific rules.
* @return a spark session that `IndexConstants.HYPERSPACE_APPLY_ENABLED` is set as false.
*/
def disableHyperspace(): SparkSession = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations =
experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals)
experimentalMethods.extraStrategies =
experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals)
HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, false)
sparkSession
}

/**
* Checks if Hyperspace is enabled or not.
*
* Note that Hyperspace is enabled when:
* 1) `ApplyHyperspace` exists in extraOptimization
* 2) `BucketUnionStrate` exists in extraStrategies and
* 3) `IndexConstants.HYPERSPACE_APPLY_ENABLED` is true.
*
* @return true if Hyperspace is enabled or false otherwise.
*/
def isHyperspaceEnabled(): Boolean = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations.contains(ApplyHyperspace) &&
experimentalMethods.extraStrategies.contains(BucketUnionStrategy)
experimentalMethods.extraStrategies.contains(BucketUnionStrategy) &&
HyperspaceConf.hyperspaceApplyEnabled(sparkSession)
}

/**
* Add ApplyHyperspace and BucketUnionStrategy into extraOptimization
* and extraStrategies, respectively, to make Spark can use Hyperspace.
*
* @param sparkSession Spark session that will use Hyperspace
*/
private[hyperspace] def addOptimizationsIfNeeded(): Unit = {
if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains(
ApplyHyperspace)) {
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
ApplyHyperspace :: Nil
}
if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains(
BucketUnionStrategy)) {
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
}
}
}
}
16 changes: 16 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ import com.microsoft.hyperspace.index.IndexConstants
* Helper class to extract Hyperspace-related configs from SparkSession.
*/
object HyperspaceConf {

/**
* Returns the config value whether hyperspace is enabled or not.
*/
def hyperspaceApplyEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.HYPERSPACE_APPLY_ENABLED,
IndexConstants.HYPERSPACE_APPLY_ENABLED_DEFAULT)
.toBoolean
}

def setHyperspaceApplyEnabled(spark: SparkSession, apply: Boolean): Unit = {
spark.conf.set(IndexConstants.HYPERSPACE_APPLY_ENABLED, apply.toString)
}

def hybridScanEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
Expand Down
Loading