Conversation

@cloud-fan (Contributor)

What changes were proposed in this pull request?

Following #23430, this PR refactors the API for continuous read, per the design doc (https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing).

The major changes:

  1. Rename `XXXContinuousReadSupport` to `XXXContinuousStream` (a minimal sketch of the new shape follows this list).
  2. At the beginning of continuous streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of going through `StreamingExecutionRelation`.
  3. Remove the remaining hacks, as the read-side API refactor is now complete.
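
For orientation, here is a minimal sketch of the shape a source takes after the rename. The trait below is illustrative only, not Spark's exact interface; the member names and signatures are assumptions for the example.

```scala
// Illustrative stand-in for the ContinuousStream abstraction (NOT Spark's
// real interface; names and signatures are assumptions for this sketch).
trait Offset
trait PartitionOffset
trait InputPartition

trait ContinuousStream {
  // Starting position when the query starts or restarts.
  def initialOffset(): Offset
  // Combine per-partition offsets into one global offset for checkpointing.
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  // Plan the partitions to read from the given start offset.
  def planInputPartitions(start: Offset): Array[InputPartition]
  // The stream itself signals when the query must reconfigure.
  def needsReconfiguration(): Boolean
  // Called once, when the streaming query ends (see the discussion below).
  def stop(): Unit
}
```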

How was this patch tested?

Existing tests.

@cloud-fan (Contributor Author):

With the new abstraction, we should only stop sources when the streaming query ends, instead of at each reconfiguration.

@jose-torres (Contributor):

As above, this looks like it's correctly implemented to me but we should keep an eye out for flakiness.

@cloud-fan (Contributor Author):

Thanks for reminding! Yeah, I'll keep an eye on it.

@cloud-fan (Contributor Author):

Ditto: with the new abstraction, we should create the `ContinuousStream` once at the beginning of the `ContinuousExecution`, instead of at each reconfiguration (see the sketch below).
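
A minimal sketch of that lifecycle, using stub types rather than Spark's real classes; `runOneEpoch` and the stop signal are stand-ins invented for the example.

```scala
// Sketch only: the stream is created once per query and stopped once,
// surviving any number of reconfigurations in between.
trait ContinuousStream {
  def needsReconfiguration(): Boolean
  def stop(): Unit
}

def runOneEpoch(stream: ContinuousStream): Unit = () // stand-in for epoch processing

def runQuery(stream: ContinuousStream, queryStopped: () => Boolean): Unit =
  try {
    while (!queryStopped()) {
      runOneEpoch(stream)
      if (stream.needsReconfiguration()) {
        // Re-plan the read here, but do NOT recreate or stop the stream.
      }
    }
  } finally {
    stream.stop() // exactly once, when the streaming query ends
  }
```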

@cloud-fan (Contributor Author):

This is a small fix: the test needs to specify `numPartitions`, but the socket source always uses Spark's default parallelism. Here I make `numPartitions` configurable.
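
For illustration, here is what the configurable option could look like from the user side, assuming a local socket server on port 9999; the option name `numPartitions` comes from this comment, and the rest of the snippet is a generic continuous-mode read.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("socket-continuous-demo").getOrCreate()

// Read from a socket with an explicit partition count instead of
// falling back to spark.default.parallelism.
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("numPartitions", "4")
  .load()

val query = socketDF.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```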

@SparkQA commented Jan 23, 2019:

Test build #101564 has finished for PR 23619 at commit 120be7c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author) commented Jan 23, 2019:

We don't need this test now: with the new `TableProvider` abstraction, the lookup logic is unified between micro-batch and continuous (a sketch of the idea follows).
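
Roughly, the unification means one resolution path that only branches when the stream is created. This sketch uses stub types, not the actual Spark resolution code:

```scala
// Stub interfaces standing in for the data source v2 API.
trait MicroBatchStream
trait ContinuousStream
trait Scan {
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream
  def toContinuousStream(checkpointLocation: String): ContinuousStream
}
trait ScanBuilder { def build(): Scan }
trait Table { def newScanBuilder(): ScanBuilder }

// One lookup path for both modes: resolve the table once, build one Scan,
// and only branch at stream creation.
def createStream(
    table: Table,
    continuous: Boolean,
    checkpoint: String): Either[MicroBatchStream, ContinuousStream] = {
  val scan = table.newScanBuilder().build()
  if (continuous) Right(scan.toContinuousStream(checkpoint))
  else Left(scan.toMicroBatchStream(checkpoint))
}
```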

@SparkQA commented Jan 23, 2019:

Test build #101567 has finished for PR 23619 at commit 1be0946.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 23, 2019:

Test build #101571 has finished for PR 23619 at commit 8fe6366.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author):

retest this please

@SparkQA commented Jan 23, 2019:

Test build #101578 has finished for PR 23619 at commit 8fe6366.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author):

retest this please

@SparkQA commented Jan 23, 2019:

Test build #101583 has finished for PR 23619 at commit 8fe6366.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres (Contributor) left a comment:

Mostly looks good.

```diff
- if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-     scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
- }.exists { config =>
+ if scan.stream.isInstanceOf[KafkaContinuousStream] =>
```
@jose-torres (Contributor):

I think this logic is correct, but let's keep an eye on the tests after merging since some flakiness slipped through in the last iteration of the refactoring.

```java
 * @throws UnsupportedOperationException
 */
default ContinuousStream toContinuousStream(String checkpointLocation) {
  throw new UnsupportedOperationException("Continuous scans are not supported");
}
```
@jose-torres (Contributor):

nit: I think the message should indicate that continuous scans are unsupported just for this type of Scan; as written, it sounds like they're not supported in general.

@jose-torres (Contributor):

As above, this looks like it's correctly implemented to me but we should keep an eye out for flakiness.

@SparkQA commented Jan 29, 2019:

Test build #101792 has finished for PR 23619 at commit 9d2ee51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) left a comment:

LGTM

Thanks! Merged to master

@asfgit closed this in e97ab1d on Jan 29, 2019.
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#23619 from cloud-fan/continuous.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

```scala
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
  case other: BatchScanExec => this.batch == other.batch
  case _ => false
}
```
@zhztheplayer (Member) commented Oct 28, 2024:

@cloud-fan

Hi Wenchen, I just bumped into this code.

Do you remember why `output` is not included in the equality comparison, and likewise in the V1 scan?
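
To make the question concrete, here is a stub illustration (invented for this comment, not Spark's classes) of the behavior being asked about: two scan nodes over the same batch but with different output attributes compare equal.

```scala
// Minimal model of the equality in question; Attribute and BatchScanExec
// here are stand-ins, not the real Spark classes.
object OutputEqualityDemo extends App {
  case class Attribute(name: String, exprId: Long)

  class BatchScanExec(val batch: AnyRef, val output: Seq[Attribute]) {
    override def equals(other: Any): Boolean = other match {
      case o: BatchScanExec => this.batch == o.batch // output intentionally ignored
      case _ => false
    }
    override def hashCode(): Int = batch.hashCode()
  }

  val sharedBatch = new AnyRef
  val a = new BatchScanExec(sharedBatch, Seq(Attribute("col", exprId = 1L)))
  val b = new BatchScanExec(sharedBatch, Seq(Attribute("col", exprId = 2L)))
  assert(a == b) // equal even though the output attribute ids differ
}
```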
