[GLUTEN-9671][VL] Fix broadcast exchange stack overflow due to Kryo serialization #10541
This pull request introduces a safer and more robust approach for handling Spark's `BroadcastMode` during serialization. The main improvement is a new `SafeBroadcastMode` abstraction and related utilities, which avoid the serialization issues that caused a stack overflow during broadcast exchanges.

`BroadcastMode` was introduced in the [PR](apache#8116) that caused the observed issue. `HashedRelationBroadcastMode` embeds Catalyst expression trees, which are not safe to Kryo-serialize when running with `spark.kryo.referenceTracking=false` (the default internally).

With this change, the broadcast payload contains only primitives and byte arrays (no Catalyst trees). For bound keys, we serialize only the column ordinals (plus a null-aware flag). For computed keys (e.g., `upper(col)`), we serialize the key expressions once as Java bytes and deserialize them only where needed to build projections.

Ran an internal test set (50 queries) and an additional query that specifically checks that `spark.gluten.velox.offHeapBroadcastBuildRelation.enabled=true;` works.
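To make the payload shape described above concrete, here is a minimal sketch in plain Scala. It is not the code from this PR: `Expr`, `ColumnRef`, `Upper`, `SafeModeSketch`, `BoundKeys`, `ComputedKeys`, and `SafeModeSketchOps` are hypothetical stand-ins invented for illustration, and the real `SafeBroadcastMode` may be structured differently. The point is only that the broadcast side carries ordinals or an opaque byte array, never an expression tree.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-ins for Catalyst expressions (not Spark classes).
sealed trait Expr extends Serializable
final case class ColumnRef(ordinal: Int) extends Expr // a key already bound to a column
final case class Upper(child: Expr) extends Expr      // a computed key such as upper(col)

// Hypothetical payload shapes: only primitives and byte arrays, no expression trees.
sealed trait SafeModeSketch extends Serializable
final case class BoundKeys(ordinals: Array[Int], isNullAware: Boolean) extends SafeModeSketch
final case class ComputedKeys(keyBytes: Array[Byte], isNullAware: Boolean) extends SafeModeSketch

object SafeModeSketchOps {
  // Java-serialize the key expressions once, so Kryo later sees only a byte array.
  private def javaSerialize(keys: Seq[Expr]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try {
      oos.writeObject(keys.toList)
      oos.flush()
      bos.toByteArray
    } finally {
      oos.close()
    }
  }

  // If every key is a plain column reference, keep just the ordinals;
  // otherwise fall back to the opaque Java-serialized bytes.
  def from(keys: Seq[Expr], isNullAware: Boolean): SafeModeSketch = {
    val ordinals = keys.collect { case ColumnRef(i) => i }
    if (ordinals.length == keys.length) BoundKeys(ordinals.toArray, isNullAware)
    else ComputedKeys(javaSerialize(keys), isNullAware)
  }
}
```

For example, `SafeModeSketchOps.from(Seq(ColumnRef(0), ColumnRef(2)), isNullAware = false)` yields a `BoundKeys` value, while a key list containing `Upper(ColumnRef(1))` yields `ComputedKeys`. Either way, a Kryo serializer has no deep object graph to recurse into.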
@felixloesing Thanks for your fix. Just curious: why does Gluten require this change when vanilla Spark doesn't?
@JkSelf This is also described in the PR that introduced the
JkSelf
left a comment
Thanks for your fix. LGTM except for two minor comments.
      oos.writeObject(keys)
      oos.flush()
      bos.toByteArray
    } finally {
nit: Can we add a catch clause to log the exception and make debugging easier?
    try {
      ois = new ObjectInputStream(bis)
      ois.readObject().asInstanceOf[Seq[Expression]]
    } finally {
@JkSelf Thanks for the review! Added a catch clause and logged the exception.
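For readers following the review thread, the pattern under discussion (Java serialization wrapped in `try`/`catch`/`finally`, with the exception logged before it is rethrown) might look roughly like the sketch below. This is an assumption-laden illustration, not the merged code: `KeySerdeSketch` is a hypothetical name, `Seq[String]` stands in for `Seq[Expression]`, and `java.util.logging` stands in for whatever logging facility the project actually uses.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.logging.{Level, Logger}
import scala.util.control.NonFatal

// Hypothetical helper; Seq[String] stands in for Seq[Expression].
object KeySerdeSketch {
  private val log = Logger.getLogger("KeySerdeSketch")

  def serialize(keys: Seq[String]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    var oos: ObjectOutputStream = null
    try {
      oos = new ObjectOutputStream(bos)
      oos.writeObject(keys.toList)
      oos.flush()
      bos.toByteArray
    } catch {
      case NonFatal(e) =>
        // Log with context, then rethrow so the caller still sees the failure.
        log.log(Level.SEVERE, "Failed to serialize key expressions", e)
        throw e
    } finally {
      if (oos != null) oos.close()
    }
  }

  def deserialize(bytes: Array[Byte]): Seq[String] = {
    val bis = new ByteArrayInputStream(bytes)
    var ois: ObjectInputStream = null
    try {
      ois = new ObjectInputStream(bis)
      ois.readObject().asInstanceOf[Seq[String]]
    } catch {
      case NonFatal(e) =>
        log.log(Level.SEVERE, "Failed to deserialize key expressions", e)
        throw e
    } finally {
      if (ois != null) ois.close()
    }
  }
}
```

Rethrowing after logging keeps the failure visible to the caller while leaving a message that points at the serialization step, which is the debugging aid the review comment asks for.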
JkSelf
left a comment
LGTM. Thanks for the fix.
…rialization (apache#10541) (cherry picked from commit 91c52e1)
…ckoverflow due to Kryo serialization #10733 (cherry picked from commit 91c52e1) Co-authored-by: Felix Loesing <felix.loesing@gmail.com>