Conversation


@zsxwing zsxwing commented Dec 12, 2016

What changes were proposed in this pull request?

When starting a stream over a large backlog of files with maxFilesPerTrigger set, users often want to process the most recent files first. This keeps latency low for recent data while historical data is slowly backfilled.

This PR adds a new option, latestFirst, to control this behavior. When it is true, FileStreamSource sorts files by modification time from latest to oldest and takes the first maxFilesPerTrigger files as the next batch.
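The selection logic described above can be sketched in plain Scala. This is a minimal illustration, not the actual FileStreamSource code; `FileEntry` and the parameter names are stand-ins.

```scala
// Minimal sketch of latest-first batch selection, assuming a simple
// FileEntry stand-in; the real FileStreamSource also tracks seen
// files, offsets, and more.
case class FileEntry(path: String, modTime: Long)

def selectBatch(
    unprocessed: Seq[FileEntry],
    latestFirst: Boolean,
    maxFilesPerTrigger: Int): Seq[FileEntry] = {
  val ordered =
    if (latestFirst) unprocessed.sortBy(f => -f.modTime) // newest first
    else unprocessed.sortBy(_.modTime)                   // oldest first
  ordered.take(maxFilesPerTrigger)
}
```

With latestFirst enabled, a backlog of old files no longer delays new arrivals: each trigger serves the newest files first and works backwards through the backlog.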

How was this patch tested?

The added test.


SparkQA commented Dec 12, 2016

Test build #70002 has finished for PR 16251 at commit 58a57d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val optionMapWithoutPath: Map[String, String] =
  parameters.filterKeys(_ != "path")

/** Whether to scan new files first. */

Can you elaborate on this comment? Something like: in a trigger, when unprocessed files are found, the latest files are processed first.

Also, isn't "latest" more common than "newest"?

@zsxwing zsxwing changed the title [SPARK-18826][SS]Add 'newestFirst' option to FileStreamSource [SPARK-18826][SS]Add 'latestFirst' option to FileStreamSource Dec 14, 2016

SparkQA commented Dec 14, 2016

Test build #70149 has finished for PR 16251 at commit ce1d57e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


zsxwing commented Dec 14, 2016

retest this please


SparkQA commented Dec 15, 2016

Test build #70154 has finished for PR 16251 at commit ce1d57e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
val f1 = stringToFile(new File(src, "1.txt"), "1")
val f2 = stringToFile(new File(src, "2.txt"), "2")
eventually(timeout(streamingTimeout)) {

Why use eventually? Why not just call f1.setLastModified(f2.lastModified + 1000)?
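The suggestion above can be sketched as a stand-alone snippet (directory and file names are arbitrary stand-ins). Pinning the timestamps explicitly makes the ordering deterministic instead of polling until the two files happen to get distinct modification times; here 1.txt is forced to be strictly older than 2.txt, matching the ordering the test expects.

```scala
// Sketch of the reviewer's suggestion: pin modification times
// explicitly rather than waiting for them to differ.
import java.io.File
import java.nio.file.Files

val src = Files.createTempDirectory("file-source-test").toFile
val f1 = new File(src, "1.txt")
Files.write(f1.toPath, "1".getBytes)
val f2 = new File(src, "2.txt")
Files.write(f2.toPath, "2".getBytes)

// Make 1.txt strictly older, regardless of filesystem timestamp
// granularity, so a latest-first source must pick 2.txt first.
f1.setLastModified(f2.lastModified - 10000)
```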

val clock = new StreamManualClock()
testStream(fileStream)(
  StartStream(trigger = ProcessingTime(10), triggerClock = clock),
  AssertOnQuery { _ =>

Why do you need to wait on the manual clock? Won't CheckLastBatch automatically wait for the batch to complete?

Member Author (zsxwing) replied:

> why do you need to wait on the manual clock? CheckLastBatch will automatically wait for the batch to complete?

CheckLastBatch waits only when AddData is used, but in this test, I need to add data before starting the query.
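For context, the manual-clock pattern used in this test can be sketched as follows. This is a hypothetical minimal clock, not Spark's actual StreamManualClock: the trigger thread asks the clock for the time, and the test advances it explicitly, so batch boundaries are fully deterministic.

```scala
// Hypothetical minimal manual clock in the spirit of StreamManualClock:
// time only moves when the test calls advance(), so triggers fire
// exactly when the test says so.
class ManualClock(start: Long = 0L) {
  private var now = start

  def getTimeMillis(): Long = synchronized { now }

  def advance(ms: Long): Unit = synchronized {
    now += ms
    notifyAll() // wake threads blocked in waitTillTime
  }

  // Block the caller (e.g. a trigger thread) until the clock reaches `target`.
  def waitTillTime(target: Long): Long = synchronized {
    while (now < target) wait()
    now
  }
}
```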

)

// Read latest files first, so the first batch is "2", and the second batch is "1".
val fileStream2 = createFileStream(

I think this code can be deduplicated by writing a function that makes the query run two batches and collects the results in order. Then call that function with latestFirst set to true and to false, and check the result order in each case.
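A sketch of that shape, with all names hypothetical. The helper body below only fakes the two-batch run so the loop structure is visible; in the real test it would start the stream with maxFilesPerTrigger = 1 over the two files and collect each batch's output.

```scala
// Hypothetical dedupe of the two tests: one helper runs the scenario,
// and a single loop checks both flag values.
def runTwoBatchQuery(latestFirst: Boolean): Seq[String] = {
  // Stand-in for the real harness: files "1" (older) and "2" (newer),
  // one file per batch.
  val oldestToNewest = Seq("1", "2")
  if (latestFirst) oldestToNewest.reverse else oldestToNewest
}

for ((latestFirst, expected) <- Seq(true -> Seq("2", "1"), false -> Seq("1", "2"))) {
  val result = runTwoBatchQuery(latestFirst)
  assert(result == expected, s"latestFirst=$latestFirst: got $result")
}
```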


tdas commented Dec 15, 2016

LGTM, pending tests.


SparkQA commented Dec 15, 2016

Test build #70200 has finished for PR 16251 at commit 2847738.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


tdas commented Dec 15, 2016

Merging to master and 2.1

asfgit pushed a commit that referenced this pull request Dec 15, 2016

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16251 from zsxwing/newest-first.

(cherry picked from commit 68a6dc9)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 68a6dc9 Dec 15, 2016
@zsxwing zsxwing deleted the newest-first branch January 4, 2017 19:58
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#16251 from zsxwing/newest-first.