Description
Describe the bug
When DataFusion produces a Substrait plan for a SQL query that performs an inner join followed by a projection (ProjectRel), the field indices for the right‑side columns are wrong. In our case:
We expect other_int (the 2nd column of the right input, selected as t2.filter_col_num in the query below) to be referenced by index 7.
We expect other_flt (the 3rd column of the right input, selected as t2.value_col in the query below) to be referenced by index 8.
However, the generated plan uses indices 5 and 6, which point to pred_input2 and key instead. Any Substrait consumer that reads this plan will therefore extract the wrong data.
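Here is the index arithmetic behind the expected values. An inner join's output schema is the left input's columns followed by the right input's columns, so the right-side column at 0-based position j is referenced as len(left) + j. The sketch below assumes a 6-column left input whose last column is pred_input2 and a right input that starts with key, other_int, other_flt, which matches the indices reported above. The left-side names l0 to l4 and the helper names are hypothetical placeholders. Any further right-side columns would not change these indices.

```python
# Sketch: how field indices are assigned in a join's output schema.
# The join output is the left columns followed by the right columns.
# l0..l4 are hypothetical placeholders; pred_input2 (index 5) and the
# right-side names come from the bug report.

LEFT = ["l0", "l1", "l2", "l3", "l4", "pred_input2"]
RIGHT = ["key", "other_int", "other_flt"]


def join_field_index(left, right, side, name):
    """Return the 0-based index of a column in the join's output schema."""
    if side == "left":
        return left.index(name)
    if side == "right":
        # Right-side columns are offset by the width of the left input.
        return len(left) + right.index(name)
    raise ValueError(f"unknown side: {side!r}")


def resolve(left, right, index):
    """Map a join-output index back to the column name it refers to."""
    return (left + right)[index]


if __name__ == "__main__":
    # Indices a consumer should see for the right-side columns.
    for col in ("other_int", "other_flt"):
        print(col, "->", join_field_index(LEFT, RIGHT, "right", col))
    # What the indices emitted by the producer actually resolve to.
    for idx in (5, 6):
        print(idx, "->", resolve(LEFT, RIGHT, idx))
```

Under these assumptions, other_int and other_flt land at 7 and 8. The emitted 5 and 6 resolve to pred_input2 and key, which is the misread the report describes.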
To Reproduce
Prepare your Parquet tables at resources/dummy_table1.parquet and resources/dummy_table2.parquet (schema shown below).
Save the following as validate_substrait.py:
#!/usr/bin/env python3
from datafusion import SessionContext
from datafusion.substrait import Serde
from substrait.proto import Plan as SubstraitProtoPlan
from google.protobuf.json_format import MessageToDict

T1_PARQUET = "resources/dummy_table1.parquet"
T2_PARQUET = "resources/dummy_table2.parquet"


def find_project(node):
    # Depth-first search for the first ProjectRel ("project" key) in the plan dict.
    if isinstance(node, dict):
        if "project" in node:
            return node["project"]
        for v in node.values():
            p = find_project(v)
            if p is not None:
                return p
    elif isinstance(node, list):
        for item in node:
            p = find_project(item)
            if p is not None:
                return p
    return None


def find_field(node):
    # Return the first integer "field" value (a struct_field index) found under node.
    if isinstance(node, dict):
        for k, v in node.items():
            if k == "field" and isinstance(v, int):
                return v
            deeper = find_field(v)
            if deeper is not None:
                return deeper
    elif isinstance(node, list):
        for item in node:
            deeper = find_field(item)
            if deeper is not None:
                return deeper
    return None


def main():
    ctx = SessionContext()
    ctx.register_parquet("t1", T1_PARQUET)
    ctx.register_parquet("t2", T2_PARQUET)

    sql = """
        SELECT
            t1.key,
            t1.filter_col_int,
            t1.filter_col_float,
            t2.filter_col_num,
            t2.value_col,
            (t1.predict_in1 + t2.value_col) AS project_col
        FROM t1
        INNER JOIN t2 ON t1.key = t2.key
        WHERE
            t1.filter_col_int > 0
            AND (t2.value_col IS NULL OR t2.value_col < 40)
            AND t1.filter_col_float IS NOT NULL
            AND t2.value_col IS NOT NULL
    """

    # Produce the Substrait plan, then round-trip it through protobuf into a dict.
    sub_plan = Serde.serialize_to_plan(sql, ctx)
    proto_bytes = sub_plan.encode()
    p = SubstraitProtoPlan()
    p.ParseFromString(proto_bytes)
    plan = MessageToDict(p, preserving_proto_field_name=True)

    proj = find_project(plan)
    if proj is None:
        raise RuntimeError("No ProjectRel found!")

    print("\nSubstrait ProjectRel.expressions:")
    for i, expr in enumerate(proj.get("expressions", [])):
        if "selection" in expr:
            # MessageToDict omits zero-valued fields, so a missing index means 0.
            idx = find_field(expr["selection"]) or 0
            print(f"  Expr[{i}]: selection → index = {idx}")
        else:
            fn = expr.get("scalar_function", {}).get("function_reference", "?")
            print(f"  Expr[{i}]: scalar_function → func_ref={fn}")

    print(
        "\n❗ You should see:\n"
        "  Expr[3] = 5 (WRONG; expected 7 for filter_col_num)\n"
        "  Expr[4] = 6 (WRONG; expected 8 for value_col)"
    )


if __name__ == "__main__":
    main()
Install dependencies and run:
pip install datafusion python-substrait protobuf
chmod +x validate_substrait.py
./validate_substrait.py
Observe output showing Expr[3] = 5 and Expr[4] = 6 instead of 7 and 8.
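The script's find_field stops at the first index it meets. That is enough for plain column selections, but it hides the inputs of project_col (Expr[5]), which is a scalar_function over two columns. The sketch below collects every struct_field index under an expression instead. It assumes the dict shape produced by MessageToDict(..., preserving_proto_field_name=True), as in the script. The function name and the hand-built example expression, including its indices, are hypothetical illustrations and are not taken from the actual plan.

```python
# Sketch: collect every top-level field index referenced by a Substrait
# expression dict (MessageToDict output with preserving_proto_field_name=True).
# Nested child references inside a struct_field are intentionally not followed.


def collect_fields(node, out=None):
    """Return all struct_field indices under node, in traversal order."""
    if out is None:
        out = []
    if isinstance(node, dict):
        struct_field = node.get("struct_field")
        if isinstance(struct_field, dict):
            # proto3 JSON omits zero values, so a missing "field" means index 0.
            out.append(struct_field.get("field", 0))
        for key, value in node.items():
            if key != "struct_field":
                collect_fields(value, out)
    elif isinstance(node, list):
        for item in node:
            collect_fields(item, out)
    return out


# Hypothetical scalar_function expression with two column arguments.
EXAMPLE_EXPR = {
    "scalar_function": {
        "function_reference": 1,
        "arguments": [
            {"value": {"selection": {
                "direct_reference": {"struct_field": {"field": 4}},
                "root_reference": {}}}},
            {"value": {"selection": {
                "direct_reference": {"struct_field": {}},
                "root_reference": {}}}},
        ],
    }
}

if __name__ == "__main__":
    print(collect_fields(EXAMPLE_EXPR))  # [4, 0]
```

Calling collect_fields(expr) for each entry of proj["expressions"] in the script would list the input indices of project_col alongside the plain selections.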
Expected behavior
Expr[3] should be 7 (pointing at other_int)
Expr[4] should be 8 (pointing at other_flt)
Downstream consumers reconstruct the final schema as
key, filter_col_int, filter_col_float, other_int, other_flt, project_col
Additional context
DataFusion version: v45.2.0
Relevant PR: #12495 added explicit output_mapping to ProjectRel, but it appears not to cover post‑join projections.
Related issues:
COUNT schema mismatch (#10873)
Empty‐args in AggregateRel (#15344)
Grouping expression misalignment (#14348)
Substrait spec: https://substrait.io/ (see ProjectRel and output_mapping)
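Our reading of the spec, for anyone checking these indices: a ProjectRel's direct output is its input fields followed by its expressions. When RelCommon's emit.output_mapping is present, it selects and orders columns from that combined list. The sketch below models this rule with plain Python lists. The function name and the example sizes (a 9-column join output and 6 expressions) are hypothetical.

```python
# Sketch: the column list a Substrait ProjectRel produces, modelled with lists.
# Direct output = input fields followed by expressions; an optional emit
# output_mapping picks columns (by index) out of that direct output.


def project_output(input_fields, expressions, output_mapping=None):
    """Return the ProjectRel's output column labels."""
    direct = list(input_fields) + list(expressions)
    if output_mapping is None:
        return direct
    return [direct[i] for i in output_mapping]


if __name__ == "__main__":
    # Hypothetical 9-column join output and 6 projection expressions.
    join_cols = [f"in{i}" for i in range(9)]
    exprs = [f"expr{i}" for i in range(6)]
    # Emit only the expressions: indices 9..14 of the direct output.
    print(project_output(join_cols, exprs, list(range(9, 15))))
```

The output_mapping only changes which columns leave the ProjectRel. The selection indices inside the expressions still refer to the input (join) schema, so an explicit output_mapping by itself should not change the 7 and 8 that Expr[3] and Expr[4] are expected to carry.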
This bug blocks any Substrait consumer from correctly reading joined projections without custom hacks.