Conversation

@dongjoon-hyun (Member) commented Nov 24, 2020

What changes were proposed in this pull request?

This PR aims to support storage migration to a fallback storage, such as cloud storage (S3), during worker decommissioning, for corner cases where exceptions occur or no live peer is left.

Although this PR focuses on cloud storage like S3, whose TTL feature simplifies Spark's clean-up logic, alternative fallback storages such as HDFS or NFS (EFS) can be used if the user provides a clean-up mechanism.
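
For context, a minimal sketch of enabling the fallback path. The spark.storage.decommission.fallbackStorage.path key is assumed from the config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH constant discussed in the review below, and the bucket/path is hypothetical:

    import org.apache.spark.SparkConf

    // Sketch: point the fallback at an S3 location whose bucket has a TTL
    // (lifecycle) rule, so Spark itself does not need clean-up logic.
    val conf = new SparkConf()
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.fallbackStorage.path",
        "s3a://my-bucket/spark-fallback/")  // hypothetical bucket/path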

Why are the changes needed?

Currently, storage migration is not possible when there is no available executor. For example, when there is only one executor, it cannot perform storage migration because it has no peer.

Does this PR introduce any user-facing change?

Yes. This is a new feature.

How was this patch tested?

Pass the CIs with newly added test cases.

@dongjoon-hyun (Member, Author)

Could you review this, @holdenk, @viirya, @mridulm?


@SparkQA commented Nov 25, 2020

Test build #131768 has finished for PR 30492 at commit 25740c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Tagar commented Nov 25, 2020

@dongjoon-hyun is this only for shuffle data? I was wondering if it would also be possible to cover MEMORY_AND_DISK for cached DataFrames? Thanks!

@dongjoon-hyun (Member, Author)

Thank you for the review, @Tagar. Yes, they are separate options:

  • spark.storage.decommission.shuffleBlocks.enabled
  • spark.storage.decommission.rddBlocks.enabled
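
For illustration, a minimal sketch of how these two independent flags could be set (the SparkConf usage here is illustrative, not taken from this PR):

    import org.apache.spark.SparkConf

    // The two options are independent: one covers shuffle files, the other
    // covers cached RDD/DataFrame blocks (e.g. MEMORY_AND_DISK).
    val conf = new SparkConf()
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
      .set("spark.storage.decommission.rddBlocks.enabled", "true")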

This PR is still under review. If this is accepted, I believe we can move on to that.

@dongjoon-hyun (Member, Author) commented Nov 25, 2020

Could you review this PR once more, please, @viirya and @mridulm?

@holdenk (Contributor) commented Nov 25, 2020

Hi @dongjoon-hyun, I'm taking this week away from open source to take my puppy to go see snow for the first time. I'll do a review on Monday. Thanks for understanding :)

@dongjoon-hyun (Member, Author)

Thank you, @holdenk. Sorry for pinging you during the holiday season.

@dongjoon-hyun (Member, Author)

BTW, cc @dbtsai, too.

@dongjoon-hyun (Member, Author)

Thank you so much for your review, @viirya!

@dongjoon-hyun (Member, Author)

Hi, @zsxwing and @viirya. I addressed your comments. Could you review once more if you have a chance?

@zsxwing (Member) commented Nov 30, 2020

Thanks for adding the application id. I don't have time to look at the details right now, so I'll defer to @viirya.

@SparkQA commented Nov 30, 2020

Test build #131939 has finished for PR 30492 at commit a0285dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author)

Got it, @zsxwing.

@dongjoon-hyun (Member, Author)

Hi, @holdenk, @viirya, and @dbtsai. Could you review this, please?

@holdenk (Contributor) left a comment

Thanks for waiting on the review. I've got a few minor nits/questions, but overall this looks good to me. I'm excited for us to support this as part of dynamic scale-down in Spark 3.1 :)

try {
  shuffleManager.shuffleBlockResolver.getBlockData(blockId)
} catch {
  case e: IOException =>
@holdenk (Contributor)

Nit/question: could we move the if up as part of the case and avoid the need for an explicit rethrow?

@dongjoon-hyun (Member, Author)

It's because we should try the normal access path by default. We can use the fallback path only if the normal access path fails, because conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined doesn't mean the fallback storage actually has the data.

@dongjoon-hyun (Member, Author)

Please let me know if I misunderstood your comment.

@holdenk (Contributor)

Yeah, so I mean we still have a try/catch; just the case statement has the if as part of it (e.g., using if expressions (guards) in case statements).
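
For illustration, a sketch of the guard-style alternative being discussed, reconstructed from the snippet and the config constant quoted above (FallbackStorage.read is assumed here as the fallback read entry point; it is not confirmed by this thread):

    // Sketch only: the guard moves the config check into the case pattern, so an
    // IOException that doesn't match the guard propagates with no explicit rethrow.
    try {
      shuffleManager.shuffleBlockResolver.getBlockData(blockId)
    } catch {
      case _: IOException
          if conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined =>
        // The normal path has already failed at this point, so the
        // normal-path-first behavior described above is preserved.
        FallbackStorage.read(conf, blockId)
    }

Either form only consults the fallback after getBlockData throws; the guard version simply folds the config check and the rethrow into the pattern match.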

@holdenk (Contributor) left a comment

LGTM

@viirya (Member) left a comment

Looks okay to me.

@dongjoon-hyun (Member, Author)

Thank you so much, @holdenk and @viirya.
Merged to master for Apache Spark 3.1.0.
