Merged
41 commits
7dbc75e
blooming aggs
clintropolis Sep 22, 2018
e1c9f77
partially address review
clintropolis Oct 6, 2018
935a28a
fix docs
clintropolis Oct 8, 2018
c17f8b5
minor test refactor after rebase
clintropolis Oct 23, 2018
03a99bc
use copied bloomkfilter
clintropolis Nov 13, 2018
21eb78f
add ByteBuffer methods to BloomKFilter to allow agg to use in place, …
clintropolis Jan 8, 2019
d1ba9d4
add methods to BloomKFilter to get number of set bits, use in compara…
clintropolis Jan 8, 2019
f284aeb
more docs
clintropolis Jan 9, 2019
71d00cf
fix
clintropolis Jan 9, 2019
cec7706
fix style
clintropolis Jan 9, 2019
ee91f3b
simplify bloomfilter bytebuffer merge, change methods to allow passin…
clintropolis Jan 9, 2019
6470dc6
oof, more fixes
clintropolis Jan 9, 2019
233aa9e
more sane docs example
clintropolis Jan 9, 2019
654a994
fix it
clintropolis Jan 9, 2019
6c04d24
do the right thing in the right place
clintropolis Jan 11, 2019
2e5f43d
formatting
clintropolis Jan 11, 2019
a12bad1
fix
clintropolis Jan 11, 2019
ee6ecd6
avoid conflict
clintropolis Jan 11, 2019
70882c9
typo fixes, faster comparator, docs for comparator behavior
clintropolis Jan 12, 2019
3858cb8
unused imports
clintropolis Jan 12, 2019
2ccc137
use buffer comparator instead of deserializing
clintropolis Jan 12, 2019
ff87a37
striped readwrite lock for buffer agg, null handling comparator, othe…
clintropolis Jan 16, 2019
a635a09
style fixes
clintropolis Jan 16, 2019
34183ac
style
clintropolis Jan 17, 2019
daad5a6
remove sync for now
clintropolis Jan 18, 2019
b310e52
oops
clintropolis Jan 18, 2019
8ded684
Merge remote-tracking branch 'upstream/master' into bloom-filter-aggr…
clintropolis Jan 18, 2019
d6a3809
consistency
clintropolis Jan 18, 2019
d0b90b2
inspect runtime shape of selector instead of selector plus, static co…
clintropolis Jan 21, 2019
435e784
CardinalityBufferAggregator inspect selectors instead of selectorPluses
clintropolis Jan 21, 2019
3bdddb1
fix style
clintropolis Jan 21, 2019
d11f784
Merge remote-tracking branch 'upstream/master' into bloom-filter-aggr…
clintropolis Jan 21, 2019
0f08686
Merge remote-tracking branch 'upstream/master' into bloom-filter-aggr…
clintropolis Jan 22, 2019
74feb97
refactor away from using ColumnSelectorPlus and ColumnSelectorStrateg…
clintropolis Jan 24, 2019
3136ce7
adjustment
clintropolis Jan 24, 2019
a50b2b2
fix teamcity error?
clintropolis Jan 25, 2019
68bb28f
rename nil aggs to empty, change empty agg constructor signature, add…
clintropolis Jan 25, 2019
b61e6f3
Merge remote-tracking branch 'upstream/master' into bloom-filter-aggr…
clintropolis Jan 26, 2019
8ebe1d9
use stringutils base64 stuff to be chill with master
clintropolis Jan 26, 2019
d1a3c44
Merge remote-tracking branch 'upstream/master' into bloom-filter-aggr…
clintropolis Jan 28, 2019
a56615b
add aggregate combiner, comment
clintropolis Jan 28, 2019
104 changes: 91 additions & 13 deletions docs/content/development/extensions-core/bloom-filter.md
@@ -24,22 +24,44 @@ title: "Bloom Filter"

# Bloom Filter

This extension adds the ability to both construct bloom filters from query results, and filter query results by testing
against a bloom filter. Make sure to [include](../../operations/including-extensions.html) `druid-bloom-filter` as an
extension.

A BloomFilter is a probabilistic data structure for performing a set membership check. A bloom filter is a good candidate
to use with Druid for cases where an explicit filter is impossible, e.g. filtering a query against a set of millions of
values.

Following are some characteristics of BloomFilters:
- BloomFilters are highly space efficient when compared to using a HashSet.
- Because of the probabilistic nature of bloom filters, false positive results are possible (an element was not actually
inserted into a bloom filter during construction, but `test()` says true).
- False negatives are not possible (if an element is present then `test()` will never say false).
- The false positive probability of this implementation is currently fixed at 5%, but increasing the number of entries
that the filter can hold can decrease this false positive rate in exchange for overall size.
- Bloom filters are sensitive to the number of elements that will be inserted into the bloom filter. During the creation
of a bloom filter the expected number of entries must be specified. If the number of insertions exceeds the specified
initial number of entries, the false positive probability will increase accordingly.
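The interaction between the expected number of entries and the false positive rate follows the standard bloom filter formulas. The sketch below is plain math, not the actual `org.apache.hive.common.util.BloomKFilter` internals; the class name and constants here are illustrative only, and show how the rate degrades once insertions exceed the sized capacity:

```java
public class BloomMath
{
  // Standard bloom filter false positive estimate: p ≈ (1 - e^(-k*n/m))^k,
  // where m = bits in the filter, n = elements inserted, k = hash functions.
  static double falsePositiveRate(long bits, long numInserted, int numHashes)
  {
    double perBitMiss = Math.exp(-(double) numHashes * numInserted / bits);
    return Math.pow(1.0 - perBitMiss, numHashes);
  }

  // Bits needed to hold n entries at a target false positive rate:
  // m = -n * ln(p) / (ln 2)^2
  static long optimalBits(long expectedEntries, double fpp)
  {
    return (long) Math.ceil(-expectedEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
  }

  public static void main(String[] args)
  {
    long bits = optimalBits(1500, 0.05);  // roughly 9350 bits for 1500 entries at 5%
    int hashes = 4;                       // about (m/n) * ln 2 for these numbers
    System.out.println(falsePositiveRate(bits, 1500, hashes));   // close to the 5% design rate
    System.out.println(falsePositiveRate(bits, 15000, hashes));  // degrades toward 1.0 past capacity
  }
}
```

The practical takeaway is to size the expected-entries parameter to at least the expected cardinality of the input; once actual insertions are an order of magnitude past the sized capacity, nearly every membership test returns true.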

This extension is currently based on `org.apache.hive.common.util.BloomKFilter` from `hive-storage-api`. Internally,
this implementation uses Murmur3 as the hash algorithm.

To construct a BloomKFilter externally with Java to use as a filter in a Druid query:

```java
BloomKFilter bloomFilter = new BloomKFilter(1500);
bloomFilter.addString("value 1");
bloomFilter.addString("value 2");
bloomFilter.addString("value 3");
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BloomKFilter.serialize(byteArrayOutputStream, bloomFilter);
String base64Serialized = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
```

This string can then be used in a native or SQL Druid query.

## Filtering queries with a Bloom Filter

### JSON Specification of Bloom Filter
```json
{
  "type" : "bloom",
  ...
}
```
Contributor

Consider making this valid JSON so it doesn't get syntax highlighted

Member Author

I believe that this is really only ugly on github and it looks ok translated to the website docs

@@ -75,12 +97,68 @@ Bloom filters are supported in SQL via the `bloom_filter_test` operator:

```sql
SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
```


### Expression and Virtual Column Support

The bloom filter extension also adds a bloom filter [Druid expression](../../misc/math-expr.html) which shares syntax
with the SQL operator.

```sql
bloom_filter_test(<expr>, '<serialized_bytes_for_BloomKFilter>')
```

## Bloom Filter Query Aggregator

Input for a `bloomKFilter` can also be created from a Druid query with the `bloom` aggregator.
Contributor

Refers to bloom aggregator here, but in the JSON spec the type is bloomFilter.

Member Author

bloom is the correct value to be consistent with the filter type name, updated docs to reflect that.


### JSON Specification of Bloom Filter Aggregator

```json
{
  "type": "bloom",
  "name": <output_field_name>,
  "maxNumEntries": <maximum_number_of_elements_for_BloomKFilter>,
  "field": <dimension_spec>
}
```

Contributor

Consider making this valid JSON so it doesn't get syntax highlighted

|Property |Description |required? |
|-------------------------|------------------------------|----------------------------------|
|`type` |Aggregator Type. Should always be `bloom`|yes|
|`name` |Output field name |yes|
|`field` |[DimensionSpec](./../dimensionspecs.html) to add to `org.apache.hive.common.util.BloomKFilter` | yes |
|`maxNumEntries` |Maximum number of distinct values supported by `org.apache.hive.common.util.BloomKFilter`, default `1500`| no |
Contributor

I think it'd be worthwhile under maxNumEntries to discuss the implications of having more elements than the value provided here. Also, any discussion on how to choose an appropriate value here to get a given false-positive rate would also be helpful.

Member Author

Hmm, digging into it, in BloomKFilter the false positive rate is not controllable in the manner of BloomFilter, and is fixed to the default of 5%. However I guess that can be indirectly controlled by increasing the maxNumEntries, though that's kind of lame. Having a higher cardinality than the value of maxNumEntries will cause the false positive probability to reach 1, constructing a useless bloom filter, so that should definitely be added to the docs.

Member Author

Updated docs to include fixed 5% false positive rate, though no formula for how changing maxNumEntries affects that yet.


### Example

```json
{
  "queryType": "timeseries",
  "dataSource": "wikiticker",
  "intervals": [ "2015-09-12T00:00:00.000/2015-09-13T00:00:00.000" ],
  "granularity": "day",
  "aggregations": [
    {
      "type": "bloom",
      "name": "userBloom",
      "maxNumEntries": 100000,
      "field": {
        "type": "default",
        "dimension": "user",
        "outputType": "STRING"
      }
    }
  ]
}
```

response

```json
[{"timestamp":"2015-09-12T00:00:00.000Z","result":{"userBloom":"BAAAJhAAAA..."}}]
```

These values can then be set in the filter specification above.

Ordering results by a bloom filter aggregator, for example in a TopN query, will perform a comparatively expensive
linear scan _of the filter itself_ to count the number of set bits as a means of approximating how many items have been
added to the set. As such, ordering by an alternate aggregation is recommended if possible.
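As a rough illustration of that linear scan, counting set bits over the serialized form (using the `| k (byte) | numLongs (int) | bitset |` layout noted in the buffer aggregator's `get()`) might look like the sketch below; `SetBitCounter` is a hypothetical helper, not part of the extension:

```java
import java.nio.ByteBuffer;

public class SetBitCounter
{
  // Illustrative sketch only: count set bits in a serialized filter laid out as
  // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
  // The real comparator in the extension may differ in detail.
  static long countSetBits(ByteBuffer buf)
  {
    ByteBuffer dup = buf.duplicate();
    dup.get();                    // skip k (number of hash functions)
    int numLongs = dup.getInt();  // length of the bitset in longs
    long setBits = 0;
    for (int i = 0; i < numLongs; i++) {
      setBits += Long.bitCount(dup.getLong());  // linear scan of the bitset
    }
    return setBits;
  }
}
```

Two filters would then be ordered by comparing their counts, which is why the cost of this ordering grows with the size of the filter itself rather than with the number of rows aggregated.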
@@ -210,7 +210,7 @@ public Pair<String, DerivativeDataSourceMetadata> map(int index, ResultSet r, St
}

/**
* caculate the average data size per segment granularity for a given datasource.
* calculate the average data size per segment granularity for a given datasource.
*
* e.g. for a datasource, there're 5 segments as follows,
* interval = "2018-04-01/2017-04-02", segment size = 1024 * 1024 * 2
@@ -85,7 +85,7 @@ private static Set<String> extractFieldsFromAggregations(List<AggregatorFactory>
}

/**
* caculate the intervals which are covered by interval2, but not covered by interval1.
* calculate the intervals which are covered by interval2, but not covered by interval1.
* result intervals = interval2 - interval1 ∩ interval2
* e.g.
* a list of interval2: ["2018-04-01T00:00:00.000Z/2018-04-02T00:00:00.000Z",
@@ -27,9 +27,12 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory;
import org.apache.druid.query.aggregation.bloom.BloomFilterSerde;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.segment.serde.ComplexMetrics;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -41,10 +44,17 @@ public class BloomFilterSerializersModule extends SimpleModule

  public BloomFilterSerializersModule()
  {
    registerSubtypes(
        new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME),
        new NamedType(BloomFilterAggregatorFactory.class, BLOOM_FILTER_TYPE_NAME)
    );
    addSerializer(BloomKFilter.class, new BloomKFilterSerializer());
    addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer());
    addDeserializer(BloomKFilterHolder.class, new BloomKFilterHolderDeserializer());

    if (ComplexMetrics.getSerdeForType(BLOOM_FILTER_TYPE_NAME) == null) {
      ComplexMetrics.registerSerde(BLOOM_FILTER_TYPE_NAME, new BloomFilterSerde());
    }
  }

  private static class BloomKFilterSerializer extends StdSerializer<BloomKFilter>
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.bloom;

import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseNullableColumnValueSelector;

import javax.annotation.Nullable;

public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator
{
  final BloomKFilter collector;
  protected final TSelector selector;

  BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector)
  {
    this.collector = collector;
    this.selector = selector;
  }

  @Nullable
  @Override
  public Object get()
  {
    return collector;
  }

  @Override
  public float getFloat()
  {
    throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()");
  }

  @Override
  public long getLong()
  {
    throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()");
  }

  @Override
  public double getDouble()
  {
    throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()");
  }

  @Override
  public void close()
  {
    // nothing to close
  }
}
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.bloom;

import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector;

import java.nio.ByteBuffer;

public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNullableColumnValueSelector> implements BufferAggregator
{
  protected final int maxNumEntries;
  protected final TSelector selector;

  BaseBloomFilterBufferAggregator(TSelector selector, int maxNumEntries)
  {
    this.selector = selector;
    this.maxNumEntries = maxNumEntries;
  }

  abstract void bufferAdd(ByteBuffer buf);

  @Override
  public void init(ByteBuffer buf, int position)
  {
    final ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);
    BloomKFilter filter = new BloomKFilter(maxNumEntries);
    BloomKFilter.serialize(mutationBuffer, filter);
  }

  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    final int oldPosition = buf.position();
    buf.position(position);
    bufferAdd(buf);
    buf.position(oldPosition);
  }

  @Override
  public Object get(ByteBuffer buf, int position)
  {
    ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);
    // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
    int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
    mutationBuffer.limit(position + sizeBytes);
    return mutationBuffer.slice();
  }

  @Override
  public float getFloat(ByteBuffer buf, int position)
  {
    throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()");
  }

  @Override
  public long getLong(ByteBuffer buf, int position)
  {
    throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()");
  }

  @Override
  public double getDouble(ByteBuffer buf, int position)
  {
    throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()");
  }

  @Override
  public void close()
  {
    // nothing to close
  }

  @Override
  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
  {
    inspector.visit("selector", selector);
  }
}