diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs
index c387e05390fc2..1c2a037bb5108 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs
@@ -204,6 +204,24 @@ impl StreamedBatch {
     }
 }
 
+/// Per-row filter outcome tracking for full outer joins.
+///
+/// In a full outer join with a filter, buffered rows that match on join
+/// keys but fail every filter evaluation must be emitted with NULLs on
+/// the streamed side. Three states are needed because a simple boolean
+/// cannot distinguish "never matched" (handled by [`BufferedBatch::null_joined`])
+/// from "matched but all filters failed" (must be emitted as null-joined).
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(super) enum FilterState {
+    /// Row never appeared in a matched pair.
+    Unvisited = 0,
+    /// Row matched streamed rows, but all filter evaluations failed.
+    AllFailed = 1,
+    /// Row matched and at least one filter evaluation passed.
+    SomePassed = 2,
+}
+
 /// A buffered batch that contains contiguous rows with same join key
 ///
 /// `BufferedBatch` can exist as either an in-memory `RecordBatch` or a `RefCountedTempFile` on disk.
@@ -219,11 +237,9 @@ pub(super) struct BufferedBatch {
     pub null_joined: Vec<usize>,
     /// Size estimation used for reserving / releasing memory
     pub size_estimation: usize,
-    /// The indices of buffered batch that the join filter doesn't satisfy.
-    /// This is a map between right row index and a boolean value indicating whether all joined row
-    /// of the right row does not satisfy the filter .
-    /// When dequeuing the buffered batch, we need to produce null joined rows for these indices.
-    pub join_filter_not_matched_map: HashMap<u64, bool>,
+    /// Tracks filter outcomes for buffered rows in full outer joins.
+    /// Indexed by absolute row position within the batch. See [`FilterState`].
+    pub join_filter_status: Vec<FilterState>,
     /// Current buffered batch number of rows. Equal to batch.num_rows()
     /// but if batch is spilled to disk this property is preferable
     /// and less expensive
@@ -260,7 +276,7 @@ impl BufferedBatch {
             join_arrays,
             null_joined: vec![],
             size_estimation,
-            join_filter_not_matched_map: HashMap::new(),
+            join_filter_status: vec![FilterState::Unvisited; num_rows],
             num_rows,
         }
     }
@@ -1199,12 +1215,16 @@ impl MaterializingSortMergeJoinStream {
             return Ok(());
         }
 
-        // For buffered row which is joined with streamed side rows but all joined rows
-        // don't satisfy the join filter
+        // Collect buffered rows that matched on join keys but had every
+        // filter evaluation fail — these must be emitted with NULLs on
+        // the streamed side to satisfy full outer join semantics.
         let not_matched_buffered_indices = buffered_batch
-            .join_filter_not_matched_map
+            .join_filter_status
             .iter()
-            .filter_map(|(idx, failed)| if *failed { Some(*idx) } else { None })
+            .enumerate()
+            .filter_map(|(i, state)| {
+                matches!(state, FilterState::AllFailed).then_some(i as u64)
+            })
             .collect::<Vec<u64>>();
 
         let buffered_indices =
@@ -1219,7 +1239,9 @@ impl MaterializingSortMergeJoinStream {
             self.joined_record_batches
                 .push_batch_with_null_metadata(record_batch, self.join_type);
         }
-        buffered_batch.join_filter_not_matched_map.clear();
+        buffered_batch
+            .join_filter_status
+            .fill(FilterState::Unvisited);
 
         Ok(())
     }
@@ -1392,15 +1414,18 @@ impl MaterializingSortMergeJoinStream {
                     if right.is_null(i) {
                         continue;
                     }
-                    let buffered_index = right.value(i);
-                    buffered_batch.join_filter_not_matched_map.insert(
-                        buffered_index,
-                        *buffered_batch
-                            .join_filter_not_matched_map
-                            .get(&buffered_index)
-                            .unwrap_or(&true)
-                            && !pre_mask.value(offset + i),
-                    );
+                    let idx = right.value(i) as usize;
+                    match buffered_batch.join_filter_status[idx] {
+                        FilterState::SomePassed => {}
+                        _ if pre_mask.value(offset + i) => {
+                            buffered_batch.join_filter_status[idx] =
+                                FilterState::SomePassed;
+                        }
+                        _ => {
+                            buffered_batch.join_filter_status[idx] =
+                                FilterState::AllFailed;
+                        }
+                    }
                 }
                 offset += chunk_len;
             }