diff --git a/distribution/pom.xml b/distribution/pom.xml
index 06b3c5a19da5..4876d8416e2a 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -45,7 +45,6 @@
--clean
-
dist
@@ -91,6 +90,8 @@
-c
org.apache.druid.extensions:druid-avro-extensions
-c
+ org.apache.druid.extensions:druid-bloom-filter
+ -c
org.apache.druid.extensions:druid-datasketches
-c
org.apache.druid.extensions:druid-hdfs-storage
diff --git a/docs/content/development/extensions-core/bloom-filter.md b/docs/content/development/extensions-core/bloom-filter.md
new file mode 100644
index 000000000000..140111d123a3
--- /dev/null
+++ b/docs/content/development/extensions-core/bloom-filter.md
@@ -0,0 +1,45 @@
+---
+layout: doc_page
+---
+
+# Druid Bloom Filter
+
+Make sure to [include](../../operations/including-extensions.html) `druid-bloom-filter` as an extension.
+
+BloomFilter is a probabilistic data structure for set membership check.
+Following are some characteristics of BloomFilter:
+- BloomFilters are highly space efficient when compared to using a HashSet.
+- Because of the probabilistic nature of bloom filters, false positives (element not present in the bloom filter but test() says true) are possible.
+- False negatives are not possible (if an element is present then test() will never say false).
+- The false positive probability is configurable (default: 5%); the storage requirement increases or decreases depending on it.
+- The lower the false positive probability, the greater the space requirement.
+- Bloom filters are sensitive to the number of elements that will be inserted into the bloom filter.
+- During the creation of a bloom filter, the expected number of entries must be specified. If the number of insertions exceeds the specified initial number of entries, the false positive probability will increase accordingly.
+
+Internally, this implementation of bloom filter uses the Murmur3 fast non-cryptographic hash algorithm.
+
+### Json Representation of Bloom Filter
+```json
+{
+  "type" : "bloom",
+  "dimension" : <dimension_name>,
+  "bloomKFilter" : <serialized_bytes_for_BloomKFilter>,
+  "extractionFn" : <extraction_fn>
+}
+```
+
+|Property |Description |required? |
+|-------------------------|------------------------------|----------------------------------|
+|`type` |Filter Type. Should always be `bloom`|yes|
+|`dimension` |The dimension to filter over. | yes |
+|`bloomKFilter` |Base64 encoded Binary representation of `org.apache.hive.common.util.BloomKFilter`| yes |
+|`extractionFn`|[Extraction function](./../dimensionspecs.html#extraction-functions) to apply to the dimension values |no|
+
+
+### Serialized Format for BloomKFilter
+ Serialized BloomKFilter format:
+ - 1 byte for the number of hash functions.
+ - 1 big endian int (that is how OutputStream works) for the number of longs in the bitset
+ - big endian longs in the BloomKFilter bitset
+
+Note: `org.apache.hive.common.util.BloomKFilter` provides a serialize method which can be used to serialize bloom filters to outputStream.
\ No newline at end of file
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 1712e0484080..7a1e5dd70a49 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -23,6 +23,7 @@ Core extensions are maintained by Druid committers.
|----|-----------|----|
|druid-avro-extensions|Support for data in Apache Avro data format.|[link](../development/extensions-core/avro.html)|
|druid-basic-security|Support for Basic HTTP authentication and role-based access control.|[link](../development/extensions-core/druid-basic-security.html)|
+|druid-bloom-filter|Support for providing Bloom filters in druid queries.|[link](../development/extensions-core/bloom-filter.html)|
|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)|
|druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)|
|druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
diff --git a/extensions-core/druid-bloom-filter/pom.xml b/extensions-core/druid-bloom-filter/pom.xml
new file mode 100644
index 000000000000..be44fe9f94c9
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/pom.xml
@@ -0,0 +1,65 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.druid.extensions
+ druid-bloom-filter
+ druid-bloom-filter
+ druid-bloom-filter
+
+
+ org.apache.druid
+ druid
+ 0.13.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ org.apache.druid
+ druid-processing
+ ${project.parent.version}
+ provided
+
+
+ org.apache.hive
+ hive-storage-api
+ 2.7.0
+
+
+
+
+ org.apache.druid
+ druid-processing
+ ${project.parent.version}
+ test
+ test-jar
+
+
+ junit
+ junit
+ test
+
+
+
+
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
new file mode 100644
index 000000000000..ffdbbfa05b66
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterExtensionModule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.Module;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Druid extension module for the bloom filter extension.
+ *
+ * <p>Its only contribution is a {@link BloomFilterSerializersModule} handed to Jackson;
+ * it registers no Guice bindings.
+ */
+public class BloomFilterExtensionModule implements DruidModule
+{
+
+  /**
+   * Returns the Jackson modules supplied by this extension.
+   *
+   * @return a single-element list holding a new {@link BloomFilterSerializersModule}
+   */
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return Collections.singletonList(new BloomFilterSerializersModule());
+  }
+
+  /**
+   * Intentionally empty: this extension adds no Guice bindings.
+   *
+   * @param binder the Guice binder (unused)
+   */
+  @Override
+  public void configure(Binder binder)
+  {
+    // Nothing to bind.
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java
new file mode 100644
index 000000000000..21af16d649f7
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.druid.query.filter.BloomDimFilter;
+import org.apache.hive.common.util.BloomKFilter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Jackson module providing JSON support for the bloom filter extension.
+ *
+ * <p>On construction it registers {@link BloomDimFilter} as a named subtype under
+ * {@link #BLOOM_FILTER_TYPE_NAME} and installs a serializer/deserializer pair for
+ * {@link BloomKFilter}.
+ */
+public class BloomFilterSerializersModule extends SimpleModule
+{
+  /** JSON type name under which {@link BloomDimFilter} is registered. */
+  public static final String BLOOM_FILTER_TYPE_NAME = "bloom";
+
+  /**
+   * Creates the module and registers the subtype, serializer and deserializer.
+   */
+  public BloomFilterSerializersModule()
+  {
+    registerSubtypes(
+        new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME)
+    );
+    addSerializer(BloomKFilter.class, new BloomKFilterSerializer());
+    addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer());
+  }
+
+  /**
+   * Writes a {@link BloomKFilter} as a JSON binary value holding the bytes produced by
+   * {@link BloomKFilter#serialize}.
+   */
+  public static class BloomKFilterSerializer extends StdSerializer<BloomKFilter>
+  {
+
+    public BloomKFilterSerializer()
+    {
+      super(BloomKFilter.class);
+    }
+
+    /**
+     * Serializes the filter to an in-memory buffer and emits the bytes through the generator.
+     *
+     * @param bloomKFilter       the filter to write
+     * @param jsonGenerator      the generator receiving the binary value
+     * @param serializerProvider unused
+     * @throws IOException if serializing the filter or writing the value fails
+     */
+    @Override
+    public void serialize(
+        BloomKFilter bloomKFilter, JsonGenerator jsonGenerator, SerializerProvider serializerProvider
+    ) throws IOException
+    {
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+      BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter);
+      byte[] bytes = byteArrayOutputStream.toByteArray();
+      jsonGenerator.writeBinary(bytes);
+    }
+  }
+
+  /**
+   * Reads a {@link BloomKFilter} back from a JSON binary value using
+   * {@link BloomKFilter#deserialize}.
+   */
+  public static class BloomKFilterDeserializer extends StdDeserializer<BloomKFilter>
+  {
+
+    // Public to match BloomKFilterSerializer's constructor.
+    public BloomKFilterDeserializer()
+    {
+      super(BloomKFilter.class);
+    }
+
+    /**
+     * Decodes the current token's binary value and rebuilds the filter from those bytes.
+     *
+     * @param jsonParser             the parser positioned at the binary value
+     * @param deserializationContext unused
+     * @return the deserialized filter
+     * @throws IOException if reading the binary value or rebuilding the filter fails
+     */
+    @Override
+    public BloomKFilter deserialize(
+        JsonParser jsonParser, DeserializationContext deserializationContext
+    ) throws IOException, JsonProcessingException
+    {
+      byte[] bytes = jsonParser.getBinaryValue();
+      return BloomKFilter.deserialize(new ByteArrayInputStream(bytes));
+    }
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java
new file mode 100644
index 000000000000..808f709f60ad
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.segment.filter.DimensionPredicateFilter;
+import org.apache.hive.common.util.BloomKFilter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+
+/**
+ * A {@link DimFilter} that matches values of a dimension by testing them against a
+ * {@link BloomKFilter}.
+ *
+ * <p>The filter is turned into a {@link DimensionPredicateFilter} whose string, long, float and
+ * double predicates delegate to the corresponding {@code test*} method of the bloom filter.
+ * Null values are tested with {@code testBytes(null, 0, 0)}. An optional {@link ExtractionFn}
+ * is handed to the {@link DimensionPredicateFilter} unchanged.
+ */
+public class BloomDimFilter implements DimFilter
+{
+
+  private final String dimension;
+  private final BloomKFilter bloomKFilter;
+  private final ExtractionFn extractionFn;
+
+  /**
+   * Creates a bloom filter over the given dimension.
+   *
+   * @param dimension    the dimension to filter on; must not be null
+   * @param bloomKFilter the bloom filter to test values against; must not be null
+   * @param extractionFn optional extraction function; may be null
+   * @throws IllegalArgumentException if {@code dimension} is null
+   * @throws NullPointerException     if {@code bloomKFilter} is null
+   */
+  @JsonCreator
+  public BloomDimFilter(
+      @JsonProperty("dimension") String dimension,
+      @JsonProperty("bloomKFilter") BloomKFilter bloomKFilter,
+      @JsonProperty("extractionFn") ExtractionFn extractionFn
+  )
+  {
+    Preconditions.checkArgument(dimension != null, "dimension must not be null");
+    Preconditions.checkNotNull(bloomKFilter, "bloomKFilter must not be null");
+    this.dimension = dimension;
+    this.bloomKFilter = bloomKFilter;
+    this.extractionFn = extractionFn;
+  }
+
+  /**
+   * Builds the cache key from the filter's cache id, the dimension name, the extraction
+   * function's cache key (empty when there is none) and the serialized bloom filter bytes.
+   *
+   * @return the cache key bytes
+   * @throws IllegalStateException if the bloom filter cannot be serialized
+   */
+  @Override
+  public byte[] getCacheKey()
+  {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try {
+      BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter);
+    }
+    catch (IOException e) {
+      throw new IllegalStateException(StringUtils.format("Exception when generating cache key for [%s]", this), e);
+    }
+    byte[] bloomFilterBytes = byteArrayOutputStream.toByteArray();
+    return new CacheKeyBuilder(DimFilterUtils.BLOOM_DIM_FILTER_CACHE_ID)
+        .appendString(dimension)
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
+        .appendByte(DimFilterUtils.STRING_SEPARATOR)
+        .appendByteArray(bloomFilterBytes)
+        .build();
+  }
+
+
+  /**
+   * No optimization is applied; the filter is returned unchanged.
+   */
+  @Override
+  public DimFilter optimize()
+  {
+    return this;
+  }
+
+  /**
+   * Converts this filter into a {@link DimensionPredicateFilter} whose predicates test each
+   * value against the bloom filter.
+   *
+   * @return the predicate-based filter for {@code dimension}
+   */
+  @Override
+  public Filter toFilter()
+  {
+    return new DimensionPredicateFilter(
+        dimension,
+        new DruidPredicateFactory()
+        {
+          @Override
+          public Predicate<String> makeStringPredicate()
+          {
+            return str -> {
+              if (str == null) {
+                // Null strings are tested as a null byte array, like the numeric applyNull() cases.
+                return bloomKFilter.testBytes(null, 0, 0);
+              }
+              return bloomKFilter.testString(str);
+            };
+          }
+
+          @Override
+          public DruidLongPredicate makeLongPredicate()
+          {
+            return new DruidLongPredicate()
+            {
+              @Override
+              public boolean applyLong(long input)
+              {
+                return bloomKFilter.testLong(input);
+              }
+
+              @Override
+              public boolean applyNull()
+              {
+                return bloomKFilter.testBytes(null, 0, 0);
+              }
+            };
+          }
+
+          @Override
+          public DruidFloatPredicate makeFloatPredicate()
+          {
+            return new DruidFloatPredicate()
+            {
+              @Override
+              public boolean applyFloat(float input)
+              {
+                return bloomKFilter.testFloat(input);
+              }
+
+              @Override
+              public boolean applyNull()
+              {
+                return bloomKFilter.testBytes(null, 0, 0);
+              }
+            };
+          }
+
+          @Override
+          public DruidDoublePredicate makeDoublePredicate()
+          {
+            return new DruidDoublePredicate()
+            {
+              @Override
+              public boolean applyDouble(double input)
+              {
+                return bloomKFilter.testDouble(input);
+              }
+
+              @Override
+              public boolean applyNull()
+              {
+                return bloomKFilter.testBytes(null, 0, 0);
+              }
+            };
+          }
+        },
+        extractionFn
+    );
+  }
+
+  @JsonProperty
+  public String getDimension()
+  {
+    return dimension;
+  }
+
+  @JsonProperty
+  public BloomKFilter getBloomKFilter()
+  {
+    return bloomKFilter;
+  }
+
+  @JsonProperty
+  public ExtractionFn getExtractionFn()
+  {
+    return extractionFn;
+  }
+
+  /**
+   * Formats the filter as {@code dimension = bloomKFilter}, wrapping the dimension in the
+   * extraction function when one is set.
+   */
+  @Override
+  public String toString()
+  {
+    if (extractionFn != null) {
+      return StringUtils.format("%s(%s) = %s", extractionFn, dimension, bloomKFilter);
+    } else {
+      return StringUtils.format("%s = %s", dimension, bloomKFilter);
+    }
+  }
+
+  /**
+   * Two filters are equal when their dimension, bloom filter and extraction function are equal.
+   * Bloom filter equality is delegated to {@code BloomKFilter.equals}.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    BloomDimFilter that = (BloomDimFilter) o;
+
+    if (!dimension.equals(that.dimension)) {
+      return false;
+    }
+    if (bloomKFilter != null ? !bloomKFilter.equals(that.bloomKFilter) : that.bloomKFilter != null) {
+      return false;
+    }
+    return extractionFn != null ? extractionFn.equals(that.extractionFn) : that.extractionFn == null;
+  }
+
+  /**
+   * This filter does not derive a range set; always returns null.
+   *
+   * @param dimension ignored
+   * @return null
+   */
+  @Override
+  public RangeSet<String> getDimensionRangeSet(String dimension)
+  {
+    return null;
+  }
+
+  /**
+   * @return a new set containing only the filtered dimension
+   */
+  @Override
+  public HashSet<String> getRequiredColumns()
+  {
+    return Sets.newHashSet(dimension);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dimension.hashCode();
+    result = 31 * result + (bloomKFilter != null ? bloomKFilter.hashCode() : 0);
+    result = 31 * result + (extractionFn != null ? extractionFn.hashCode() : 0);
+    return result;
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/druid-bloom-filter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 000000000000..cf441a986112
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1 @@
+org.apache.druid.guice.BloomFilterExtensionModule
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
new file mode 100644
index 000000000000..181235a34778
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.BloomFilterSerializersModule;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.query.extraction.MapLookupExtractor;
+import org.apache.druid.query.extraction.TimeDimExtractionFn;
+import org.apache.druid.query.lookup.LookupExtractionFn;
+import org.apache.druid.query.lookup.LookupExtractor;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.filter.BaseFilterTest;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.hive.common.util.BloomKFilter;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class BloomDimFilterTest extends BaseFilterTest
+{
+ private static final String TIMESTAMP_COLUMN = "timestamp";
+
+ private static final InputRowParser