Conversation

@CodingCat CodingCat commented Feb 19, 2025

What changes were proposed in this pull request?

It's joint work with @YutingWang98.

Currently, we have to wait for Spark's shuffle object GC to reclaim the disk space occupied by Celeborn shuffles.

As a result, if a shuffle fails to fetch and is retried, the disk space occupied by the failed attempt cannot actually be reclaimed. We hit this issue internally when dealing with shuffles at the hundreds-of-TB level in a single Spark application: any hiccup in the servers can double or even triple the disk usage.

This PR implements a mechanism to delete the files of failed-to-fetch shuffles.

The main idea is simple: cleanup is triggered in LifecycleManager when it applies for a new Celeborn shuffle id for a retried shuffle write stage.

The tricky part is avoiding deleting shuffle files that are still referred to by multiple downstream stages; the PR introduces RunningStageManager to track the dependencies between stages.
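The reference-tracking idea can be sketched roughly as follows. This is a minimal hypothetical illustration in Java, not the actual RunningStageManager code; all class and method names here are made up:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: track which stages still refer to a Celeborn shuffle id,
// and only allow cleanup once no other running downstream stage refers to it.
class ShuffleRefTracker {
    private final Map<Integer, Set<Integer>> shuffleIdToReferringStages = new HashMap<>();

    // Called when a reader stage starts consuming a shuffle.
    synchronized void addReference(int celebornShuffleId, int stageId) {
        shuffleIdToReferringStages
            .computeIfAbsent(celebornShuffleId, k -> new HashSet<>())
            .add(stageId);
    }

    // A shuffle is safe to clean only if no stage other than the retried one
    // still refers to it.
    synchronized boolean safeToClean(int celebornShuffleId, int retriedStageId) {
        Set<Integer> stages = shuffleIdToReferringStages.get(celebornShuffleId);
        if (stages == null || stages.isEmpty()) {
            return true;
        }
        return stages.size() == 1 && stages.contains(retriedStageId);
    }
}
```

In this sketch, a shuffle referenced by two live stages survives a retry of one of them, which is the multi-downstream-stage case the description warns about.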

Why are the changes needed?

Saving disk space.

Does this PR introduce any user-facing change?

A new config is added.

How was this patch tested?

We manually deleted some files to simulate fetch failures.

[screenshot: worker shuffle directory listing]

From the above screenshot, we can see that we originally have shuffles 0 and 1; after shuffle 1 hits a chunk fetch failure, a retry of shuffle 0 is triggered (as shuffle 2), and by that point shuffle 0 has already been deleted from the workers.

[screenshot: application logs]

In the logs, we can see that in the middle of the application, the unregister shuffle request was sent for shuffle 0.

@CodingCat CodingCat changed the title [WIP] clean failed shuffle disk [CELEBORN-1896] delete data from failed to fetch shuffles Mar 7, 2025

codecov bot commented Mar 9, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 63.62%. Comparing base (4bacd1f) to head (eed6ba5).
Report is 62 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff             @@
##             main    #3109       +/-   ##
===========================================
+ Coverage   32.63%   63.62%   +30.99%     
===========================================
  Files         341      343        +2     
  Lines       20422    20819      +397     
  Branches     1820     1835       +15     
===========================================
+ Hits         6663    13243     +6580     
+ Misses      13387     6617     -6770     
- Partials      372      959      +587     


appShuffleIdentifier: String): Unit = {
val Array(appShuffleId, stageId, _) = appShuffleIdentifier.split('-')
lifecycleManager.get().getShuffleIdMapping.get(appShuffleId.toInt).foreach {
case (pastAppShuffleIdentifier, (celebornShuffleId, _)) => {
Contributor

You have skipped the input parameter of celebornShuffleId.

lifecycleManager.compareAndSet(null, ref)
}

private def noRunningDownstreamStage(shuffleId: Int): Boolean = {
Contributor

The input parameter should be celebornShuffleId.

Contributor Author

oh, yes, fixed

|| onlyCurrentStageReferred(celebornShuffleId, stageId.toInt)
|| noRunningDownstreamStage(celebornShuffleId)
|| !committedSuccessfully(celebornShuffleId)) {
val Array(_, stageId, attemptId) = pastAppShuffleIdentifier.split('-')
Contributor

Unused definition.

Contributor Author

removed

ret
}

private val cleanerThread = new Thread() {
Contributor

Can be replaced by newDaemonSingleThreadScheduledExecutor and scheduleWithFixedDelay.
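For reference, the suggested scheduled-executor pattern looks roughly like this. This is a generic `java.util.concurrent` sketch; Celeborn's `ThreadUtils.newDaemonSingleThreadScheduledExecutor` wraps the same idea, and the names below are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Generic sketch of a daemon single-thread scheduled cleaner; the interval
// would come from a config rather than being hard-coded.
class CleanerScheduler {
    private final ScheduledExecutorService pool;

    CleanerScheduler() {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "failed-shuffle-cleaner");
            t.setDaemon(true); // do not block JVM shutdown
            return t;
        };
        pool = Executors.newSingleThreadScheduledExecutor(factory);
    }

    void start(Runnable cleanTask, long intervalMs) {
        // scheduleWithFixedDelay waits intervalMs after each run COMPLETES,
        // so a slow cleanup pass never overlaps with the next one.
        pool.scheduleWithFixedDelay(cleanTask, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        pool.shutdownNow();
    }
}
```

Compared with a hand-rolled `Thread` loop, this also makes shutdown and rescheduling after task failures the executor's responsibility.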

Contributor

Here there could be a config parameter to change the failed-shuffle clean interval.

Contributor Author

updated

}

// expecting celeborn shuffle id and application shuffle identifier
@volatile private var getShuffleIdForWriterCallback: Option[BiConsumer[Integer, String]] = None
Contributor

Suggested change
@volatile private var getShuffleIdForWriterCallback: Option[BiConsumer[Integer, String]] = None
@volatile private var validateCelebornShuffleIdForClean: Option[BiConsumer[Integer, String]] = None

getShuffleIdForWriterCallback = Some(callback)
}
// expecting celeborn shuffle id and application shuffle identifier
@volatile private var getShuffleIdForReaderCallback: Option[BiConsumer[Integer, String]] = None
Contributor

Suggested change
@volatile private var getShuffleIdForReaderCallback: Option[BiConsumer[Integer, String]] = None
@volatile private var recordShuffleIdReference: Option[BiConsumer[Integer, String]] = None

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
Contributor

Is the package org.apache.spark.scheduler truly necessary in Celeborn?

Contributor Author

Yes, we need to access runningStages in DAGScheduler, which is a private[scheduler] variable.


def addShuffleIdReferringStage(celebornShuffleId: Int, appShuffleIdentifier: String): Unit = {
// this is only implemented/tested with Spark for now
val Array(_, stageId, _) = appShuffleIdentifier.split('-')
Contributor

Can we put this together with SparkUtils.appShuffleIdentifier

Contributor Author

You mean making it a method of SparkUtils?

We may not be able to do that, since SparkUtils is a Spark-version-specific class, but for the reason mentioned above FailedShuffleCleaner has to be in spark-common.

Contributor

Could we try to move to SparkCommonUtils?

Contributor Author

sure, moved
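The `appShuffleIdentifier` evidently encodes `appShuffleId-stageId-attemptId`, and centralizing the parse in one utility (as the reviewer suggests) avoids repeating `split('-')` in several places. A hypothetical sketch of such a helper, not the actual SparkCommonUtils code:

```java
// Hypothetical utility centralizing the "appShuffleId-stageId-attemptId" parse,
// in the spirit of moving the split logic into SparkCommonUtils.
class AppShuffleIdentifier {
    final int appShuffleId;
    final int stageId;
    final int attemptId;

    private AppShuffleIdentifier(int appShuffleId, int stageId, int attemptId) {
        this.appShuffleId = appShuffleId;
        this.stageId = stageId;
        this.attemptId = attemptId;
    }

    static AppShuffleIdentifier parse(String identifier) {
        String[] parts = identifier.split("-");
        if (parts.length != 3) {
            // fail fast on malformed identifiers instead of a MatchError later
            throw new IllegalArgumentException("malformed appShuffleIdentifier: " + identifier);
        }
        return new AppShuffleIdentifier(
            Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
    }
}
```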

FailedShuffleCleaner.addShuffleIdToBeCleaned(appShuffleIdentifier);
}

public static void addShuffleIdRefCount(
Contributor

Better rename addShuffleIdRefCount to addShuffleIdReferringStage?

Contributor Author

renamed

def addShuffleIdReferringStage(celebornShuffleId: Int, appShuffleIdentifier: String): Unit = {
// this is only implemented/tested with Spark for now
val Array(_, stageId, _) = appShuffleIdentifier.split('-')
celebornShuffleIdToReferringStages.putIfAbsent(celebornShuffleId, new mutable.HashSet[Int]())
Contributor

should be ConcurrentSet?

Contributor Author

ah, as mentioned at #3109 (comment), I found I still need an explicit lock over this HashSet, so ConcurrentHashSet may not help here
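The reason a concurrent set alone does not help is that the check-then-act sequences on the set must be atomic as a whole, which per-operation thread safety cannot provide. A generic sketch of the pattern (hypothetical names, not the PR's actual code):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: even with a concurrent map, the per-shuffle set needs its own lock,
// because "mutate, then check emptiness" is a compound action that a
// ConcurrentHashSet cannot make atomic by itself.
class ReferringStages {
    private final Map<Integer, Set<Integer>> celebornShuffleIdToReferringStages =
        new ConcurrentHashMap<>();

    void addStage(int celebornShuffleId, int stageId) {
        Set<Integer> stages = celebornShuffleIdToReferringStages
            .computeIfAbsent(celebornShuffleId, k -> new HashSet<>());
        synchronized (stages) { // explicit lock over the set
            stages.add(stageId);
        }
    }

    boolean removeAndCheckUnreferenced(int celebornShuffleId, int stageId) {
        Set<Integer> stages = celebornShuffleIdToReferringStages.get(celebornShuffleId);
        if (stages == null) return true;
        synchronized (stages) {
            // remove + emptiness check must happen under one lock, otherwise a
            // concurrent addStage could slip in between the two steps
            stages.remove(stageId);
            return stages.isEmpty();
        }
    }
}
```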

}

public static void addShuffleIdRefCount(
LifecycleManager lifecycleManager, int celebornShuffeId, String appShuffleIdentifier) {
Contributor

Here is a typo. celebornShuffeId --> celebornShuffleId

Contributor Author

oops, fixed

lifecycleManager.registerInvalidatedBroadcastCallback(
shuffleId -> SparkUtils.invalidateSerializedGetReducerFileGroupResponse(shuffleId));
}
if (lifecycleManager.conf().clientFetchCleanFailedShuffle()) {
Contributor

These lines are duplicates of lines 159~172.

Contributor Author

oops, some merge error, fixed

@CodingCat
Contributor Author

@FMX @RexXiong thank you for the review, I addressed the comments, would you please take another look?

class RunningStageManagerImpl extends RunningStageManager {
private def dagScheduler = SparkContext.getActive.get.dagScheduler
override def isRunningStage(stageId: Int): Boolean = {
dagScheduler.runningStages.map(_.id).contains(stageId)
Contributor

How about using reflection to get the value of runningStages? IMO we'd better not name a package from another project. See SparkCommonUtils.

Contributor Author

sounds reasonable, changed


def addShuffleIdReferringStage(celebornShuffleId: Int, appShuffleIdentifier: String): Unit = {
// this is only implemented/tested with Spark for now
val Array(_, stageId, _) = appShuffleIdentifier.split('-')
Contributor

Could we try to move to SparkCommonUtils?

@CodingCat
Contributor Author

@RexXiong I addressed all comments, please let me know about any further suggestions

@RexXiong
Contributor

Thanks @CodingCat I left some comments, please take a look when you have time.

@RexXiong RexXiong (Contributor) left a comment

LGTM, thanks!

throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
Class<?> stageClass = Class.forName("org.apache.spark.scheduler.Stage");
idField = stageClass.getDeclaredField("id");
idField.setAccessible(true);
Member

Could you use DynFields, like:

private static final DynFields.UnboundField shuffleIdToMapStage_FIELD =
DynFields.builder().hiddenImpl(DAGScheduler.class, "shuffleIdToMapStage").build();

Member

And it can be static.

Contributor Author

updated
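DynFields is a Celeborn-internal helper that caches this kind of lookup; the underlying mechanism it wraps is plain Java reflection, roughly as below. This is a generic illustration (the `Demo`/`PrivateFieldReader` names are made up), not the DynFields API:

```java
import java.lang.reflect.Field;

// Illustrative target class with a private field, standing in for
// org.apache.spark.scheduler.Stage's private "id".
class Demo {
    private final int id = 42;
}

// Generic sketch of reading a private field by name, the mechanism that a
// cached helper like DynFields wraps behind a nicer, reusable API.
class PrivateFieldReader {
    static Object read(Object target, String className, String fieldName) throws Exception {
        Class<?> clazz = Class.forName(className);
        Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true); // bypass private access for this lookup
        return field.get(target);
    }
}
```

The reviewer's point about making the field handle static is that `Class.forName` and `getDeclaredField` are relatively expensive, so the lookup should be done once and reused rather than repeated per call.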

try {
DAGScheduler dagScheduler = SparkContext$.MODULE$.getActive().get().dagScheduler();
Class<?> dagSchedulerClz = SparkContext$.MODULE$.getActive().get().dagScheduler().getClass();
Field runningStagesField = dagSchedulerClz.getDeclaredField("runningStages");
Member

DynFields

Contributor Author

updated

}

private var cleanerThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"failedShuffleCleanerThreadPool")
Member

Seems it would always launch the pool even if the feature is not enabled.

Contributor Author

updated


lifecycleManager.registerShuffleTrackerCallback(
shuffleId -> SparkUtils.unregisterAllMapOutput(mapOutputTracker, shuffleId));

Member

nit: unnecessary change

Contributor Author

updated

@CodingCat
Contributor Author

@YutingWang98 taking a look at the upstream version?

@YutingWang98
Contributor

@YutingWang98 taking a look at the upstream version?

Thanks! lgtm

@CodingCat
Contributor Author

@turboFei thanks for the review! anything else I need to address?

@turboFei
Member

Left some comments in CodingCat#1

@turboFei
Member

gentle ping @FMX

@CodingCat
Contributor Author

I removed some tests from the PR as they are pretty flaky in GitHub CI, even though they have been running internally for months without issues, and I cannot reproduce the failures on my laptop.

The current failures are not really related to my PR.

@turboFei turboFei (Member) left a comment

Overall LGTM, thanks for the efforts.
Left some comments

.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.stageRerun.enabled", "false")
Member

The UT is broken. @CodingCat

@turboFei turboFei (Member) May 20, 2025

Some UTs have special configs; for this UT, it is spark.celeborn.client.spark.stageRerun.enabled=false.

Member

I recommend reverting the refactor of the test module to prevent mistakes.

baseBuilder.config("spark.celeborn.client.spark.fetch.cleanFailedShuffle", "true")
} else {
baseBuilder
baseBuilder.config("spark.celeborn.client.spark.stageRerun.enabled", "false")
Member

it is not generic

Member

In fact, this config is usually true.

}
}

test("celeborn spark integration test - unregister shuffle with throwsFetchFailure disabled") {
Member

This one is special; please see the UT name:

test("celeborn spark integration test - unregister shuffle with throwsFetchFailure disabled")

Member

The current abstraction of createSparkSession is not generic and hard to extend.

@CodingCat
Contributor Author

CodingCat commented May 20, 2025

@turboFei I think the code is now in better shape, and the tests also seem to have stabilized.

@FMX FMX (Contributor) left a comment

LGTM. Thanks. Merged into main (v0.6.0).
