diff --git a/datafusion/src/physical_plan/hyperloglog/mod.rs b/datafusion/src/physical_plan/hyperloglog/mod.rs
new file mode 100644
index 0000000000000..25e5213110796
--- /dev/null
+++ b/datafusion/src/physical_plan/hyperloglog/mod.rs
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # HyperLogLog
+//!
+//! `hyperloglog` is a module that contains a modified version
+//! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c)
+//! with some modification based on strong assumption of usage
+//! within datafusion, so that [`approx_distinct`] function can
+//! be efficiently implemented.
+//!
+//! Specifically, like Redis's version, this HLL structure uses
+//! 2**14 = 16384 registers, which means the standard error is
+//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, the register takes
+//! up full [`u8`] size instead of a raw int* and thus saves some
+//! tricky bit shifting techniques used in the original version.
+//! This results in a memory usage increase from 12KiB to 16KiB.
+//! Also only the dense version is adopted, so there's no automatic
+//! conversion, largely to simplify the code.
+//!
+//! This module also borrows some code structure from [pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs).
+
+// TODO remove this when hooked up with the rest
+#![allow(dead_code)]
+
+use ahash::{AHasher, RandomState};
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::marker::PhantomData;
+
+/// Number of hash bits used to select a register.
+/// The greater is P, the smaller the error.
+const HLL_P: usize = 14_usize;
+/// the number of remaining hash bits, whose run of trailing zeros
+/// determines the value stored in a register
+const HLL_Q: usize = 64_usize - HLL_P;
+/// total number of registers, 2**HLL_P
+const NUM_REGISTERS: usize = 1_usize << HLL_P;
+/// mask to obtain index into the registers
+const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1;
+
+/// Dense HyperLogLog sketch that estimates the number of distinct values
+/// of type `T` added to it. Only hashes of the values are used; no `T` is stored.
+#[derive(Clone, Debug)]
+pub(crate) struct HyperLogLog<T>
+where
+    T: Hash + ?Sized,
+{
+    /// one byte per register, holding the largest rank seen for that register
+    registers: [u8; NUM_REGISTERS],
+    /// ties the sketch to the element type without storing any element
+    phantom: PhantomData<T>,
+}
+
+/// fixed seed for the hashing so that values are consistent across runs
+const SEED: RandomState = RandomState::with_seeds(
+    0x885f6cab121d01a3_u64,
+    0x71e4379f2976ad8f_u64,
+    0xbf30173dd28a8816_u64,
+    0x0eaea5d736d733a4_u64,
+);
+
+impl<T> Default for HyperLogLog<T>
+where
+    T: Hash + ?Sized,
+{
+    /// Equivalent to [`HyperLogLog::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> HyperLogLog<T>
+where
+    T: Hash + ?Sized,
+{
+    /// Creates a new, empty HyperLogLog.
+    pub fn new() -> Self {
+        let registers = [0; NUM_REGISTERS];
+        Self {
+            registers,
+            phantom: PhantomData,
+        }
+    }
+
+    /// choice of hash function: ahash is already a dependency
+    /// and it fits the requirements of being a 64bit hash with
+    /// reasonable performance.
+    #[inline]
+    fn hash_value(&self, obj: &T) -> u64 {
+        let mut hasher: AHasher = SEED.build_hasher();
+        obj.hash(&mut hasher);
+        hasher.finish()
+    }
+
+    /// Adds an element to the HyperLogLog.
+    pub fn add(&mut self, obj: &T) {
+        let hash = self.hash_value(obj);
+        // the low HLL_P bits of the hash select the register
+        let index = (hash & HLL_P_MASK) as usize;
+        // rank = trailing zeros of the remaining HLL_Q bits, plus one; the bit
+        // set at position HLL_Q caps the rank at HLL_Q + 1
+        let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
+        self.registers[index] = self.registers[index].max(p as u8);
+    }
+
+    /// Get the register histogram (each value in register index into
+    /// the histogram); u32 is enough because we only have 2**14=16384 registers
+    #[inline]
+    fn get_histogram(&self) -> [u32; HLL_Q + 2] {
+        let mut histogram = [0; HLL_Q + 2];
+        // hopefully this can be unrolled
+        for &r in self.registers.iter() {
+            histogram[r as usize] += 1;
+        }
+        histogram
+    }
+
+    /// Guess the number of unique elements seen by the HyperLogLog.
+    pub fn count(&self) -> usize {
+        let histogram = self.get_histogram();
+        let m = NUM_REGISTERS as f64;
+        let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m);
+        for i in histogram[1..=HLL_Q].iter().rev() {
+            z += *i as f64;
+            z *= 0.5;
+        }
+        // for an empty sketch every register is 0, `hll_sigma(1.0)` is
+        // infinite and the estimate below becomes 0
+        z += m * hll_sigma(histogram[0] as f64 / m);
+        (0.5 / 2_f64.ln() * m * m / z).round() as usize
+    }
+}
+
+/// Helper function sigma as defined in
+/// "New cardinality estimation algorithms for HyperLogLog sketches"
+/// Otmar Ertl, arXiv:1702.01284
+#[inline]
+fn hll_sigma(x: f64) -> f64 {
+    if x == 1. {
+        f64::INFINITY
+    } else {
+        let mut y = 1.0;
+        let mut z = x;
+        let mut x = x;
+        // iterate until the sum no longer changes in f64 precision
+        loop {
+            x *= x;
+            let z_prime = z;
+            z += x * y;
+            y += y;
+            if z_prime == z {
+                break;
+            }
+        }
+        z
+    }
+}
+
+/// Helper function tau as defined in
+/// "New cardinality estimation algorithms for HyperLogLog sketches"
+/// Otmar Ertl, arXiv:1702.01284
+#[inline]
+fn hll_tau(x: f64) -> f64 {
+    if x == 0.0 || x == 1.0 {
+        0.0
+    } else {
+        let mut y = 1.0;
+        let mut z = 1.0 - x;
+        let mut x = x;
+        // iterate until the sum no longer changes in f64 precision
+        loop {
+            x = x.sqrt();
+            let z_prime = z;
+            y *= 0.5;
+            z -= (1.0 - x).powi(2) * y;
+            if z_prime == z {
+                break;
+            }
+        }
+        z / 3.0
+    }
+}
+
+impl<T> Extend<T> for HyperLogLog<T>
+where
+    T: Hash,
+{
+    /// Adds every owned element produced by `iter`.
+    fn extend<S: IntoIterator<Item = T>>(&mut self, iter: S) {
+        for elem in iter {
+            self.add(&elem);
+        }
+    }
+}
+
+impl<'a, T> Extend<&'a T> for HyperLogLog<T>
+where
+    T: 'a + Hash + ?Sized,
+{
+    /// Adds every borrowed element produced by `iter`.
+    fn extend<S: IntoIterator<Item = &'a T>>(&mut self, iter: S) {
+        for elem in iter {
+            self.add(elem);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{HyperLogLog, NUM_REGISTERS};
+
+    fn compare_with_delta(got: usize, expected: usize) {
+        let expected = expected as f64;
+        let diff = (got as f64) - expected;
+        let diff = diff.abs() / expected;
+        // times 6 because we want the tests to be stable
+        // so we allow a rather large margin of error
+        // this is adopted from redis's unit test version as well
+        let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0;
+        assert!(
+            diff <= margin,
+            "{} is not near {} percent of {} which is ({}, {})",
+            got,
+            margin,
+            expected,
+            expected * (1.0 - margin),
+            expected * (1.0 + margin)
+        );
+    }
+
+    macro_rules! sized_number_test {
+        ($SIZE: expr, $T: tt) => {{
+            let mut hll = HyperLogLog::<$T>::new();
+            for i in 0..$SIZE {
+                hll.add(&i);
+            }
+            compare_with_delta(hll.count(), $SIZE);
+        }};
+    }
+
+    macro_rules! typed_large_number_test {
+        ($SIZE: expr) => {{
+            sized_number_test!($SIZE, u64);
+            sized_number_test!($SIZE, u128);
+            sized_number_test!($SIZE, i64);
+            sized_number_test!($SIZE, i128);
+        }};
+    }
+
+    macro_rules! typed_number_test {
+        ($SIZE: expr) => {{
+            sized_number_test!($SIZE, u16);
+            sized_number_test!($SIZE, u32);
+            sized_number_test!($SIZE, i16);
+            sized_number_test!($SIZE, i32);
+            typed_large_number_test!($SIZE);
+        }};
+    }
+
+    #[test]
+    fn test_empty() {
+        let hll = HyperLogLog::<u64>::new();
+        assert_eq!(hll.count(), 0);
+    }
+
+    #[test]
+    fn test_one() {
+        let mut hll = HyperLogLog::<u64>::new();
+        hll.add(&1);
+        assert_eq!(hll.count(), 1);
+    }
+
+    #[test]
+    fn test_number_100() {
+        typed_number_test!(100);
+    }
+
+    #[test]
+    fn test_number_1k() {
+        typed_number_test!(1_000);
+    }
+
+    #[test]
+    fn test_number_10k() {
+        typed_number_test!(10_000);
+    }
+
+    #[test]
+    fn test_number_100k() {
+        typed_large_number_test!(100_000);
+    }
+
+    #[test]
+    fn test_number_1m() {
+        typed_large_number_test!(1_000_000);
+    }
+
+    #[test]
+    fn test_u8() {
+        let mut hll = HyperLogLog::<[u8]>::new();
+        for i in 0..1000 {
+            let s = i.to_string();
+            let b = s.as_bytes();
+            hll.add(b);
+        }
+        compare_with_delta(hll.count(), 1000);
+    }
+
+    #[test]
+    fn test_string() {
+        let mut hll = HyperLogLog::<String>::new();
+        hll.extend((0..1000).map(|i| i.to_string()));
+        compare_with_delta(hll.count(), 1000);
+    }
+}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index f0b5622c43bad..fef2af58b99db 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -102,7 +102,7 @@ pub struct Statistics {
     /// Statistics on a column level
     pub column_statistics: Option<Vec<ColumnStatistics>>,
     /// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
-    /// an estimate). Any or all other fields might still be None, in which case no information is known. 
+    /// an estimate). Any or all other fields might still be None, in which case no information is known.
     /// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
     pub is_exact: bool,
 }
@@ -625,6 +625,7 @@ pub mod functions;
 pub mod hash_aggregate;
 pub mod hash_join;
 pub mod hash_utils;
+pub(crate) mod hyperloglog;
 pub mod join_utils;
 pub mod json;
 pub mod limit;