From 0651ba2e038a952969d40ecfd7d80ea88ed4afa2 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 9 Oct 2021 23:27:15 +0800 Subject: [PATCH 1/2] add hyperloglog implementation --- .../src/physical_plan/hyperloglog/mod.rs | 246 ++++++++++++++++++ datafusion/src/physical_plan/mod.rs | 3 +- 2 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 datafusion/src/physical_plan/hyperloglog/mod.rs diff --git a/datafusion/src/physical_plan/hyperloglog/mod.rs b/datafusion/src/physical_plan/hyperloglog/mod.rs new file mode 100644 index 0000000000000..16d6591a93060 --- /dev/null +++ b/datafusion/src/physical_plan/hyperloglog/mod.rs @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains a modified version specifically for the +//! implementation of `approx_distinct` function. +//! +//! https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs +//! 
https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c + +// TODO remove this when hooked up with the rest +#![allow(dead_code)] + +use ahash::AHasher; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; + +const PRECISION: usize = 14_usize; +/// mask to obtain index into the registers +const HLL_P_MASK: u64 = (1 << PRECISION) as u64 - 1; +/// the number of bits of the hash value used determining the number of leading zeros +const HLL_Q: usize = 64_usize - PRECISION; +const NUM_REGISTERS: usize = 1_usize << PRECISION; + +#[derive(Clone, Debug)] +pub(crate) struct HyperLogLog +where + T: Hash + ?Sized, +{ + registers: [u8; NUM_REGISTERS], + phantom: PhantomData, +} + +impl HyperLogLog +where + T: Hash + ?Sized, +{ + /// Creates a new, empty HyperLogLog. + pub fn new() -> Self { + let registers = [0; NUM_REGISTERS]; + Self { + registers, + phantom: PhantomData, + } + } + + /// choice of hash function: ahash is already a dependency + /// and it fits the requirements of being a 64bit hash with + /// reasonable performance. + #[inline] + fn hash_value(&self, obj: &T) -> u64 { + let mut hasher = AHasher::default(); + obj.hash(&mut hasher); + hasher.finish() + } + + /// Adds an element to the HyperLogLog. 
+ pub fn add(&mut self, obj: &T) { + let hash = self.hash_value(obj); + let index = (hash & HLL_P_MASK) as usize; + // count the 0000...1 pattern length + let p = ((hash >> PRECISION) | (1_u64 << HLL_Q)).leading_zeros() + 1; + self.registers[index] = self.registers[index].max(p as u8); + } + + /// Get the register histogram (each value in register index into + /// the histogram for its leading 000...1 pattern length); u32 is enough because we only have + /// 16384 registers + #[inline] + fn get_hll_histogram(&self) -> [u32; HLL_Q + 2] { + let mut hll_histo = [0; HLL_Q + 2]; + // build the histogram, hopefully this can be unrolled + (0..NUM_REGISTERS).for_each(|i| { + let index = self.registers[i] as usize; + hll_histo[index] += 1; + }); + hll_histo + } + + /// Guess the number of unique elements seen by the HyperLogLog. + pub fn count(&self) -> usize { + let hll_histo = self.get_hll_histogram(); + let m = NUM_REGISTERS as f64; + let mut z: f64 = m * hll_tau((m - hll_histo[HLL_Q + 1] as f64) / m); + (1..=HLL_Q).rev().for_each(|i| { + z += hll_histo[i] as f64; + z *= 0.5; + }); + z += m * hll_sigma(hll_histo[0] as f64 / m); + ((0.5 / 2_f64.ln()) * m * m / z).round() as u64 as usize + } +} + +/// Helper function sigma as defined in +/// "New cardinality estimation algorithms for HyperLogLog sketches" +/// Otmar Ertl, arXiv:1702.01284 +#[inline] +fn hll_sigma(x: f64) -> f64 { + if x == 1. 
{ + f64::INFINITY + } else { + let mut y = 1.0; + let mut z = x; + let mut x = x; + loop { + x *= x; + let z_prime = z; + z += x * y; + y += y; + if z_prime == z { + break; + } + } + z + } +} + +/// Helper function tau as defined in +/// "New cardinality estimation algorithms for HyperLogLog sketches" +/// Otmar Ertl, arXiv:1702.01284 +#[inline] +fn hll_tau(x: f64) -> f64 { + if x == 0.0 || x == 1.0 { + 0.0 + } else { + let mut y = 1.0; + let mut z = 1.0 - x; + let mut x = x; + loop { + x = x.sqrt(); + let z_prime = z; + y *= 0.5; + z -= (1.0 - x).powf(2.0) * y; + if z_prime == z { + break; + } + } + z / 3.0 + } +} + +impl Extend for HyperLogLog +where + T: Hash, +{ + fn extend>(&mut self, iter: S) { + for elem in iter { + self.add(&elem); + } + } +} + +impl<'a, T> Extend<&'a T> for HyperLogLog +where + T: 'a + Hash + ?Sized, +{ + fn extend>(&mut self, iter: S) { + for elem in iter { + self.add(elem); + } + } +} + +#[cfg(test)] +mod tests { + use super::HyperLogLog; + + const ERROR_RATE: f64 = 0.81; + + fn compare_with_delta(got: usize, expected: usize) { + let diff = (got as i64) - (expected as i64); + let diff = diff.abs() as usize; + let delta = ((expected as f64) * ERROR_RATE).ceil() as usize; + assert!( + diff <= delta, + "{} is not near {} percent of {}", + got, + ERROR_RATE, + expected + ); + } + + macro_rules! sized_number_test { + ($SIZE: expr, $T: tt) => {{ + let mut hll = HyperLogLog::<$T>::new(); + for i in 0..$SIZE { + hll.add(&i); + } + compare_with_delta(hll.count(), $SIZE); + }}; + } + + macro_rules! 
typed_number_test { + ($SIZE: expr) => {{ + sized_number_test!($SIZE, u16); + sized_number_test!($SIZE, u32); + sized_number_test!($SIZE, u64); + sized_number_test!($SIZE, u128); + sized_number_test!($SIZE, i16); + sized_number_test!($SIZE, i32); + sized_number_test!($SIZE, i64); + sized_number_test!($SIZE, i128); + }}; + } + + #[test] + fn test_empty() { + let hll = HyperLogLog::::new(); + assert_eq!(hll.count(), 0); + } + + #[test] + fn test_one() { + let mut hll = HyperLogLog::::new(); + hll.add(&1); + assert_eq!(hll.count(), 1); + } + + #[test] + fn test_number_100() { + typed_number_test!(100); + } + + #[test] + fn test_number_1k() { + typed_number_test!(1000); + } + + #[test] + fn test_number_10k() { + typed_number_test!(10000); + } +} diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index f0b5622c43bad..fef2af58b99db 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -102,7 +102,7 @@ pub struct Statistics { /// Statistics on a column level pub column_statistics: Option>, /// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not - /// an estimate). Any or all other fields might still be None, in which case no information is known. + /// an estimate). Any or all other fields might still be None, in which case no information is known. /// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value. 
pub is_exact: bool, } @@ -625,6 +625,7 @@ pub mod functions; pub mod hash_aggregate; pub mod hash_join; pub mod hash_utils; +pub(crate) mod hyperloglog; pub mod join_utils; pub mod json; pub mod limit; From 48fe5c8c545749f7a993931fab206bcf9ee93fa4 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sun, 10 Oct 2021 16:39:04 +0800 Subject: [PATCH 2/2] adding string type --- .../src/physical_plan/hyperloglog/mod.rs | 153 ++++++++++++------ 1 file changed, 105 insertions(+), 48 deletions(-) diff --git a/datafusion/src/physical_plan/hyperloglog/mod.rs b/datafusion/src/physical_plan/hyperloglog/mod.rs index 16d6591a93060..25e5213110796 100644 --- a/datafusion/src/physical_plan/hyperloglog/mod.rs +++ b/datafusion/src/physical_plan/hyperloglog/mod.rs @@ -15,25 +15,39 @@ // specific language governing permissions and limitations // under the License. -//! This module contains a modified version specifically for the -//! implementation of `approx_distinct` function. +//! # HyperLogLog //! -//! https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs -//! https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c +//! `hyperloglog` is a module that contains a modified version +//! of [redis's implementation](https://github.com/redis/redis/blob/4930d19e70c391750479951022e207e19111eb55/src/hyperloglog.c) +//! with some modifications based on strong assumptions about its usage +//! within datafusion, so that the [`approx_distinct`] function can +//! be efficiently implemented. +//! +//! Specifically, like Redis's version, this HLL structure uses +//! 2**14 = 16384 registers, which means the standard error is +//! 1.04/(16384**0.5) = 0.8125%. Unlike Redis, each register takes +//! up a full [`u8`] instead of a raw int* and thus avoids the +//! tricky bit shifting techniques used in the original version. +//! This results in a memory usage increase from 12KiB to 16KiB. +//! 
Also, only the dense version is adopted, so there's no automatic +//! conversion, largely to simplify the code. +//! +//! This module also borrows some code structure from [pdatastructs.rs](https://github.com/crepererum/pdatastructs.rs/blob/3997ed50f6b6871c9e53c4c5e0f48f431405fc63/src/hyperloglog.rs). // TODO remove this when hooked up with the rest #![allow(dead_code)] -use ahash::AHasher; -use std::hash::{Hash, Hasher}; +use ahash::{AHasher, RandomState}; +use std::hash::{BuildHasher, Hash, Hasher}; use std::marker::PhantomData; -const PRECISION: usize = 14_usize; -/// mask to obtain index into the registers -const HLL_P_MASK: u64 = (1 << PRECISION) as u64 - 1; +/// The greater P is, the smaller the error. +const HLL_P: usize = 14_usize; /// the number of bits of the hash value used determining the number of leading zeros -const HLL_Q: usize = 64_usize - PRECISION; -const NUM_REGISTERS: usize = 1_usize << PRECISION; +const HLL_Q: usize = 64_usize - HLL_P; +const NUM_REGISTERS: usize = 1_usize << HLL_P; +/// mask to obtain index into the registers +const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1; #[derive(Clone, Debug)] pub(crate) struct HyperLogLog @@ -44,6 +58,14 @@ where phantom: PhantomData, } +/// fixed seed for the hashing so that values are consistent across runs +const SEED: RandomState = RandomState::with_seeds( + 0x885f6cab121d01a3_u64, + 0x71e4379f2976ad8f_u64, + 0xbf30173dd28a8816_u64, + 0x0eaea5d736d733a4_u64, +); + impl HyperLogLog where T: Hash + ?Sized, @@ -62,7 +84,7 @@ where /// reasonable performance. 
#[inline] fn hash_value(&self, obj: &T) -> u64 { - let mut hasher = AHasher::default(); + let mut hasher: AHasher = SEED.build_hasher(); obj.hash(&mut hasher); hasher.finish() } @@ -71,36 +93,33 @@ where pub fn add(&mut self, obj: &T) { let hash = self.hash_value(obj); let index = (hash & HLL_P_MASK) as usize; - // count the 0000...1 pattern length - let p = ((hash >> PRECISION) | (1_u64 << HLL_Q)).leading_zeros() + 1; + let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1; self.registers[index] = self.registers[index].max(p as u8); } /// Get the register histogram (each value in register index into - /// the histogram for its leading 000...1 pattern length); u32 is enough because we only have - /// 16384 registers + /// the histogram); u32 is enough because we only have 2**14=16384 registers #[inline] - fn get_hll_histogram(&self) -> [u32; HLL_Q + 2] { - let mut hll_histo = [0; HLL_Q + 2]; - // build the histogram, hopefully this can be unrolled - (0..NUM_REGISTERS).for_each(|i| { - let index = self.registers[i] as usize; - hll_histo[index] += 1; - }); - hll_histo + fn get_histogram(&self) -> [u32; HLL_Q + 2] { + let mut histogram = [0; HLL_Q + 2]; + // hopefully this can be unrolled + for r in self.registers { + histogram[r as usize] += 1; + } + histogram } /// Guess the number of unique elements seen by the HyperLogLog. 
pub fn count(&self) -> usize { - let hll_histo = self.get_hll_histogram(); + let histogram = self.get_histogram(); let m = NUM_REGISTERS as f64; - let mut z: f64 = m * hll_tau((m - hll_histo[HLL_Q + 1] as f64) / m); - (1..=HLL_Q).rev().for_each(|i| { - z += hll_histo[i] as f64; + let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m); + for i in histogram[1..=HLL_Q].iter().rev() { + z += *i as f64; z *= 0.5; - }); - z += m * hll_sigma(hll_histo[0] as f64 / m); - ((0.5 / 2_f64.ln()) * m * m / z).round() as u64 as usize + } + z += m * hll_sigma(histogram[0] as f64 / m); + (0.5 / 2_f64.ln() * m * m / z).round() as usize } } @@ -143,7 +162,7 @@ fn hll_tau(x: f64) -> f64 { x = x.sqrt(); let z_prime = z; y *= 0.5; - z -= (1.0 - x).powf(2.0) * y; + z -= (1.0 - x).powi(2) * y; if z_prime == z { break; } @@ -176,20 +195,24 @@ where #[cfg(test)] mod tests { - use super::HyperLogLog; - - const ERROR_RATE: f64 = 0.81; + use super::{HyperLogLog, NUM_REGISTERS}; fn compare_with_delta(got: usize, expected: usize) { - let diff = (got as i64) - (expected as i64); - let diff = diff.abs() as usize; - let delta = ((expected as f64) * ERROR_RATE).ceil() as usize; + let expected = expected as f64; + let diff = (got as f64) - expected; + let diff = diff.abs() / expected; + // times 6 because we want the tests to be stable + // so we allow a rather large margin of error + // this is adopted from redis's unit test version as well + let margin = 1.04 / ((NUM_REGISTERS as f64).sqrt()) * 6.0; assert!( - diff <= delta, - "{} is not near {} percent of {}", + diff <= margin, + "{} is not near {} percent of {} which is ({}, {})", got, - ERROR_RATE, - expected + margin, + expected, + expected * (1.0 - margin), + expected * (1.0 + margin) ); } @@ -203,16 +226,22 @@ mod tests { }}; } + macro_rules! 
typed_large_number_test { + ($SIZE: expr) => {{ + sized_number_test!($SIZE, u64); + sized_number_test!($SIZE, u128); + sized_number_test!($SIZE, i64); + sized_number_test!($SIZE, i128); + }}; + } + macro_rules! typed_number_test { ($SIZE: expr) => {{ sized_number_test!($SIZE, u16); sized_number_test!($SIZE, u32); - sized_number_test!($SIZE, u64); - sized_number_test!($SIZE, u128); sized_number_test!($SIZE, i16); sized_number_test!($SIZE, i32); - sized_number_test!($SIZE, i64); - sized_number_test!($SIZE, i128); + typed_large_number_test!($SIZE); }}; } @@ -236,11 +265,39 @@ mod tests { #[test] fn test_number_1k() { - typed_number_test!(1000); + typed_number_test!(1_000); } #[test] fn test_number_10k() { - typed_number_test!(10000); + typed_number_test!(10_000); + } + + #[test] + fn test_number_100k() { + typed_large_number_test!(100_000); + } + + #[test] + fn test_number_1m() { + typed_large_number_test!(1_000_000); + } + + #[test] + fn test_u8() { + let mut hll = HyperLogLog::<[u8]>::new(); + for i in 0..1000 { + let s = i.to_string(); + let b = s.as_bytes(); + hll.add(b); + } + compare_with_delta(hll.count(), 1000); + } + + #[test] + fn test_string() { + let mut hll = HyperLogLog::::new(); + hll.extend((0..1000).map(|i| i.to_string())); + compare_with_delta(hll.count(), 1000); } }