diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index b80b622190dbc1..85e141bb3cba03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -287,6 +287,23 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d keys.addAll(validOutputIds); validOutputIds = keys; } + if (inputFragment instanceof MultiCastPlanFragment) { + // TODO: remove this logic when we split to multi-window in logical window to physical window conversion + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); + DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( + multiCastDataSink.getDataStreamSinks().size() - 1); + if (!(distribute.child() instanceof PhysicalProject)) { + List projectionExprs = new ArrayList<>(); + PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute); + Preconditions.checkState(consumer != null, "consumer not found"); + for (Slot slot : distribute.getOutput()) { + projectionExprs.add(ExpressionTranslator.translate(consumer.getProducerSlot(slot), context)); + } + TupleDescriptor projectionTuple = generateTupleDesc(distribute.getOutput(), null, context); + dataStreamSink.setProjections(projectionExprs); + dataStreamSink.setOutputTupleDesc(projectionTuple); + } + } DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context); PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition); exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances()); @@ -2331,4 +2348,16 @@ private boolean isComplexDataType(DataType dataType) { return dataType instanceof ArrayType || dataType instanceof MapType || dataType instanceof JsonType || dataType instanceof StructType; } + + private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) { + if (root == null) { + return null; + } else if (root instanceof PhysicalCTEConsumer) { + return (PhysicalCTEConsumer) root; + } else if (root.children().size() != 1) { + return null; + } else { + return getCTEConsumerChild((PhysicalPlan) root.child(0)); + } + } }