
feat: simplify count distinct logical plan #15867

Closed
chenkovsky wants to merge 6 commits into apache:main from chenkovsky:feature/count_distinct_accumulator

Conversation

@chenkovsky (Contributor) commented Apr 26, 2025

Which issue does this PR close?

Rationale for this change

A select count(distinct ..) query doesn't use the specialized distinct accumulator.

single_distinct_to_groupby is used to optimize queries with multiple distinct aggregates, e.g.

Aggregation
       GROUP BY (k)
       F1(DISTINCT s0, s1, ...),
       F2(DISTINCT s0, s1, ...),

But if there is only one count(distinct), we should use the specialized distinct accumulator instead.
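For illustration, the rewrite produced by single_distinct_to_groupby pushes the distinct column into an inner group-by so the outer aggregate sees already-deduplicated rows (a rough sketch, not the rule's exact output):

```
-- SELECT k, COUNT(DISTINCT s) FROM t GROUP BY k  becomes roughly:
Aggregate: groupBy=[k], aggr=[COUNT(alias1)]
  Aggregate: groupBy=[k, s AS alias1], aggr=[]
    TableScan: t
```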

There's also an older related issue: #4082.

What changes are included in this PR?

Disable the single_distinct_to_groupby optimization for a single count distinct.

Are these changes tested?

Unit tests.

Are there any user-facing changes?

No

@github-actions bot added labels optimizer (Optimizer rules) and sqllogictest (SQL Logic Tests (.slt)) on Apr 26, 2025
@chenkovsky force-pushed the feature/count_distinct_accumulator branch from fca72c2 to 191d7cf on April 26, 2025 10:23
@chenkovsky force-pushed the feature/count_distinct_accumulator branch from 5383a27 to bf0bb3a on April 27, 2025 00:09
for e in args {
fields_set.insert(e);
}
distinct_func = Some(Arc::clone(func));
Contributor

we can record the name instead of the whole func

@jayzhan211 (Contributor)

clickbench Q42 fail

SELECT "URLHash", "EventDate", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate" ORDER BY PageViews DESC LIMIT 10 OFFSET 100;

@chenkovsky (Contributor, Author)

clickbench Q42 fail

SELECT "URLHash", "EventDate", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate" >= '2013-07-01' AND "EventDate" <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate" ORDER BY PageViews DESC LIMIT 10 OFFSET 100;

This query is in clickbench.slt, and sqllogictest doesn't fail. I cannot reproduce the failure in my environment.

@jayzhan211 (Contributor)

My bad, it seems the casting error is allowed and the log was removed in #15858.

@jayzhan211 (Contributor)

Q14

SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;

This query is 2x slower; we need to find the reason and optimize it.

@jayzhan211 (Contributor) commented Apr 27, 2025

Sorry, it's not Q14, it's Q13.

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ feature_count_distinct_accumulator ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    0.18ms │                             0.23ms │  1.31x slower │
│ QQuery 1     │   28.20ms │                            26.98ms │     no change │
│ QQuery 2     │   54.85ms │                            54.72ms │     no change │
│ QQuery 3     │   50.61ms │                            44.70ms │ +1.13x faster │
│ QQuery 4     │  321.17ms │                           596.53ms │  1.86x slower │
│ QQuery 5     │  480.40ms │                           700.37ms │  1.46x slower │
│ QQuery 6     │    0.21ms │                             0.18ms │ +1.16x faster │
│ QQuery 7     │   33.49ms │                            29.93ms │ +1.12x faster │
│ QQuery 8     │  411.45ms │                           415.03ms │     no change │
│ QQuery 9     │  537.09ms │                           520.49ms │     no change │
│ QQuery 10    │  148.01ms │                           144.94ms │     no change │
│ QQuery 11    │  161.14ms │                           159.35ms │     no change │
│ QQuery 12    │  516.02ms │                           458.13ms │ +1.13x faster │
│ QQuery 13    │  675.36ms │                          1523.54ms │  2.26x slower │
│ QQuery 14    │  484.44ms │                           454.57ms │ +1.07x faster │
│ QQuery 15    │  393.40ms │                           386.47ms │     no change │
│ QQuery 16    │  919.67ms │                           867.38ms │ +1.06x faster │
│ QQuery 17    │  816.58ms │                           783.79ms │     no change │
│ QQuery 18    │ 2365.83ms │                          2443.65ms │     no change │
│ QQuery 19    │   49.46ms │                            43.93ms │ +1.13x faster │
│ QQuery 20    │  689.76ms │                           659.51ms │     no change │
│ QQuery 21    │  907.96ms │                           865.63ms │     no change │
│ QQuery 22    │ 1729.82ms │                          1882.47ms │  1.09x slower │
│ QQuery 23    │ 5449.95ms │                          5430.83ms │     no change │
│ QQuery 24    │  305.67ms │                           285.31ms │ +1.07x faster │
│ QQuery 25    │  297.98ms │                           285.56ms │     no change │
│ QQuery 26    │  336.26ms │                           322.70ms │     no change │
│ QQuery 27    │ 1093.64ms │                          1065.44ms │     no change │
│ QQuery 28    │ 8143.92ms │                          8515.65ms │     no change │
│ QQuery 29    │  399.70ms │                           359.68ms │ +1.11x faster │
│ QQuery 30    │  674.77ms │                           426.97ms │ +1.58x faster │
│ QQuery 31    │  597.66ms │                           442.16ms │ +1.35x faster │
│ QQuery 32    │ 2636.13ms │                          2211.93ms │ +1.19x faster │
│ QQuery 33    │ 3176.46ms │                          2929.34ms │ +1.08x faster │
│ QQuery 34    │ 4208.19ms │                          3916.81ms │ +1.07x faster │
│ QQuery 35    │  715.89ms │                           603.21ms │ +1.19x faster │
│ QQuery 36    │   84.79ms │                            84.43ms │     no change │
│ QQuery 37    │   62.08ms │                            52.38ms │ +1.19x faster │
│ QQuery 38    │   92.32ms │                            84.77ms │ +1.09x faster │
│ QQuery 39    │  153.67ms │                           136.62ms │ +1.12x faster │
│ QQuery 40    │   92.64ms │                            34.58ms │ +2.68x faster │
│ QQuery 41    │  120.72ms │                            29.36ms │ +4.11x faster │
│ QQuery 42    │   35.37ms │                            27.37ms │ +1.29x faster │
└──────────────┴───────────┴────────────────────────────────────┴───────────────┘

@chenkovsky (Contributor, Author) commented Apr 27, 2025

@jayzhan211 after profiling, I found that most of the time is spent in HashSet operations. For primitive types, a HashSet may not be a good option.
(profiling screenshot, 2025-04-27)

@jayzhan211 (Contributor) commented Apr 27, 2025

    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        let scalars = self.values.iter().cloned().collect::<Vec<_>>();
        let arr =
            ScalarValue::new_list_nullable(scalars.as_slice(), &self.state_data_type);
        Ok(vec![ScalarValue::List(arr)])
    }

Currently, we clone the HashSet from the partial aggregation and convert it into a List for final aggregation.
In high-cardinality cases, where most values are unique, this results in two rounds of aggregation plus an additional clone, which is inefficient.

I see two potential solutions:

  1. Use a single aggregation with parallelism.
    We could attempt to perform a single aggregation step while still allowing parallel processing. First, we could test whether a single-threaded aggregation is faster than the current two-step process in high-cardinality scenarios, and then find a way to execute it in parallel later on.

  2. Avoid cloning and directly use the HashSet in the final aggregation.
    We can try to eliminate the clone and List conversion by initializing the final aggregation state directly with the HashSet.
    However, ensuring true zero-copy from partial to final aggregation might be non-trivial and would require careful handling. This approach would also require significant changes to how aggregation currently works.
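To make the cost concrete, here is a minimal Rust sketch of the hand-off described above, using simplified standalone types (not DataFusion's actual Accumulator API): the partial side clones its whole HashSet into a list, and the final side re-hashes every element, so in high-cardinality data the clone and re-insertion are almost pure overhead.

```rust
use std::collections::HashSet;

// Simplified stand-in for a distinct-count accumulator. The names and
// shapes are illustrative only, not DataFusion's real types.
struct DistinctCount {
    values: HashSet<i64>,
}

impl DistinctCount {
    fn new() -> Self {
        Self { values: HashSet::new() }
    }

    // Partial phase: insert incoming values, deduplicating as we go.
    fn update_batch(&mut self, batch: &[i64]) {
        self.values.extend(batch.iter().copied());
    }

    // State hand-off: clone the entire set into a list (the costly step).
    fn state(&self) -> Vec<i64> {
        self.values.iter().copied().collect()
    }

    // Final phase: re-insert (re-hash) every element from a partial state.
    fn merge_state(&mut self, state: &[i64]) {
        self.values.extend(state.iter().copied());
    }

    fn evaluate(&self) -> usize {
        self.values.len()
    }
}
```

When most values are unique, `state()` copies nearly every input row and `merge_state()` hashes each of them a second time, which matches the slowdown observed on Q13.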

@chenkovsky (Contributor, Author)

BTW, if we want to run count distinct in big-data scenarios, we have to use the two-step process, so I think we need to add a configuration option to toggle this optimization.

@jayzhan211 (Contributor) commented Apr 28, 2025

BTW, if we want to run count distinct in big-data scenarios, we have to use the two-step process, so I think we need to add a configuration option to toggle this optimization.

We can control the skip-partial-aggregation ratio: the lower the ratio, the more likely the partial aggregation is skipped. So we probably don't need another switch to choose between two-step and single-step.

@chenkovsky (Contributor, Author)

I tested a shared concurrent hash set (DashSet) to avoid the clone, but saw no performance gain.

something like

use std::sync::LazyLock;
use dashmap::DashSet;

// One set shared by every accumulator instance, so nothing needs cloning
// between the partial and final phases.
static GLOBAL_VALUES: LazyLock<DashSet<i64, std::hash::RandomState>> =
    LazyLock::new(DashSet::new);

#[derive(Debug)]
pub struct PrimitiveDistinctCountAccumulatorI64 {
    data_type: DataType,
    values: &'static DashSet<i64, std::hash::RandomState>,
}

impl PrimitiveDistinctCountAccumulatorI64 {
    pub fn new(data_type: &DataType) -> Self {
        Self {
            values: &GLOBAL_VALUES,
            data_type: data_type.clone(),
        }
    }
}

@chenkovsky (Contributor, Author)

I tried single-threaded distinct, but saw no performance gain.

something like:

impl<T> Accumulator for PrimitiveDistinctCountAccumulator<T> {
    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
        // don't dedup
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
        // don't dedup
    }

    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
        // do dedup
    }
}
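A runnable version of this deferred-dedup idea, using hypothetical standalone types (not the actual PrimitiveDistinctCountAccumulator), might look like the following: update and merge only buffer raw values, and deduplication happens once at evaluation, trading memory for cheaper per-batch work.

```rust
use std::collections::HashSet;

// Illustrative stand-in: buffer everything, dedup once at the end.
struct DeferredDistinctCount {
    raw: Vec<i64>,
}

impl DeferredDistinctCount {
    fn new() -> Self {
        Self { raw: Vec::new() }
    }

    // don't dedup: just append the incoming batch
    fn update_batch(&mut self, values: &[i64]) {
        self.raw.extend_from_slice(values);
    }

    // don't dedup when merging partial states either
    fn merge_batch(&mut self, other: &[i64]) {
        self.raw.extend_from_slice(other);
    }

    // dedup exactly once, at evaluation time
    fn evaluate(&self) -> usize {
        self.raw.iter().copied().collect::<HashSet<_>>().len()
    }
}
```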

@chenkovsky closed this Apr 28, 2025
@chenkovsky (Contributor, Author) commented Apr 29, 2025

@jayzhan211 I think the root cause of the poor performance is that the original plan can count in parallel, while the current plan is effectively blocked on the final aggregation. I'm trying to modify the physical plan to make count distinct more efficient.

@jayzhan211 (Contributor) commented Apr 29, 2025

I found we actually didn't have a distinct count group accumulator:

#15888

I tried one and the performance is much better now, but it is still slightly behind the main branch.

But SELECT COUNT(DISTINCT "UserID") FROM hits; becomes pretty slow 🤔
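For context, a group accumulator keeps the state for all groups inside one object, indexed by group id, instead of allocating one boxed accumulator per group. A minimal standalone Rust sketch of that shape (assumed types, not the #15888 implementation) could look like:

```rust
use std::collections::HashSet;

// Illustrative distinct-count group accumulator: one HashSet per group,
// all owned by a single struct and addressed by group index.
struct DistinctCountGroups {
    per_group: Vec<HashSet<i64>>,
}

impl DistinctCountGroups {
    fn new() -> Self {
        Self { per_group: Vec::new() }
    }

    // group_ids[i] tells which group values[i] belongs to.
    fn update(&mut self, group_ids: &[usize], values: &[i64]) {
        for (&g, &v) in group_ids.iter().zip(values) {
            if g >= self.per_group.len() {
                self.per_group.resize_with(g + 1, HashSet::new);
            }
            self.per_group[g].insert(v);
        }
    }

    // Distinct count for each group, in group-id order.
    fn counts(&self) -> Vec<usize> {
        self.per_group.iter().map(|s| s.len()).collect()
    }
}
```

This avoids per-group allocation and virtual dispatch, which is presumably where the reported speedup comes from; the global (no GROUP BY) case takes a different code path, which may explain why it regressed.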


Labels

optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)


Development

Successfully merging this pull request may close these issues.

select count(distinct ..) query doesn't go to the specialized distinct accumulator

2 participants