Merged
Commits (43)
b0771bf
initial commit
apoorvedave1 Oct 8, 2020
bd89f91
Merge branch 'master' into errorhandling
apoorvedave1 Oct 8, 2020
37c0dc8
test name fix
apoorvedave1 Oct 8, 2020
739d474
Update RefreshIndexTests.scala
apoorvedave1 Oct 8, 2020
ad2e9df
initial commit part 2
apoorvedave1 Oct 8, 2020
4d27fe2
review comments
apoorvedave1 Oct 8, 2020
58aea9d
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
1c4ae1c
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
87ff7b7
doc
apoorvedave1 Oct 8, 2020
300703b
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
e31f171
build failure fix
apoorvedave1 Oct 8, 2020
47c6730
add unit test for "appended"files update
apoorvedave1 Oct 8, 2020
7b4a6a8
test fix
apoorvedave1 Oct 8, 2020
f8766c0
review comments
apoorvedave1 Oct 8, 2020
5a171f9
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
58f1339
set logger at session initialization in test cases
apoorvedave1 Oct 8, 2020
4dc0ab6
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
2c55d86
move mock logger setting to beginning of all SparkInvolvedSuite tests
Oct 9, 2020
138e96a
refactoring
apoorvedave1 Oct 9, 2020
767513f
explicit assert in test
apoorvedave1 Oct 9, 2020
7338de5
review comments
Oct 9, 2020
7f94a1d
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
Oct 9, 2020
29ca9b3
test bug fixes
Oct 9, 2020
5db1125
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 9, 2020
9d26ddc
whitespace fix
Oct 9, 2020
c7816b6
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
Oct 9, 2020
b43b29e
add refresh-append test for updating deleted files
Oct 9, 2020
7ebda9a
Merge remote-tracking branch 'upstream/master' into bugfixes
Oct 9, 2020
abbc59f
Merge branch 'master' into bugfixes
apoorvedave1 Oct 9, 2020
8f55e66
bug fix in updating index log entry after refresh append and deleted
apoorvedave1 Oct 9, 2020
286ce74
Merge branch 'bugfixes' of github.com:apoorvedave1/hyperspace-1 into …
apoorvedave1 Oct 9, 2020
49f34de
remove comment
apoorvedave1 Oct 9, 2020
3872fd9
initial commit
apoorvedave1 Oct 9, 2020
f5927ac
review comments
apoorvedave1 Oct 10, 2020
521edcb
rename test path
apoorvedave1 Oct 10, 2020
97099ef
Merge branch 'bugfixes' of github.com:apoorvedave1/hyperspace-1 into …
apoorvedave1 Oct 10, 2020
2080edb
add test for "deletedFiles" in entry
apoorvedave1 Oct 10, 2020
3af06eb
unit tests added
apoorvedave1 Oct 10, 2020
607ae9e
Merge remote-tracking branch 'upstream/master' into bug_193
apoorvedave1 Oct 10, 2020
315b36d
review comments, test fixes
apoorvedave1 Oct 10, 2020
42de97a
rephrase test name
apoorvedave1 Oct 10, 2020
6d1f0fc
autoformat result
apoorvedave1 Oct 10, 2020
03f23da
review comments
apoorvedave1 Oct 10, 2020
@@ -107,7 +107,10 @@ object RuleUtils {
indexes.filter(index =>
index.created && isHybridScanCandidate(index, filesByRelations.flatten))
} else {
-      indexes.filter(index => index.created && signatureValid(index))
+      indexes.filter(
+        index =>
+          index.created && signatureValid(index) &&
+            index.deletedFiles.isEmpty && index.appendedFiles.isEmpty)
Collaborator:
This would be OK, but it might cause a bad user experience:

  1. refreshIncremental
  2. index fails to apply
  3. no idea or clue about the failure until the user checks the index log entry

Contributor (author):
Thanks @sezruby for bringing this up; I think that's OK. This behavior is expected, and the experience is exactly the same as the following:

  1. user creates an index
  2. user updates the source data
  3. index fails to apply
  4. no idea or clue about the failure until the user checks the index log entry

Please feel free to suggest how to improve this experience. We can create an issue for it and fix it in a subsequent PR.

On the other hand, if we don't exclude these indexes, queries will return incorrect results.
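
For concreteness, the scenario above looks roughly like this with the public Hyperspace API (a minimal sketch; the source path, column names, and index name are made up):

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/src")                       // hypothetical source path
hs.createIndex(df, IndexConfig("idx1", Seq("c1"), Seq("c2")))  // 1. user creates index

// 2. user updates the source data under /data/src (appends or deletes files) ...

// 3./4. the index silently stops being applied; nothing surfaces the reason.
spark.enableHyperspace()
val q = spark.read.parquet("/data/src").filter("c1 = 1").select("c2")
q.explain()   // the plan no longer uses idx1, with no error or warning
hs.explain(q) // the user still has to dig in here (or into the log entry) to see why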

Contributor:
The current user experience is not good either. For example, with hybrid scan off, if you add a file, the index will fail the signatureValid check without letting the user know. We have been talking about exposing a "why not" API that tells the user why an index was not picked up. Until we introduce that kind of API, I think the user experience will remain "not desirable" - meaning not good. 😄
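
For illustration, one hypothetical shape such a "why not" API could take (nothing like this exists in Hyperspace today; all names here are invented):

// Hypothetical sketch only: a per-index report of why an index was not applied.
final case class IndexRejection(indexName: String, reason: String)

// The kind of output such an API might surface:
val report = Seq(
  IndexRejection("idx1", "signature mismatch: source data changed"),
  IndexRejection("idx2", "hybrid scan disabled and deletedFiles is non-empty"))
report.foreach(r => println(s"${r.indexName}: ${r.reason}"))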

Contributor:
I think @apoorvedave1 beat me to this comment 😄

Collaborator:
@imback82 @apoorvedave1
This is why I suggested failing fast (an assert) rather than storing deletedFiles, but maybe we could hybrid scan with it later.

Now that we are proposing a "mutable" dataset, I think we need to address this issue with more care.

Anyway, I'm fine with the current approach as it's also valid :)

Thanks all!

Contributor:
OK, in this case, I still don't understand 😄. @rapoth do you understand this scenario?

Collaborator:
OK, let me try again 😁

  1. disable the "global" hybrid scan config
  2. the user can pick some "beneficial" (high performance improvement) indexes that have a small diff and do a quick refresh for just those indexes
  3. hybrid scan can then be performed for the quick-refreshed indexes even though the global config is disabled; the other unrefreshed indexes with a diff won't be candidates for hybrid scan
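
Put differently, a self-contained sketch of the proposed rule (IndexEntry is a simplified stand-in for IndexLogEntry, and quickRefreshed is a hypothetical per-index marker, not an existing field):

// Simplified stand-in for IndexLogEntry, for illustration only.
case class IndexEntry(
    name: String,
    signatureValid: Boolean,
    appendedFiles: Seq[String],
    deletedFiles: Seq[String],
    quickRefreshed: Boolean) // hypothetical: the user quick-refreshed this index

def usable(index: IndexEntry, globalHybridScan: Boolean): Boolean = {
  val sourceUnchanged = index.appendedFiles.isEmpty && index.deletedFiles.isEmpty
  if (globalHybridScan) {
    true // hybrid scan absorbs the appended/deleted diff for any candidate index
  } else {
    // With the global flag off, only unchanged or quick-refreshed indexes qualify.
    index.signatureValid && (sourceUnchanged || index.quickRefreshed)
  }
}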

Contributor:
Thanks 👍. So in a nutshell, selective hybrid scan means "we can apply the hybrid scan, even if the user didn't enable it globally, only when the signature matches", right?

Collaborator:
Right. That's the point 👍

Contributor:
Thanks!

}
}

@@ -51,10 +51,10 @@ class FilterIndexRuleTest extends HyperspaceRuleTestSuite {
scanNode = LogicalRelation(relation, Seq(c1, c2, c3, c4), None, false)

val indexPlan = Project(Seq(c1, c2, c3), scanNode)
-    createIndex(indexName1, Seq(c3, c2), Seq(c1), indexPlan)
+    createIndexLogEntry(indexName1, Seq(c3, c2), Seq(c1), indexPlan)

val index2Plan = Project(Seq(c1, c2, c3, c4), scanNode)
-    createIndex(indexName2, Seq(c4, c2), Seq(c1, c3), index2Plan)
+    createIndexLogEntry(indexName2, Seq(c4, c2), Seq(c1, c3), index2Plan)
}

before {
@@ -134,9 +134,7 @@ class FilterIndexRuleTest extends HyperspaceRuleTestSuite {
// Mark the relation that the rule is applied and verify the plan does not change.
val newPlan = plan transform {
case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) =>
-        r.copy(
-          relation =
-            h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark))
+        r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark))
}
assert(FilterIndexRule(newPlan).equals(newPlan))
}
@@ -30,11 +30,13 @@ import com.microsoft.hyperspace.index.Hdfs.Properties

trait HyperspaceRuleTestSuite extends HyperspaceSuite {
private val filenames = Seq("f1.parquet", "f2.parquet")
-  def createIndex(
+  def createIndexLogEntry(
name: String,
indexCols: Seq[AttributeReference],
includedCols: Seq[AttributeReference],
-      plan: LogicalPlan): IndexLogEntry = {
+      plan: LogicalPlan,
+      appendedFiles: Seq[String] = Seq(),
+      deletedFiles: Seq[String] = Seq()): IndexLogEntry = {
val signClass = new RuleTestHelper.TestSignatureProvider().getClass.getName

LogicalPlanSignatureProvider.create(signClass).signature(plan) match {
@@ -43,7 +45,7 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite {
Seq(
Relation(
Seq("dummy"),
-              Hdfs(Properties(Content(Directory("/")))),
+              Hdfs(Properties(Content(Directory("/")), appendedFiles, deletedFiles)),
"schema",
"format",
Map())),
@@ -69,7 +71,7 @@

val logManager = new IndexLogManagerImpl(getIndexRootPath(name))
indexLogEntry.state = Constants.States.ACTIVE
-        logManager.writeLog(0, indexLogEntry)
+        assert(logManager.writeLog(0, indexLogEntry))
indexLogEntry

case None => throw HyperspaceException("Invalid plan for index dataFrame.")
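
With the new defaulted parameters, a test can build an entry whose relation already records a source-data diff. For example, using FilterIndexRuleTest's names (the appended/deleted file names are illustrative):

createIndexLogEntry(
  indexName1,
  Seq(c3, c2),
  Seq(c1),
  indexPlan,
  appendedFiles = Seq("f3.parquet"),
  deletedFiles = Seq("f1.parquet"))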
@@ -90,11 +90,11 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper {
// +- Filter isnotnull(t2c1#4)
// +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet

createIndex("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode)
createIndex("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode)
createIndex("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode)
createIndex("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode)
createIndex("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode)
createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
}

before {
@@ -208,8 +208,10 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper {

{
// Test: should update plan if index exists to cover all implicit columns
-      val t1TestIndex = createIndex("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode)
-      val t2TestIndex = createIndex("t2Idx", Seq(t2c1), Seq(t2c2, t2c3, t2c4), t2FilterNode)
+      val t1TestIndex =
+        createIndexLogEntry("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode)
+      val t2TestIndex =
+        createIndexLogEntry("t2Idx", Seq(t2c1), Seq(t2c2, t2c3, t2c4), t2FilterNode)

// clear cache so the new indexes gets added to it
clearCache()
@@ -406,9 +408,7 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper {
// Mark the relation that the rule is applied and verify the plan does not change.
val newPlan = plan transform {
case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) =>
-        r.copy(
-          relation =
-            h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark))
+        r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark))
}
assert(JoinIndexRule(newPlan).equals(newPlan))
}
@@ -20,12 +20,13 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, IsNotNull}
import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper}
- import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, RepartitionByExpression}
+ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
import org.apache.spark.sql.types.{IntegerType, StringType}

import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants}
+ import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}

class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
@@ -79,11 +80,11 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
// +- Filter isnotnull(t2c1#4)
// +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet

createIndex("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode)
createIndex("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode)
createIndex("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode)
createIndex("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode)
createIndex("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode)
createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode)
createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
}

test("Verify indexes are matched by signature correctly.") {
@@ -280,6 +281,19 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
}
}

+  test(
+    "RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " +
+      "'appendedFiles' are not usable indexes if hybrid scan is disabled.") {
+    withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") {
+      val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode)
+      val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq("f1"))
+      val entry3 = entry1.withAppendedAndDeletedFiles(Seq("f2"), Seq())
+      val usableIndexes =
+        RuleUtils.getCandidateIndexes(spark, Seq(entry1, entry2, entry3), t1ProjectNode)
+      assert(usableIndexes.equals(Seq(entry1)))
+    }
+  }
+
private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
val r = RuleUtils.getLogicalRelation(plan)
assert(r.isDefined)