This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
125 commits
4d002cd
add delete support for index refresh
Aug 26, 2020
f8f72f2
Merge branch 'master' into pouriap/refreshDelete
Aug 26, 2020
aed6255
Merge branch 'master' into pouriap/refreshDelete
Aug 31, 2020
bf98425
add delete support to refresh index
Sep 2, 2020
fcb4f95
add delete support to refresh index
Sep 2, 2020
ba55f30
add delete support to refresh index
Sep 2, 2020
a15b71d
add delete support to refresh index
Sep 2, 2020
70d9416
fix index manager test case
Sep 3, 2020
0b26209
fix index content refresh
Sep 3, 2020
3cc1e0f
Merge branch 'master' into pouriap/refreshDelete
Sep 3, 2020
bfa0543
Merge branch 'master' into pouriap/refreshDelete
Sep 9, 2020
66fa8fb
fix merge conflicts
Sep 10, 2020
2a62d1d
refactor refresh code and add refresh delete
Sep 10, 2020
9916c25
check for lineage during refresh delete.
Sep 10, 2020
d808a13
fix refresh delete test
Sep 10, 2020
096546e
Merge branch 'master' of github.com:apoorvedave1/hyperspace-1 into re…
apoorvedave1 Sep 14, 2020
7e516df
add merge functionality with test
apoorvedave1 Sep 14, 2020
ed6d49c
initial commit for refresh incremental support (append only)
apoorvedave1 Sep 14, 2020
2308ca9
comment fix
apoorvedave1 Sep 15, 2020
db46723
Delete RefreshIncrementalAction.scala
apoorvedave1 Sep 15, 2020
bbeacdd
Merge branch 'master' into refreshAppend
apoorvedave1 Sep 15, 2020
a657556
Merge remote-tracking branch 'upstream/master' into refreshAppend
apoorvedave1 Sep 16, 2020
3cbfea5
add unit test to show refresh works fine
apoorvedave1 Sep 16, 2020
8e8da80
Merge remote-tracking branch 'upstream/master' into refreshAppend
apoorvedave1 Sep 16, 2020
c3de8ca
Merge branch 'master' into refreshAppend
apoorvedave1 Sep 17, 2020
0c0c376
add sort node verification to the test
apoorvedave1 Sep 17, 2020
fed6786
Merge remote-tracking branch 'upstream/master' into refreshAppend
apoorvedave1 Oct 1, 2020
eb0c29a
Update src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest…
apoorvedave1 Oct 1, 2020
5e25290
Update src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest…
apoorvedave1 Oct 1, 2020
3347c6d
Update src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest…
apoorvedave1 Oct 1, 2020
870b2f7
Update src/main/scala/com/microsoft/hyperspace/actions/RefreshIncreme…
apoorvedave1 Oct 1, 2020
7ec82e4
Update src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRul…
apoorvedave1 Oct 1, 2020
76714a6
test bug fix
apoorvedave1 Oct 1, 2020
0bc6a51
Merge branch 'refreshAppend' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 1, 2020
dbb7ccc
Trigger Build
Oct 1, 2020
ffb7b90
build fix
Oct 1, 2020
4d73c9a
cleanup unnecessary test file
Oct 1, 2020
b6e43af
merge initial
apoorvedave1 Oct 5, 2020
1d287fb
review comments
apoorvedave1 Oct 5, 2020
1013615
review comments in tests
apoorvedave1 Oct 5, 2020
424c5ec
nit comments
apoorvedave1 Oct 5, 2020
d283116
test cleanup
apoorvedave1 Oct 5, 2020
89380a2
downmerge from refreshAppend
apoorvedave1 Oct 5, 2020
c0c6fda
initial add validation in refresh that no deleted files remain
apoorvedave1 Oct 6, 2020
25ee26b
review comments and refactoring
apoorvedave1 Oct 6, 2020
cb1039c
add validation in refresh incremental
apoorvedave1 Oct 6, 2020
0724ee2
add test case for failing for no new appended files
apoorvedave1 Oct 6, 2020
e35419f
trigger build
apoorvedave1 Oct 6, 2020
ee2a02f
Update RefreshIncrementalAction.scala
apoorvedave1 Oct 6, 2020
e82f3fd
Merge branch 'refreshAppend' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 6, 2020
d1946c0
Merge remote-tracking branch 'upstream/master' into merge
apoorvedave1 Oct 7, 2020
c15c9cb
fix test cases with new behavior
apoorvedave1 Oct 7, 2020
3d18298
add "appended" and "deleted" files to metadata
apoorvedave1 Oct 7, 2020
af3284a
bug fixes for refresh append and delete to work together for deleted …
apoorvedave1 Oct 8, 2020
79ddb79
trigger build
apoorvedave1 Oct 8, 2020
10723e2
minor fix
apoorvedave1 Oct 8, 2020
b0771bf
initial commit
apoorvedave1 Oct 8, 2020
bd89f91
Merge branch 'master' into errorhandling
apoorvedave1 Oct 8, 2020
84b910f
Merge remote-tracking branch 'upstream/master' into merge
apoorvedave1 Oct 8, 2020
37c0dc8
test name fix
apoorvedave1 Oct 8, 2020
739d474
Update RefreshIndexTests.scala
apoorvedave1 Oct 8, 2020
ad2e9df
initial commit part 2
apoorvedave1 Oct 8, 2020
4d27fe2
review comments
apoorvedave1 Oct 8, 2020
58aea9d
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
1c4ae1c
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
87ff7b7
doc
apoorvedave1 Oct 8, 2020
300703b
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
e31f171
build failure fix
apoorvedave1 Oct 8, 2020
47c6730
add unit test for "appended"files update
apoorvedave1 Oct 8, 2020
7b4a6a8
test fix
apoorvedave1 Oct 8, 2020
f8766c0
review comments
apoorvedave1 Oct 8, 2020
5a171f9
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
58f1339
set logger at session initialization in test cases
apoorvedave1 Oct 8, 2020
4dc0ab6
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 8, 2020
2c55d86
move mock logger setting to beginning of all SparkInvolvedSuite tests
Oct 9, 2020
138e96a
refactoring
apoorvedave1 Oct 9, 2020
767513f
explicit assert in test
apoorvedave1 Oct 9, 2020
7338de5
review comets
Oct 9, 2020
7f94a1d
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
Oct 9, 2020
29ca9b3
test bug fixes
Oct 9, 2020
5db1125
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
apoorvedave1 Oct 9, 2020
9d26ddc
whitespace fix
Oct 9, 2020
c7816b6
Merge branch 'errorhandling' of github.com:apoorvedave1/hyperspace-1 …
Oct 9, 2020
b43b29e
add refresh-append test for updating delted files
Oct 9, 2020
7ebda9a
Merge remote-tracking branch 'upstream/master' into bugfixes
Oct 9, 2020
c9b566e
Merge remote-tracking branch 'upstream/master' into merge
apoorvedave1 Oct 9, 2020
22aa8fb
Merge branch 'master' into merge
apoorvedave1 Oct 9, 2020
abbc59f
Merge branch 'master' into bugfixes
apoorvedave1 Oct 9, 2020
8f55e66
bug fix in updating index log entry after refresh append and deleted
apoorvedave1 Oct 9, 2020
286ce74
Merge branch 'bugfixes' of github.com:apoorvedave1/hyperspace-1 into …
apoorvedave1 Oct 9, 2020
1ca7fc9
Merge branch 'bugfixes' of github.com:apoorvedave1/hyperspace-1 into …
apoorvedave1 Oct 9, 2020
1024b82
Merge branch 'merge' of github.com:apoorvedave1/hyperspace-1 into merge
apoorvedave1 Oct 9, 2020
49f34de
remove comment
apoorvedave1 Oct 9, 2020
e7da7de
fix scalastyle
apoorvedave1 Oct 9, 2020
3872fd9
initial commit
apoorvedave1 Oct 9, 2020
f5927ac
review comments
apoorvedave1 Oct 10, 2020
521edcb
rename test path
apoorvedave1 Oct 10, 2020
97099ef
Merge branch 'bugfixes' of github.com:apoorvedave1/hyperspace-1 into …
apoorvedave1 Oct 10, 2020
c807246
Merge branch 'bugfixes' of github.com:apoorvedave1/hyperspace-1 into …
apoorvedave1 Oct 10, 2020
2080edb
add test for "deletedFiles" in entry
apoorvedave1 Oct 10, 2020
3af06eb
unit tests added
apoorvedave1 Oct 10, 2020
607ae9e
Merge remote-tracking branch 'upstream/master' into bug_193
apoorvedave1 Oct 10, 2020
c3186b8
Merge remote-tracking branch 'upstream/master' into merge
apoorvedave1 Oct 10, 2020
315b36d
review comments, test fixes
apoorvedave1 Oct 10, 2020
42de97a
rephrase test name
apoorvedave1 Oct 10, 2020
6d1f0fc
autoformat result
apoorvedave1 Oct 10, 2020
ee6f406
Merge branch 'bug_193' of github.com:apoorvedave1/hyperspace-1 into m…
apoorvedave1 Oct 10, 2020
5b232d4
final review commments
apoorvedave1 Oct 10, 2020
03f23da
review comments
apoorvedave1 Oct 10, 2020
4e23381
Merge branch 'bug_193' of github.com:apoorvedave1/hyperspace-1 into m…
apoorvedave1 Oct 10, 2020
140eb1f
Merge remote-tracking branch 'upstream/master' into merge
apoorvedave1 Oct 10, 2020
16f0e29
review comments, refactoring, reset default mode
apoorvedave1 Oct 10, 2020
dbfa533
test fixes with default modes
apoorvedave1 Oct 10, 2020
54660e2
remove delete and append flags for RefreshDeleteAction and RefreshApe…
apoorvedave1 Oct 10, 2020
91a27ea
added tests as per review comments
apoorvedave1 Oct 10, 2020
458083e
review comments
apoorvedave1 Oct 10, 2020
012c89b
nit
apoorvedave1 Oct 10, 2020
60585d7
add issue details for a known issue
apoorvedave1 Oct 10, 2020
3b09a59
review comments
apoorvedave1 Oct 12, 2020
f93831a
Update src/main/scala/com/microsoft/hyperspace/index/IndexCollectionM…
apoorvedave1 Oct 12, 2020
a9e5493
Merge remote-tracking branch 'upstream/master' into merge
apoorvedave1 Oct 12, 2020
87f4a8a
review comments and test refactoring
apoorvedave1 Oct 12, 2020
0ba4241
scalastyle
apoorvedave1 Oct 12, 2020
bdc6620
cleanup
imback82 Oct 12, 2020
03be354
minor test fix
apoorvedave1 Oct 12, 2020
14 changes: 13 additions & 1 deletion src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -19,6 +19,7 @@ package com.microsoft.hyperspace
 import org.apache.spark.sql.{DataFrame, SparkSession}

 import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_FULL
 import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer

 class Hyperspace(spark: SparkSession) {
@@ -74,7 +75,18 @@ class Hyperspace(spark: SparkSession) {
    * @param indexName Name of the index to refresh.
    */
   def refreshIndex(indexName: String): Unit = {
-    indexManager.refresh(indexName)
+    refreshIndex(indexName, REFRESH_MODE_FULL)
+  }
+
+  /**
+   * Update indexes for the latest version of the data. This api provides a few supported
+   * refresh modes as listed below.
+   *
+   * @param indexName Name of the index to refresh.
+   * @param mode Refresh mode. Currently supported modes are `incremental` and `full`.
+   */
+  def refreshIndex(indexName: String, mode: String): Unit = {
+    indexManager.refresh(indexName, mode)
   }

   /**
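Reviewer note: the one-argument `refreshIndex` now delegates to the two-argument overload with the `full` mode, so existing callers keep a full rebuild by default. The sketch below illustrates that delegation without Spark. `HyperspaceLike`, `IndexManagerLike`, and `RecordingIndexManager` are hypothetical stand-ins invented for this note, not classes from the repository.

```scala
import scala.collection.mutable.ArrayBuffer

object RefreshApiSketch {
  // Mirror the two mode constants added to IndexConstants in this change.
  val RefreshModeIncremental = "incremental"
  val RefreshModeFull = "full"

  // Hypothetical stand-in for IndexManager: only the refresh signature is modeled.
  trait IndexManagerLike {
    def refresh(indexName: String, mode: String): Unit
  }

  // Hypothetical test double that records each (indexName, mode) call.
  final class RecordingIndexManager extends IndexManagerLike {
    val calls: ArrayBuffer[(String, String)] = ArrayBuffer.empty
    override def refresh(indexName: String, mode: String): Unit = {
      calls += ((indexName, mode))
    }
  }

  // Hypothetical stand-in for the Hyperspace facade shown in the diff.
  final class HyperspaceLike(indexManager: IndexManagerLike) {
    // Old entry point: now forwards with the default "full" mode.
    def refreshIndex(indexName: String): Unit =
      refreshIndex(indexName, RefreshModeFull)

    // New entry point: passes the caller's mode through unchanged.
    def refreshIndex(indexName: String, mode: String): Unit =
      indexManager.refresh(indexName, mode)
  }
}
```

Calling `refreshIndex("idx")` on the stand-in should record `("idx", "full")`, while `refreshIndex("idx", "incremental")` records the mode exactly as given.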
@@ -37,9 +37,10 @@ class RefreshAction(
     extends RefreshActionBase(spark, logManager, dataManager) {

   final override def op(): Unit = {
-    // TODO: The current implementation picks the number of buckets from session config.
-    // This should be user-configurable to allow maintain the existing bucket numbers
-    // in the index log entry.
+    // TODO: The current implementation picks the number of buckets and
+    // "spark.hyperspace.index.lineage.enabled" from session config. This should be
+    // user-configurable to allow maintain the existing bucket numbers in the index log entry.
+    // https://github.com/microsoft/hyperspace/issues/196.
     write(spark, df, indexConfig)
   }

@@ -41,9 +41,10 @@ class RefreshAppendAction(
     dataManager: IndexDataManager)
     extends RefreshActionBase(spark, logManager, dataManager) {
   final override def op(): Unit = {
-    // TODO: The current implementation picks the number of buckets from session config.
-    // This should be user-configurable to allow maintain the existing bucket numbers
-    // in the index log entry.
+    // TODO: The current implementation picks the number of buckets and
+    // "spark.hyperspace.index.lineage.enabled" from session config. This should be
+    // user-configurable to allow maintain the existing bucket numbers in the index log entry.
+    // https://github.com/microsoft/hyperspace/issues/196.
     write(spark, dfWithAppendedFiles, indexConfig)
   }

@@ -92,9 +92,9 @@ class CachingIndexCollectionManager(
     super.vacuum(indexName)
   }

-  override def refresh(indexName: String): Unit = {
+  override def refresh(indexName: String, mode: String): Unit = {
     clearCache()
-    super.refresh(indexName)
+    super.refresh(indexName, mode)
   }
 }

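Reviewer note: the caching manager clears its cache before delegating the refresh, so readers should not see stale index metadata afterwards. A minimal sketch of that pattern follows. `BaseManager`, `CachingManager`, and the version counter are hypothetical and exist only to make the effect observable without Spark.

```scala
object CachingRefreshSketch {
  // Hypothetical base manager: "refreshing" just bumps a version counter.
  class BaseManager {
    protected var version: Int = 0
    def listIndexes(): Seq[String] = Seq(s"idx@v$version")
    def refresh(indexName: String, mode: String): Unit = version += 1
  }

  // Caches listIndexes() and drops the cached value before every refresh.
  class CachingManager extends BaseManager {
    private var cache: Option[Seq[String]] = None

    override def listIndexes(): Seq[String] = cache.getOrElse {
      val fresh = super.listIndexes()
      cache = Some(fresh)
      fresh
    }

    override def refresh(indexName: String, mode: String): Unit = {
      cache = None // plays the role of clearCache() in the diff
      super.refresh(indexName, mode)
    }
  }
}
```

If the cache were not cleared, the second `listIndexes()` call in this sketch would still return the pre-refresh value.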
@@ -22,7 +22,7 @@ import org.apache.spark.sql.internal.SQLConf

 import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.actions._
-import com.microsoft.hyperspace.util.HyperspaceConf
+import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL}

 class IndexCollectionManager(
     spark: SparkSession,
@@ -63,16 +63,17 @@ class IndexCollectionManager(
     }
   }

-  override def refresh(indexName: String): Unit = {
+  override def refresh(indexName: String, mode: String): Unit = {
     withLogManager(indexName) { logManager =>
       val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName)
       val dataManager = indexDataManagerFactory.create(indexPath)
-      if (HyperspaceConf.refreshDeleteEnabled(spark)) {
+      if (mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) {
         new RefreshDeleteAction(spark, logManager, dataManager).run()
-      } else if (HyperspaceConf.refreshAppendEnabled(spark)) {
-        new RefreshAppendAction(spark, logManager, dataManager).run()
-      } else {
+      } else if (mode.equalsIgnoreCase(REFRESH_MODE_FULL)) {
         new RefreshAction(spark, logManager, dataManager).run()
+      } else {
+        throw HyperspaceException(s"Unsupported refresh mode '$mode' found.")
       }
     }
   }
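Reviewer note: mode selection is now an explicit, case-insensitive string match that rejects unknown values instead of silently falling back to a full refresh. The sketch below isolates that dispatch. It returns action names rather than running anything, and `UnsupportedRefreshMode` is a hypothetical stand-in for `HyperspaceException`.

```scala
object RefreshModeDispatch {
  val RefreshModeIncremental = "incremental"
  val RefreshModeFull = "full"

  // Hypothetical stand-in for HyperspaceException.
  final case class UnsupportedRefreshMode(msg: String) extends RuntimeException(msg)

  // Returns the name of the action the diff above would run for the given mode.
  def actionFor(mode: String): String = {
    if (mode.equalsIgnoreCase(RefreshModeIncremental)) {
      "RefreshDeleteAction"
    } else if (mode.equalsIgnoreCase(RefreshModeFull)) {
      "RefreshAction"
    } else {
      throw UnsupportedRefreshMode(s"Unsupported refresh mode '$mode' found.")
    }
  }
}
```

As in the diff, a `null` mode would raise a `NullPointerException` from `equalsIgnoreCase` rather than the unsupported-mode error; whether that matters depends on how callers obtain the mode string.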
14 changes: 2 additions & 12 deletions src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -73,16 +73,6 @@ object IndexConstants {
   val INDEX_LINEAGE_ENABLED = "spark.hyperspace.index.lineage.enabled"
   val INDEX_LINEAGE_ENABLED_DEFAULT = "false"

-  val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled"
-  val REFRESH_DELETE_ENABLED_DEFAULT = "false"
-
-  /**
-   * This flag enables refreshing index if additional data files are appended to the source. When
-   * set to false, the refresh call will not run RefreshAppendAction. It will instead go for full
-   * refresh.
-   * This flag is temporary, and will be removed when both Append and Delete actions are merged
-   * for refreshing indexes.
-   */
-  val REFRESH_APPEND_ENABLED = "spark.hyperspace.index.refresh.append.enabled"
-  val REFRESH_APPEND_ENABLED_DEFAULT = "false"
+  val REFRESH_MODE_INCREMENTAL = "incremental"
+  val REFRESH_MODE_FULL = "full"
 }
@@ -65,7 +65,7 @@ trait IndexManager {
    *
    * @param indexName Name of the index to refresh.
    */
-  def refresh(indexName: String): Unit
+  def refresh(indexName: String, mode: String): Unit

   /**
    * Cancel api to bring back index from an inconsistent state to the last known stable state.
14 changes: 0 additions & 14 deletions src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -39,18 +39,4 @@ object HyperspaceConf {
         IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT)
       .toBoolean
   }
-
-  def refreshDeleteEnabled(spark: SparkSession): Boolean = {
-    spark.conf
-      .get(IndexConstants.REFRESH_DELETE_ENABLED,
-        IndexConstants.REFRESH_DELETE_ENABLED_DEFAULT)
-      .toBoolean
-  }
-
-  def refreshAppendEnabled(spark: SparkSession): Boolean = {
-    spark.conf
-      .get(IndexConstants.REFRESH_APPEND_ENABLED,
-        IndexConstants.REFRESH_APPEND_ENABLED_DEFAULT)
-      .toBoolean
-  }
 }
24 changes: 24 additions & 0 deletions src/test/scala/com/microsoft/hyperspace/TestUtils.scala
@@ -16,11 +16,13 @@

 package com.microsoft.hyperspace

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

 import com.microsoft.hyperspace.MockEventLogger.reset
 import com.microsoft.hyperspace.index.IndexLogEntry
 import com.microsoft.hyperspace.telemetry.{EventLogger, HyperspaceEvent}
+import com.microsoft.hyperspace.util.FileUtils

 object TestUtils {
   def copyWithState(index: IndexLogEntry, state: String): IndexLogEntry = {
@@ -46,6 +48,28 @@ object TestUtils {
       path.getName +: splitPath(path.getParent)
     }
   }
+
+  /**
+   * Delete files from a given path.
+   *
+   * @param path Path to the folder containing files.
+   * @param pattern File name pattern to delete.
+   * @param numFilesToDelete Number of files to delete.
+   * @return Paths to the deleted file.
+   */
+  def deleteFiles(path: String, pattern: String, numFilesToDelete: Int): Seq[Path] = {
+    val pathToDelete = new Path(path, pattern)
+    val fileNames = pathToDelete
+      .getFileSystem(new Configuration)
+      .globStatus(pathToDelete)
+      .map(_.getPath)
+
+    assert(fileNames.length >= numFilesToDelete)
+    val filesToDelete = fileNames.take(numFilesToDelete)
+    filesToDelete.foreach(FileUtils.delete(_))
+
+    filesToDelete
+  }
 }

 /**
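Reviewer note: `deleteFiles` globs a folder, checks that enough files matched, deletes the first N, and returns their paths so tests can assert on them. The sketch below reproduces that flow on the local filesystem with `java.nio.file` in place of the Hadoop `FileSystem` and `FileUtils.delete` used in the diff. That substitution, the object name `DeleteFilesSketch`, and the explicit sort (added so the choice of files is deterministic) are assumptions of this note, not the PR's implementation.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

object DeleteFilesSketch {
  // Deletes `numFilesToDelete` files in `dir` whose names match `glob`
  // and returns the paths that were removed.
  def deleteFiles(dir: Path, glob: String, numFilesToDelete: Int): Seq[Path] = {
    val stream = Files.newDirectoryStream(dir, glob)
    val matches =
      try {
        val found = ArrayBuffer.empty[Path]
        val it = stream.iterator()
        while (it.hasNext) found += it.next()
        // Directory iteration order is unspecified, so sort by path string.
        found.toList.sortBy(_.toString)
      } finally {
        stream.close()
      }

    require(
      matches.length >= numFilesToDelete,
      s"Expected at least $numFilesToDelete files matching '$glob', found ${matches.length}")

    val filesToDelete = matches.take(numFilesToDelete)
    filesToDelete.foreach(p => Files.delete(p))
    filesToDelete
  }
}
```

Returning the deleted paths lets a refresh test compare them against the deleted-files list recorded in the index log entry.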