diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 0e2049c8bca1..ac2eb41f102d 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -186,6 +186,12 @@
${project.parent.version}test
+
+ org.apache.druid.extensions.contrib
+ oak-incremental-index
+ ${project.parent.version}
+ test
+
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index 9c42bca4e44d..07c59a9c719a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -73,6 +73,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -106,6 +107,8 @@ public class FilteredAggregatorBenchmark
{
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
@Param({"75000"})
@@ -204,7 +207,7 @@ public void setup()
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
@@ -234,7 +237,7 @@ public void tearDown()
@State(Scope.Benchmark)
public static class IncrementalIndexIngestState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
index bfc3c2975260..6ff6cbc85f80 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -47,6 +47,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -84,7 +85,7 @@ public class IncrementalIndexReadBenchmark
@Param({"true", "false"})
private boolean rollup;
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
@@ -92,6 +93,8 @@ public class IncrementalIndexReadBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
index 310f155432d7..f5fe8f5e6915 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -31,6 +31,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -68,7 +69,7 @@ public class IndexIngestionBenchmark
@Param({"none", "moderate", "high"})
private String rollupOpportunity;
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
private static final Logger log = new Logger(IndexIngestionBenchmark.class);
@@ -76,6 +77,8 @@ public class IndexIngestionBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index 9b91a70293e5..d980e32401d3 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -37,6 +37,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.openjdk.jmh.annotations.Benchmark;
@@ -73,6 +74,8 @@ public class IndexPersistBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
@@ -93,7 +96,7 @@ public class IndexPersistBenchmark
@Param({"none", "moderate", "high"})
private String rollupOpportunity;
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
private AppendableIndexSpec appendableIndexSpec;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 6310dd499dbd..87577377a9d6 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -90,6 +90,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -154,6 +155,8 @@ public class GroupByBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
@@ -532,7 +535,7 @@ public String getFormatString()
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index b9ae2a40f662..5fff6081b258 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -71,6 +71,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -125,6 +126,8 @@ public class ScanBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
@@ -281,7 +284,7 @@ public void setup()
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index a3de820c0682..b09996f2b777 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -78,6 +78,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -128,6 +129,8 @@ public class SearchBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
@@ -351,7 +354,7 @@ public void setup()
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index 1f271e22274c..7b22303e9116 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -72,6 +72,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -122,6 +123,8 @@ public class TimeseriesBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
@@ -276,7 +279,7 @@ public void setup()
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index 3eaf7dc85f48..a6a08919f9d5 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -69,6 +69,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -119,6 +120,8 @@ public class TopNBenchmark
static {
NullHandling.initializeForTests();
+ // Register OakIncrementalIndex
+ IncrementalIndexCreator.JSON_MAPPER.registerModule(OakIncrementalIndexModule.JACKSON_MODULE);
}
private AppendableIndexSpec appendableIndexSpec;
@@ -254,7 +257,7 @@ public void setup()
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
- @Param({"onheap", "offheap"})
+ @Param({"onheap", "oak"})
private String indexType;
IncrementalIndex incIndex;
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 41c56dbac7bd..7cc592c66fa4 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -616,6 +616,8 @@
org.apache.druid.extensions.contrib:aliyun-oss-extensions-corg.apache.druid.extensions.contrib:opentelemetry-emitter
+ -c
+ org.apache.druid.extensions.contrib:oak-incremental-index
diff --git a/docs/development/extensions-contrib/oak-incremental-index.md b/docs/development/extensions-contrib/oak-incremental-index.md
new file mode 100644
index 000000000000..8601c97b6d1a
--- /dev/null
+++ b/docs/development/extensions-contrib/oak-incremental-index.md
@@ -0,0 +1,147 @@
+---
+id: oak-incremental-index
+title: "Oak Incremental Index"
+---
+
+
+
+
+## Overview
+This extension improves the CPU and memory efficiency of Druid's ingestion.
+The same performance is achieved with 60% less memory and 50% less CPU time.
+Ingestion-throughput was nearly doubled with the same memory budget,
+and throughput was increased by 75% with the same CPU budget.
+Full details of the experimental setup and results are available [here](https://github.com/liran-funaro/druid/wiki/Evaluation).
+
+It uses [OakMap open source library](https://github.com/yahoo/Oak) to store the keys and values outside the JVM heap.
+
+#### Main enhancements provided by this extension:
+1. **Resource efficiency**: Use less CPU and RAM for the same performance
+2. **Performance**: Improving performance by using more concurrent workers with the same resource allocation
+
+### Installation
+Use the [pull-deps](../../operations/pull-deps.md) tool included with Druid to install this [extension](../../development/extensions.md#community-extensions) on all Druid middle-manager nodes.
+
+```bash
+java -classpath "/lib/*" org.apache.druid.cli.Main tools pull-deps -c org.apache.druid.extensions.contrib:oak-incremental-index
+```
+
+### Enabling
+After installation, to enable this extension, just add `oak-incremental-index` to `druid.extensions.loadList` in the
+middle-manager's `runtime.properties` file and then restart its nodes.
+
+For example:
+```bash
+druid.extensions.loadList=["oak-incremental-index"]
+```
+
+### Resource Configurations
+Since Oak allocates its keys/values directly, it is not subject to the JVM's on/off-heap limitations.
+Despite this, we have to respect runtime resource limits if they aren't specified by the ingestion specs.
+To ensure that the Oak index doesn't use more resources than available, we use the same default `maxBytesInMemory` as the on-heap index, i.e., 1/6 of the maximal heap size.
+It assumes that the middle-manager is configured correctly according to the machine's resources.
+
+As the middle-manager resource configuration typically consumes all the instance memory, we need to reduce the heap size in order to accommodate the Oak index.
+This can be achieved by ensuring the minimal heap size of the ingestion task is 2/3 (or less) of the current maximal heap size.
+Generally, smaller heap sizes lead to greater resource efficiency.
+
+If, for example, the middle-manager's `runtime.properties` file had the following configuration:
+```properties
+druid.indexer.runner.javaOpts=-Xms3g -Xmx3g
+```
+It is suggested to change it to:
+```properties
+druid.indexer.runner.javaOpts=-Xms2g -Xmx3g
+```
+If you'd like to maximize resource efficiency, try:
+```properties
+druid.indexer.runner.javaOpts=-Xms256m -Xmx3g
+```
+
+## Usage
+Some workloads may benefit from this extension, but others may perform better with the on-heap implementation
+(despite consuming more RAM than the Oak implementation).
+Hence, the user can choose which incremental-index implementation to use, depending on their needs.
+
+By default, the built-in on-heap incremental index is used.
+After enabling the extension, the user must modify the [`ingestion spec`](../../ingestion/ingestion-spec.md)
+to use the Oak incremental index.
+Namely, modify [`appendable index spec`](#appendableindexspec)
+under [`tuning config`](../../ingestion/ingestion-spec.md#tuningconfig) as follows.
+
+### `spec`
+All properties in the [`ingestion spec`](../../ingestion/ingestion-spec.md) remain as before.
+However, the user needs to pay attention to the following configurations under [`tuning config`](../../ingestion/ingestion-spec.md#tuningconfig).
+
+| Field | Description | Default |
+|----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------|
+| `appendableIndexSpec` | Tune which incremental index to use. See table [below](#appendableindexspec) for more information. | See table below |
+| `maxRowsInMemory` | The maximum number of records to store in memory before persisting to disk. Note that this is the number of rows post-rollup, and so it may not be equal to the number of input records. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first), so they both determine the memory usage of the ingestion task. | `1000000` |
+| `maxBytesInMemory` | The maximum aggregate size of records, in bytes, to store in memory before persisting. For oak-incremental-index, this accurately limits the memory usage, but for the default on-heap implementation, this is based on a rough estimate of memory usage. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first), so they both determine the memory usage of the ingestion task. `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decrease, and the task will fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.
Setting `maxBytesInMemory` to `-1` disables this check, meaning Druid will rely entirely on `maxRowsInMemory` to control memory usage. Setting it to zero (default) means the default value will be used (see on the right).
Note that for the on-heap implementation, the estimate of memory usage is designed to be an overestimate, and can be especially high when using complex ingest-time aggregators, including sketches. If this causes your indexing workloads to persist to disk too often, you can set `maxBytesInMemory` to `-1` and rely on `maxRowsInMemory` instead.
When using this extension, the `maxBytesInMemory` should be set according to your machine's memory limit to ensure Druid doesn't run into allocation problems. Specifically, take the available memory on your machine after deducting the JVM heap space, the off-heap buffers space, and memory used for other processes on the machine, and then divide by two so that we'll have room for both the ingested index and the flushed one. | one-sixth of JVM heap size |
+| `skipBytesInMemoryOverheadCheck` | The calculation of `maxBytesInMemory` takes into account overhead objects created during ingestion and each intermediate persist. Setting this to true can exclude the bytes of these overhead objects from `maxBytesInMemory` check.
Note that the oak incremental index is stored in a different memory area than the overhead objects. As a result, it doesn't make sense to subject them to the same restrictions. Thus, setting this value to `true` is recommended for this extension. | false |
+| Other properties | See [`tuning config`](../../ingestion/ingestion-spec.md#tuningconfig) for more information. ||
+
+### `appendableIndexSpec`
+The user can choose between the built-in on-heap incremental index and this extension.
+
+| Field | Description | Default |
+|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
+| `type` | Each in-memory index has its own tuning type code. You must specify the type code that matches your in-memory index. Available options are `onheap` and `oak`. | `onheap` |
+
+On-heap implementations have no additional parameters. The following are the parameters for this extension.
+For most use cases, the defaults work well, but if there are issues, these can be adjusted.
+
+| Field | Description | Default |
+|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `type` | `oak` | |
+| `oakBlockSize` | OakMap stores its data outside the JVM heap in memory blocks. Larger blocks consolidate allocations and reduce overhead, but can also waste more memory if not fully utilized. The default has a reasonable balance between performance and memory usage when tested in a batch ingestion scenario. | `8MiB` |
+| `oakMaxMemoryCapacity` | OakMap maintains its memory blocks with an internal data-structure. Structure size is roughly `oakMaxMemoryCapacity/oakBlockSize`. We set this number to a large enough yet reasonable value so that this structure does not consume too much memory. | `32GiB` |
+| `oakChunkMaxItems` | OakMap maintains its entries in small chunks. Using larger chunks reduces the number of on-heap objects, but may incur more overhead when balancing the entries between chunks. The default showed the best performance in batch ingestion scenarios. | `256` |
+
+# Example
+The following tuning configuration is recommended for this extension:
+```json
+"tuningConfig": {
+ ,
+ "maxRowsInMemory": 1000000,
+ "maxBytesInMemory": 0,
+ "skipBytesInMemoryOverheadCheck": true,
+ "appendableIndexSpec": {
+ "type": "oak"
+ }
+}
+```
+
+The following is an example tuning configuration that sets all the common properties:
+```json
+"tuningConfig": {
+ ,
+ "maxRowsInMemory": 1000000,
+ "maxBytesInMemory": 0,
+ "skipBytesInMemoryOverheadCheck": true,
+ "appendableIndexSpec": {
+ "type": "oak",
+ "oakMaxMemoryCapacity": 34359738368,
+ "oakBlockSize": 8388608,
+ "oakChunkMaxItems": 256
+ }
+}
+```
+
diff --git a/docs/development/extensions.md b/docs/development/extensions.md
index 49f61c8d7467..34810d47d858 100644
--- a/docs/development/extensions.md
+++ b/docs/development/extensions.md
@@ -95,6 +95,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|druid-tdigestsketch|Support for approximate sketch aggregators based on [T-Digest](https://github.com/tdunning/t-digest)|[link](../development/extensions-contrib/tdigestsketch-quantiles.md)|
|gce-extensions|GCE Extensions|[link](../development/extensions-contrib/gce-extensions.md)|
|prometheus-emitter|Exposes [Druid metrics](../operations/metrics.md) for Prometheus server collection (https://prometheus.io/)|[link](./extensions-contrib/prometheus.md)|
+|oak-incremental-index|Ingestion using [OakMap open source library](https://github.com/yahoo/Oak) to store the keys and values outside the JVM heap.|[link](./extensions-contrib/oak-incremental-index.md)|
## Promoting community extensions to core extensions
diff --git a/extensions-contrib/oak-incremental-index/README.md b/extensions-contrib/oak-incremental-index/README.md
new file mode 100644
index 000000000000..f45da37d616b
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/README.md
@@ -0,0 +1,38 @@
+
+
+# Oak Incremental Index
+This extension improves the CPU and memory efficiency of Druid's ingestion.
+The same performance is achieved with 60% less memory and 50% less CPU time.
+Ingestion-throughput was nearly doubled with the same memory budget,
+and throughput was increased by 75% with the same CPU budget.
+Full details of the experimental setup and results are available [here](https://github.com/liran-funaro/druid/wiki/Evaluation).
+
+It uses [OakMap open source library](https://github.com/yahoo/Oak) to store the keys and values outside the JVM heap.
+
+# Documentation
+More information can be found in the [extension documentation](../../docs/development/extensions-contrib/oak-incremental-index.md).
+
+# Credits
+This module is a result of feedback and work done by the following people.
+
+* https://github.com/liran-funaro
+* https://github.com/sanastas
+* https://github.com/ebortnik
+* https://github.com/eshcar
diff --git a/extensions-contrib/oak-incremental-index/pom.xml b/extensions-contrib/oak-incremental-index/pom.xml
new file mode 100644
index 000000000000..3247fb3b49fc
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/pom.xml
@@ -0,0 +1,123 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.druid.extensions.contrib
+ oak-incremental-index
+ oak-incremental-index
+ Druid incremental-index based on Oak lib https://github.com/yahoo/Oak
+
+
+ org.apache.druid
+ druid
+ 0.23.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ com.yahoo.oak
+ oak
+ 0.2.5
+
+
+
+ org.apache.druid
+ druid-core
+ ${project.parent.version}
+ provided
+
+
+ org.apache.druid
+ druid-processing
+ ${project.parent.version}
+ provided
+
+
+
+ com.google.code.findbugs
+ jsr305
+ provided
+
+
+ com.google.guava
+ guava
+ provided
+
+
+ com.google.inject
+ guice
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ provided
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.easymock
+ easymock
+ test
+
+
+ org.apache.druid
+ druid-core
+ ${project.parent.version}
+ test-jar
+ test
+
+
+ org.apache.druid
+ druid-processing
+ ${project.parent.version}
+ test-jar
+ test
+
+
+ org.apache.druid
+ druid-server
+ ${project.parent.version}
+ test
+ test-jar
+
+
+ org.apache.druid
+ druid-sql
+ ${project.parent.version}
+ test-jar
+ test
+
+
+
diff --git a/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndex.java b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndex.java
new file mode 100644
index 000000000000..797a613a2c0c
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndex.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterators;
+import com.yahoo.oak.OakBuffer;
+import com.yahoo.oak.OakMap;
+import com.yahoo.oak.OakMapBuilder;
+import com.yahoo.oak.OakScopedReadBuffer;
+import com.yahoo.oak.OakScopedWriteBuffer;
+import com.yahoo.oak.OakSerializer;
+import com.yahoo.oak.OakUnsafeDirectBuffer;
+import com.yahoo.oak.OakUnscopedBuffer;
+import org.apache.druid.annotations.EverythingIsNonnullByDefault;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.segment.incremental.AppendableIndexBuilder;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexRow;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+
+/**
+ * OakIncrementalIndex has two main attributes that are different from the other IncrementalIndex implementations:
+ * 1. It stores both **keys** and **values** off-heap (as opposed to the off-heap implementation that stores only
+ * the **values** off-heap).
+ * 2. It is based on OakMap (https://github.com/yahoo/Oak) instead of Java's ConcurrentSkipList.
+ * These two changes significantly reduce the number of heap-objects and thus decrease dramatically the GC's memory
+ * and performance overhead.
+ */
+@EverythingIsNonnullByDefault
+public class OakIncrementalIndex extends IncrementalIndex implements IncrementalIndex.FactsHolder
+{
+ private final BufferAggregator[] aggregators;
+ private final OakMap<IncrementalIndexRow, OakInputRowContext> facts;
+
+ // Given a ByteBuffer and an offset inside the buffer, offset + aggOffsetInBuffer[i]
+ // would give a position in the buffer where the i^th aggregator's value is stored.
+ private final int[] aggregatorOffsetInBuffer;
+ private final int aggregatorsTotalSize;
+
+ private static final Logger log = new Logger(OakIncrementalIndex.class);
+
+ public OakIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema,
+ boolean deserializeComplexMetrics,
+ boolean concurrentEventAdd,
+ int maxRowCount,
+ long maxBytesInMemory,
+ long oakMaxMemoryCapacity,
+ int oakBlockSize,
+ int oakChunkMaxItems)
+ {
+ super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, maxRowCount, maxBytesInMemory, true);
+
+ AggregatorFactory[] metrics = getMetricAggs();
+ this.aggregators = new BufferAggregator[metrics.length];
+
+ this.aggregatorOffsetInBuffer = new int[metrics.length];
+
+ int curAggOffset = 0;
+ for (int i = 0; i < metrics.length; i++) {
+ aggregatorOffsetInBuffer[i] = curAggOffset;
+ curAggOffset += metrics[i].getMaxIntermediateSizeWithNulls();
+ }
+ this.aggregatorsTotalSize = curAggOffset;
+
+ final IncrementalIndexRow minRow = new IncrementalIndexRow(
+ incrementalIndexSchema.getMinTimestamp(),
+ OakIncrementalIndexRow.NO_DIMS,
+ dimensionDescsList,
+ IncrementalIndexRow.EMPTY_ROW_INDEX
+ );
+
+ this.facts = new OakMapBuilder<>(
+ new OakKey.Comparator(dimensionDescsList, isRollup()),
+ new OakKey.Serializer(dimensionDescsList, indexIncrement),
+ new OakValueSerializer(),
+ minRow
+ ).setPreferredBlockSize(oakBlockSize)
+ .setChunkMaxItems(oakChunkMaxItems)
+ .setMemoryCapacity(oakMaxMemoryCapacity)
+ .buildOrderedMap();
+ }
+
+ @Override
+ public FactsHolder getFacts()
+ {
+ return this;
+ }
+
+ @Override
+ public int size()
+ {
+ return facts.size();
+ }
+
+ @Override
+ public long getBytesInMemory()
+ {
+ return facts.memorySize();
+ }
+
+ @Override
+ public void close()
+ {
+ super.close();
+
+ for (BufferAggregator agg : aggregators) {
+ if (agg != null) {
+ agg.close();
+ }
+ }
+
+ clear();
+ }
+
+ @Override
+ protected AddToFactsResult addToFacts(InputRow row,
+ IncrementalIndexRow key,
+ ThreadLocal<InputRow> rowContainer,
+ Supplier<InputRow> rowSupplier,
+ boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
+ {
+ if (!skipMaxRowsInMemoryCheck) {
+ // We validate here as a sanity check that we did not exceed the row and memory limitations
+ // in previous insertions.
+ if (size() > maxRowCount || (maxBytesInMemory > 0 && getBytesInMemory() > maxBytesInMemory)) {
+ throw new IndexSizeExceededException(
+ "Maximum number of rows [%d out of %d] or max size in bytes [%d out of %d] reached",
+ size(), maxRowCount,
+ getBytesInMemory(), maxBytesInMemory
+ );
+ }
+ }
+
+ // In rollup mode, we let the key-serializer assign the row index.
+ // Upon lookup, the comparator ignores this special index value and only compares according to the key itself.
+ // The serializer is only called on insertion, so it will not increment the index if the key already exists.
+ // In plain mode, we force a new row index.
+ // Upon lookup, since there is no key with this index, a new key will be inserted every time.
+ key.setRowIndex(isRollup() ? OakKey.Serializer.ASSIGN_ROW_INDEX_IF_ABSENT : indexIncrement.getAndIncrement());
+
+ // This call is different from FactsHolder.putIfAbsent() because it also handles the aggregation
+ // in case the key already exists.
+ final OakInputRowContext ctx = new OakInputRowContext(rowContainer, row);
+ facts.zc().putIfAbsentComputeIfPresent(key, ctx, buffer -> aggregate(ctx, buffer));
+ return new AddToFactsResult(size(), getBytesInMemory(), ctx.parseExceptionMessages);
+ }
+
+ @Override
+ public int getLastRowIndex()
+ {
+ return indexIncrement.get() - 1;
+ }
+
+ private int getOffsetInBuffer(int aggIndex)
+ {
+ assert aggregatorOffsetInBuffer != null;
+ return aggregatorOffsetInBuffer[aggIndex];
+ }
+
+ @Override
+ protected float getMetricFloatValue(IncrementalIndexRow incrementalIndexRow, int aggIndex)
+ {
+ OakIncrementalIndexRow oakRow = (OakIncrementalIndexRow) incrementalIndexRow;
+ return aggregators[aggIndex].getFloat(oakRow.getAggregationsBuffer(), getOffsetInBuffer(aggIndex));
+ }
+
+ @Override
+ protected long getMetricLongValue(IncrementalIndexRow incrementalIndexRow, int aggIndex)
+ {
+ OakIncrementalIndexRow oakRow = (OakIncrementalIndexRow) incrementalIndexRow;
+ return aggregators[aggIndex].getLong(oakRow.getAggregationsBuffer(), getOffsetInBuffer(aggIndex));
+ }
+
+ @Override
+ protected Object getMetricObjectValue(IncrementalIndexRow incrementalIndexRow, int aggIndex)
+ {
+ OakIncrementalIndexRow oakRow = (OakIncrementalIndexRow) incrementalIndexRow;
+ return aggregators[aggIndex].get(oakRow.getAggregationsBuffer(), getOffsetInBuffer(aggIndex));
+ }
+
+ @Override
+ protected double getMetricDoubleValue(IncrementalIndexRow incrementalIndexRow, int aggIndex)
+ {
+ OakIncrementalIndexRow oakRow = (OakIncrementalIndexRow) incrementalIndexRow;
+ return aggregators[aggIndex].getDouble(oakRow.getAggregationsBuffer(), getOffsetInBuffer(aggIndex));
+ }
+
+ @Override
+ protected boolean isNull(IncrementalIndexRow incrementalIndexRow, int aggIndex)
+ {
+ OakIncrementalIndexRow oakRow = (OakIncrementalIndexRow) incrementalIndexRow;
+ return aggregators[aggIndex].isNull(oakRow.getAggregationsBuffer(), getOffsetInBuffer(aggIndex));
+ }
+
+ private OakIncrementalIndexRow rowFromMapEntry(Map.Entry<OakUnscopedBuffer, OakUnscopedBuffer> entry)
+ {
+ return new OakIncrementalIndexRow(
+ entry.getKey(), dimensionDescsList, entry.getValue()
+ );
+ }
+
+ @Override
+ public Iterable<Row> iterableWithPostAggregations(
+ @Nullable final List<PostAggregator> postAggs,
+ final boolean descending
+ )
+ {
+ return () -> transformIterator(descending, entry -> {
+ OakIncrementalIndexRow row = rowFromMapEntry(entry);
+ return getMapBasedRowWithPostAggregations(
+ row,
+ IntStream.range(0, aggregators.length).mapToObj(
+ i -> getMetricObjectValue(row, i)
+ ),
+ postAggs
+ );
+ });
+ }
+
+ // Aggregator management: initialization and aggregation
+
+ public void initAggValue(OakInputRowContext ctx, ByteBuffer aggBuffer)
+ {
+ AggregatorFactory[] metrics = getMetricAggs();
+ assert selectors != null;
+
+ if (aggregators.length > 0 && aggregators[aggregators.length - 1] == null) {
+ synchronized (this) {
+ if (aggregators[aggregators.length - 1] == null) {
+ // note: creation of Aggregators is done lazily when at least one row from input is available
+ // so that FilteredAggregators could be initialized correctly.
+ ctx.setRow();
+ for (int i = 0; i < metrics.length; i++) {
+ final AggregatorFactory agg = metrics[i];
+ if (aggregators[i] == null) {
+ aggregators[i] = agg.factorizeBuffered(selectors.get(agg.getName()));
+ }
+ }
+ ctx.clearRow();
+ }
+ }
+ }
+
+ for (int i = 0; i < metrics.length; i++) {
+ aggregators[i].init(aggBuffer, getOffsetInBuffer(i));
+ }
+
+ aggregate(ctx, aggBuffer);
+ }
+
+ public void aggregate(OakInputRowContext ctx, OakBuffer buffer)
+ {
+ OakUnsafeDirectBuffer unsafeBuffer = (OakUnsafeDirectBuffer) buffer;
+ aggregate(ctx, unsafeBuffer.getByteBuffer());
+ }
+
+ public void aggregate(OakInputRowContext ctx, ByteBuffer aggBuffer)
+ {
+ ctx.setRow();
+
+ for (int i = 0; i < aggregators.length; i++) {
+ final BufferAggregator agg = aggregators[i];
+
+ try {
+ agg.aggregate(aggBuffer, getOffsetInBuffer(i));
+ }
+ catch (ParseException e) {
+ // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
+ log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
+ ctx.addException(e);
+ }
+ }
+
+ ctx.clearRow();
+ }
+
+ /**
+ * Responsible for the initialization of the aggregators of a new inserted row.
+ * It is activated when a new row is serialized before insertion to the facts map.
+ */
+ class OakValueSerializer implements OakSerializer<OakInputRowContext>
+ {
+ @Override
+ public void serialize(OakInputRowContext ctx, OakScopedWriteBuffer buffer)
+ {
+ OakUnsafeDirectBuffer unsafeBuffer = (OakUnsafeDirectBuffer) buffer;
+ initAggValue(ctx, unsafeBuffer.getByteBuffer());
+ }
+
+ @Override
+ public OakInputRowContext deserialize(OakScopedReadBuffer buffer)
+ {
+ // cannot deserialize without the IncrementalIndexRow
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int calculateSize(OakInputRowContext row)
+ {
+ return aggregatorsTotalSize;
+ }
+
+ @Override
+ public int calculateHash(OakInputRowContext oakInputRowContext)
+ {
+ // This method should not be called.
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // FactsHolder helper methods
+
+ public Iterator<Row> transformIterator(
+ boolean descending,
+ Function<Map.Entry<OakUnscopedBuffer, OakUnscopedBuffer>, Row> transformer
+ )
+ {
+ OakMap orderedFacts = descending ? facts.descendingMap() : facts;
+ return orderedFacts.zc().entrySet().stream().map(transformer).iterator();
+ }
+
+ /**
+ * Generate a new row object for each iterated item.
+ */
+ private Iterator<IncrementalIndexRow> transformNonStreamIterator(
+ Iterator<Map.Entry<OakUnscopedBuffer, OakUnscopedBuffer>> iterator)
+ {
+ return Iterators.transform(iterator, this::rowFromMapEntry);
+ }
+
+ /**
+ * Since the buffers in the stream iterators are reused, we don't need to create
+ * a new row object for each next() call.
+ * See {@code OakIncrementalIndexRow.reset()} for more information.
+ */
+ private Iterator<IncrementalIndexRow> transformStreamIterator(
+ Iterator<Map.Entry<OakUnscopedBuffer, OakUnscopedBuffer>> iterator)
+ {
+ final OakIncrementalIndexRow[] rowHolder = new OakIncrementalIndexRow[1];
+
+ return Iterators.transform(iterator, entry -> {
+ if (rowHolder[0] == null) {
+ rowHolder[0] = rowFromMapEntry(entry);
+ } else {
+ rowHolder[0].reset();
+ }
+ return rowHolder[0];
+ });
+ }
+
+ // FactsHolder interface implementation
+
+ @Override
+ public int getPriorIndex(IncrementalIndexRow key)
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMinTimeMillis()
+ {
+ return facts.firstKey().getTimestamp();
+ }
+
+ @Override
+ public long getMaxTimeMillis()
+ {
+ return facts.lastKey().getTimestamp();
+ }
+
+ @Override
+ public Iterator<IncrementalIndexRow> iterator(boolean descending)
+ {
+ // We should never get here because we override iterableWithPostAggregations
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
+ {
+ return () -> {
+ IncrementalIndexRow from = null;
+ IncrementalIndexRow to = null;
+ if (timeStart > getMinTimeMillis()) {
+ from = new IncrementalIndexRow(timeStart, OakIncrementalIndexRow.NO_DIMS, dimensionDescsList,
+ IncrementalIndexRow.EMPTY_ROW_INDEX);
+ }
+
+ if (timeEnd < getMaxTimeMillis()) {
+ to = new IncrementalIndexRow(timeEnd, OakIncrementalIndexRow.NO_DIMS, dimensionDescsList,
+ IncrementalIndexRow.EMPTY_ROW_INDEX);
+ }
+
+ OakMap<IncrementalIndexRow, OakInputRowContext> subMap = facts.subMap(from, true, to, false, descending);
+ return transformStreamIterator(subMap.zc().entryStreamSet().iterator());
+ };
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> keySet()
+ {
+ return () -> transformNonStreamIterator(facts.zc().entrySet().iterator());
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> persistIterable()
+ {
+ return () -> transformStreamIterator(facts.zc().entryStreamSet().iterator());
+ }
+
+ @Override
+ public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear()
+ {
+ facts.close();
+ }
+
+ /**
+ * OakIncrementalIndex builder
+ */
+ public static class Builder extends AppendableIndexBuilder
+ {
+ // OakMap stores its data outside the JVM heap in memory blocks. Larger blocks consolidate allocations and reduce
+ // overhead, but can also waste more memory if not fully utilized. The default has a reasonable balance between
+ // performance and memory usage when tested in a batch ingestion scenario.
+ public static final int DEFAULT_OAK_BLOCK_SIZE = 8 * (1 << 20);
+
+ // OakMap maintains its memory blocks with an internal data-structure. Structure size is roughly
+ // `oakMaxMemoryCapacity/oakBlockSize`. We set this number to a large enough yet reasonable value so that this
+ // structure does not consume too much memory.
+ public static final long DEFAULT_OAK_MAX_MEMORY_CAPACITY = 32L * (1L << 30);
+
+ // OakMap maintains its entries in small chunks. Using larger chunks reduces the number of on-heap objects, but may
+ // incur more overhead when balancing the entries between chunks. The default showed the best performance in batch
+ // ingestion scenarios.
+ public static final int DEFAULT_OAK_CHUNK_MAX_ITEMS = 256;
+
+ public long oakMaxMemoryCapacity = DEFAULT_OAK_MAX_MEMORY_CAPACITY;
+ public int oakBlockSize = DEFAULT_OAK_BLOCK_SIZE;
+ public int oakChunkMaxItems = DEFAULT_OAK_CHUNK_MAX_ITEMS;
+
+ public Builder setOakMaxMemoryCapacity(long oakMaxMemoryCapacity)
+ {
+ this.oakMaxMemoryCapacity = oakMaxMemoryCapacity;
+ return this;
+ }
+
+ public Builder setOakBlockSize(int oakBlockSize)
+ {
+ this.oakBlockSize = oakBlockSize;
+ return this;
+ }
+
+ public Builder setOakChunkMaxItems(int oakChunkMaxItems)
+ {
+ this.oakChunkMaxItems = oakChunkMaxItems;
+ return this;
+ }
+
+ @Override
+ protected OakIncrementalIndex buildInner()
+ {
+ return new OakIncrementalIndex(
+ Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+ deserializeComplexMetrics,
+ concurrentEventAdd,
+ maxRowCount,
+ maxBytesInMemory,
+ oakMaxMemoryCapacity,
+ oakBlockSize,
+ oakChunkMaxItems
+ );
+ }
+ }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexModule.java b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexModule.java
new file mode 100644
index 000000000000..e1e1e0e27a2a
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexModule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+public class OakIncrementalIndexModule implements DruidModule
+{
+ public static final Module JACKSON_MODULE = new SimpleModule("OakIncrementalIndexModule")
+ .registerSubtypes(
+ new NamedType(OakIncrementalIndexSpec.class, OakIncrementalIndexSpec.TYPE)
+ );
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(JACKSON_MODULE);
+ }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexRow.java b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexRow.java
new file mode 100644
index 000000000000..67615740ed35
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexRow.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import com.yahoo.oak.OakUnsafeDirectBuffer;
+import com.yahoo.oak.OakUnscopedBuffer;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.incremental.IncrementalIndex.DimensionDesc;
+import org.apache.druid.segment.incremental.IncrementalIndexRow;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class OakIncrementalIndexRow extends IncrementalIndexRow
+{
+ public static final Object[] NO_DIMS = new Object[]{};
+
+ private final OakUnsafeDirectBuffer oakDimensions;
+ private long dimensions;
+ private final OakUnsafeDirectBuffer oakAggregations;
+ @Nullable
+ private ByteBuffer aggregationsBuffer;
+ private int dimsLength;
+
+ public OakIncrementalIndexRow(OakUnscopedBuffer dimensions,
+ List<DimensionDesc> dimensionDescsList,
+ OakUnscopedBuffer aggregations)
+ {
+ super(0, NO_DIMS, dimensionDescsList, IncrementalIndexRow.EMPTY_ROW_INDEX);
+ this.oakDimensions = (OakUnsafeDirectBuffer) dimensions;
+ this.oakAggregations = (OakUnsafeDirectBuffer) aggregations;
+ this.dimensions = oakDimensions.getAddress();
+ this.aggregationsBuffer = null;
+ this.dimsLength = -1; // lazy initialization
+ }
+
+ /**
+ * The key/value of the row is received as OakUnscopedBuffer.
+ * When iterating through the index items, we use Oak's stream iterators.
+ * In such iterators, the key/value OakUnscopedBuffer objects are reused for each next() call to avoid
+ * redundant object instantiation.
+ * So whenever we iterate through the index, we don't have to recreate the OakIncrementalIndexRow
+ * object. We can just reset it.
+ */
+ public void reset()
+ {
+ dimsLength = -1;
+ dimensions = oakDimensions.getAddress();
+ aggregationsBuffer = null;
+ }
+
+ public ByteBuffer getAggregationsBuffer()
+ {
+ // Read buffer only once
+ if (aggregationsBuffer == null) {
+ aggregationsBuffer = oakAggregations.getByteBuffer();
+ }
+ return aggregationsBuffer;
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return OakKey.getTimestamp(dimensions);
+ }
+
+ @Override
+ @Nullable
+ public Object getDim(int dimIndex)
+ {
+ if (isDimOutOfBounds(dimIndex)) {
+ return null;
+ }
+ return OakKey.getDim(dimensions, dimIndex);
+ }
+
+ @Override
+ public int getDimsLength()
+ {
+ // Read length only once
+ if (dimsLength < 0) {
+ dimsLength = OakKey.getDimsLength(dimensions);
+ }
+ return dimsLength;
+ }
+
+ /**
+ * Allows faster null validation because it does not need to deserialize the key.
+ */
+ @Override
+ public boolean isDimNull(int dimIndex)
+ {
+ return isDimOutOfBounds(dimIndex) || OakKey.isDimNull(dimensions, dimIndex);
+ }
+
+ /**
+ * Allows faster access to a IndexedInts dimension because it uses lazy evaluation (no need for deserialization).
+ */
+ @Override
+ @Nullable
+ public IndexedInts getIndexedDim(final int dimIndex, @Nullable IndexedInts cachedIndexedInts)
+ {
+ if (isDimNull(dimIndex)) {
+ return null;
+ }
+
+ OakKey.IndexedDim indexedInts;
+
+ if (!(cachedIndexedInts instanceof OakKey.IndexedDim)) {
+ indexedInts = new OakKey.IndexedDim(dimensions, dimIndex);
+ } else {
+ indexedInts = (OakKey.IndexedDim) cachedIndexedInts;
+ indexedInts.setValues(dimensions, dimIndex);
+ }
+
+ return indexedInts;
+ }
+
+ @Override
+ public int getRowIndex()
+ {
+ return OakKey.getRowIndex(dimensions);
+ }
+
+ @Override
+ public void setRowIndex(int rowIndex)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isDimOutOfBounds(int dimIndex)
+ {
+ return dimIndex < 0 || dimIndex >= getDimsLength();
+ }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexSpec.java b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexSpec.java
new file mode 100644
index 000000000000..b7dc7224fbd6
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakIncrementalIndexSpec.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Oak incremental index spec (describes the in-memory indexing method for data ingestion).
+ */
+public class OakIncrementalIndexSpec implements AppendableIndexSpec
+{
+ public static final String TYPE = "oak";
+
+ final long oakMaxMemoryCapacity;
+ final int oakBlockSize;
+ final int oakChunkMaxItems;
+
+ @JsonCreator
+ public OakIncrementalIndexSpec(
+ final @JsonProperty("oakMaxMemoryCapacity") @Nullable Long oakMaxMemoryCapacity,
+ final @JsonProperty("oakBlockSize") @Nullable Integer oakBlockSize,
+ final @JsonProperty("oakChunkMaxItems") @Nullable Integer oakChunkMaxItems
+ )
+ {
+ this.oakMaxMemoryCapacity = oakMaxMemoryCapacity != null && oakMaxMemoryCapacity > 0 ? oakMaxMemoryCapacity :
+ OakIncrementalIndex.Builder.DEFAULT_OAK_MAX_MEMORY_CAPACITY;
+ this.oakBlockSize = oakBlockSize != null && oakBlockSize > 0 ? oakBlockSize :
+ OakIncrementalIndex.Builder.DEFAULT_OAK_BLOCK_SIZE;
+ this.oakChunkMaxItems = oakChunkMaxItems != null && oakChunkMaxItems > 0 ? oakChunkMaxItems :
+ OakIncrementalIndex.Builder.DEFAULT_OAK_CHUNK_MAX_ITEMS;
+ }
+
+ @JsonProperty
+ public long getOakMaxMemoryCapacity()
+ {
+ return oakMaxMemoryCapacity;
+ }
+
+ @JsonProperty
+ public int getOakBlockSize()
+ {
+ return oakBlockSize;
+ }
+
+ @JsonProperty
+ public int getOakChunkMaxItems()
+ {
+ return oakChunkMaxItems;
+ }
+
+ @Nonnull
+ @Override
+ public OakIncrementalIndex.Builder builder()
+ {
+ return new OakIncrementalIndex.Builder()
+ .setOakMaxMemoryCapacity(oakMaxMemoryCapacity)
+ .setOakBlockSize(oakBlockSize)
+ .setOakChunkMaxItems(oakChunkMaxItems);
+ }
+
+ @Override
+ public long getDefaultMaxBytesInMemory()
+ {
+ // Since Oak allocates its keys/values directly, it is not subject to the JVM's on/off-heap limitations.
+ // Despite this, we have to respect runtime resource limits if they aren't specified by the ingestion specs.
+ // To ensure that the Oak index doesn't use more resources than available, we use the same default
+ // `maxBytesInMemory` as the on-heap index, i.e., 1/6 of the maximal heap size.
+ // It assumes that the middle-manager is configured correctly according to the machine's resources.
+ return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+ }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakInputRowContext.java b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakInputRowContext.java
new file mode 100644
index 000000000000..b62b8b732438
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakInputRowContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class acts as a wrapper to bundle InputRow together with its row container.
+ * It also collects the parse exceptions along the way.
+ */
+class OakInputRowContext
+{
+ final ThreadLocal<InputRow> rowContainer;
+ final InputRow row;
+ final List parseExceptionMessages = new ArrayList<>();
+
+ OakInputRowContext(ThreadLocal<InputRow> rowContainer, Row row)
+ {
+ this.rowContainer = rowContainer;
+ this.row = (InputRow) row;
+ }
+
+ void setRow()
+ {
+ rowContainer.set(row);
+ }
+
+ void clearRow()
+ {
+ rowContainer.set(null);
+ }
+
+ void addException(ParseException e)
+ {
+ parseExceptionMessages.add(e.getMessage());
+ }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakKey.java b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakKey.java
new file mode 100644
index 000000000000..22d7a8302e9a
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/java/org/apache/druid/segment/incremental/oak/OakKey.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.yahoo.oak.DirectUtils;
+import com.yahoo.oak.OakBuffer;
+import com.yahoo.oak.OakComparator;
+import com.yahoo.oak.OakScopedReadBuffer;
+import com.yahoo.oak.OakScopedWriteBuffer;
+import com.yahoo.oak.OakSerializer;
+import com.yahoo.oak.OakUnsafeDirectBuffer;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.DimensionIndexer;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexRow;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+/**
+ * Responsible for the serialization, deserialization, and comparison of keys.
+ *
+ * It stores the key in an off-heap buffer in the following structure:
+ * - Global metadata (buffer offset + _):
+ * +0: timestamp (long)
+ * +8: dims length (int)
+ * +12: row index (int)
+ * (total of 16 bytes)
+ * - Followed by the dimensions one after the other (buffer offset + 16 + dimIdx*12 + _):
+ * +0: value type (int)
+ * +4: data (int/long/float/double)
+ * (12 bytes per dimension)
+ * Note: For string dimension (int array), the data includes:
+ * +4: the offset of the int array in the buffer (int)
+ * +8: the length of the int array (int)
+ * - The string dimensions arrays are stored after all the dims' data (buffer offset + 16 + dimsLength*12 + _).
+ *
+ * Note: the specified offsets are true in most cases, but other JVM implementations may have
+ * different offsets, depending on the size of the primitives in bytes.
+ * The offset calculation below is robust to JVM implementation changes.
+ */
+public final class OakKey
+{
+  // The off-heap buffer offsets (buffer offset + _)
+  static final int TIME_STAMP_OFFSET = 0;
+  static final int DIMS_LENGTH_OFFSET = TIME_STAMP_OFFSET + Long.BYTES;
+  static final int ROW_INDEX_OFFSET = DIMS_LENGTH_OFFSET + Integer.BYTES;
+  static final int DIMS_OFFSET = ROW_INDEX_OFFSET + Integer.BYTES;
+  static final int DIM_VALUE_TYPE_OFFSET = 0;
+  static final int DIM_DATA_OFFSET = DIM_VALUE_TYPE_OFFSET + Integer.BYTES;
+  static final int STRING_DIM_ARRAY_POS_OFFSET = DIM_DATA_OFFSET;
+  static final int STRING_DIM_ARRAY_LENGTH_OFFSET = STRING_DIM_ARRAY_POS_OFFSET + Integer.BYTES;
+  static final int SIZE_PER_DIM = DIM_DATA_OFFSET + Collections.max(Arrays.asList(
+      // Common data types
+      Integer.BYTES,
+      Long.BYTES,
+      Float.BYTES,
+      Double.BYTES,
+      // String dimension data type (array position + array length)
+      Integer.BYTES * 2
+  ));
+
+  // Dimension types
+  static final ValueType[] VALUE_ORDINAL_TYPES = ValueType.values();
+  // Marks a null dimension
+  static final int NULL_DIM = -1;
+
+  private OakKey()
+  {
+  }
+
+  static long getKeyAddress(OakBuffer buffer)
+  {
+    return ((OakUnsafeDirectBuffer) buffer).getAddress();
+  }
+
+  static long getTimestamp(long address)
+  {
+    return DirectUtils.getLong(address + TIME_STAMP_OFFSET);
+  }
+
+  static int getRowIndex(long address)
+  {
+    return DirectUtils.getInt(address + ROW_INDEX_OFFSET);
+  }
+
+  static int getDimsLength(long address)
+  {
+    return DirectUtils.getInt(address + DIMS_LENGTH_OFFSET);
+  }
+
+  static int getDimOffsetInBuffer(int dimIndex)
+  {
+    return DIMS_OFFSET + (dimIndex * SIZE_PER_DIM);
+  }
+
+  static boolean isValueTypeNull(int dimValueTypeID)
+  {
+    // Any ordinal outside the ValueType range (e.g., NULL_DIM) marks a null dimension.
+    return dimValueTypeID < 0 || dimValueTypeID >= VALUE_ORDINAL_TYPES.length;
+  }
+
+  static boolean isDimNull(long address, int dimIndex)
+  {
+    long dimAddress = address + getDimOffsetInBuffer(dimIndex);
+    return isValueTypeNull(DirectUtils.getInt(dimAddress + DIM_VALUE_TYPE_OFFSET));
+  }
+
+  /**
+   * Reads a single dimension from the serialized key.
+   *
+   * @return a Double/Float/Long for numeric dims, an int[] of dictionary IDs for string dims,
+   *         or null if the dim is null or of an unsupported type.
+   */
+  @Nullable
+  static Object getDim(long address, int dimIndex)
+  {
+    long dimAddress = address + getDimOffsetInBuffer(dimIndex);
+    int dimValueTypeID = DirectUtils.getInt(dimAddress + DIM_VALUE_TYPE_OFFSET);
+
+    if (isValueTypeNull(dimValueTypeID)) {
+      return null;
+    }
+
+    switch (VALUE_ORDINAL_TYPES[dimValueTypeID]) {
+      case DOUBLE:
+        return DirectUtils.getDouble(dimAddress + DIM_DATA_OFFSET);
+      case FLOAT:
+        return DirectUtils.getFloat(dimAddress + DIM_DATA_OFFSET);
+      case LONG:
+        return DirectUtils.getLong(dimAddress + DIM_DATA_OFFSET);
+      case STRING:
+        // String dims store the int array after all the dims' data; read its position and length.
+        int arrayPos = DirectUtils.getInt(dimAddress + STRING_DIM_ARRAY_POS_OFFSET);
+        int arraySize = DirectUtils.getInt(dimAddress + STRING_DIM_ARRAY_LENGTH_OFFSET);
+        int[] array = new int[arraySize];
+        DirectUtils.copyToArray(address + arrayPos, array, arraySize);
+        return array;
+      default:
+        return null;
+    }
+  }
+
+  static Object[] getAllDims(long address)
+  {
+    int dimsLength = getDimsLength(address);
+    return IntStream.range(0, dimsLength).mapToObj(dimIndex -> getDim(address, dimIndex)).toArray();
+  }
+
+  /**
+   * A lazy-evaluation version of an indexed ints dimension, used by
+   * {@link OakIncrementalIndexRow#getIndexedDim(int, IndexedInts)}.
+   * As opposed to the integers array version that is returned by
+   * {@link IncrementalIndexRow#getIndexedDim(int, IndexedInts)}.
+   */
+  public static class IndexedDim implements IndexedInts
+  {
+    long dimensionsAddress;
+    int dimIndex;
+    int arraySize;
+    long arrayAddress;
+
+    public IndexedDim(long dimensionsAddress, int dimIndex)
+    {
+      setValues(dimensionsAddress, dimIndex);
+    }
+
+    public void setValues(long dimensionsAddress, int dimIndex)
+    {
+      // Re-read the array metadata only when pointed at a different key/dim.
+      if (this.dimensionsAddress != dimensionsAddress || this.dimIndex != dimIndex) {
+        this.dimensionsAddress = dimensionsAddress;
+        this.dimIndex = dimIndex;
+        long dimAddress = this.dimensionsAddress + getDimOffsetInBuffer(dimIndex);
+        arrayAddress = dimensionsAddress + DirectUtils.getInt(dimAddress + STRING_DIM_ARRAY_POS_OFFSET);
+        arraySize = DirectUtils.getInt(dimAddress + STRING_DIM_ARRAY_LENGTH_OFFSET);
+      }
+    }
+
+    @Override
+    public int size()
+    {
+      return arraySize;
+    }
+
+    @Override
+    public int get(int index)
+    {
+      return DirectUtils.getInt(arrayAddress + ((long) index) * Integer.BYTES);
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      // nothing to inspect
+    }
+  }
+
+  /**
+   * Serializes/deserializes {@link IncrementalIndexRow} keys to/from Oak's off-heap buffers.
+   */
+  public static class Serializer implements OakSerializer<IncrementalIndexRow>
+  {
+    public static final int ASSIGN_ROW_INDEX_IF_ABSENT = Integer.MIN_VALUE;
+
+    private final List<IncrementalIndex.DimensionDesc> dimensionDescsList;
+    private final AtomicInteger rowIndexGenerator;
+
+    public Serializer(List<IncrementalIndex.DimensionDesc> dimensionDescsList, AtomicInteger rowIndexGenerator)
+    {
+      this.dimensionDescsList = dimensionDescsList;
+      this.rowIndexGenerator = rowIndexGenerator;
+    }
+
+    @Nullable
+    private ValueType getDimValueType(int dimIndex)
+    {
+      IncrementalIndex.DimensionDesc dimensionDesc = dimensionDescsList.get(dimIndex);
+      if (dimensionDesc == null) {
+        return null;
+      }
+      ColumnCapabilities capabilities = dimensionDesc.getCapabilities();
+      return capabilities.getType();
+    }
+
+    @Override
+    public void serialize(IncrementalIndexRow incrementalIndexRow, OakScopedWriteBuffer buffer)
+    {
+      long address = getKeyAddress(buffer);
+
+      long timestamp = incrementalIndexRow.getTimestamp();
+      int dimsLength = incrementalIndexRow.getDimsLength();
+      int rowIndex = incrementalIndexRow.getRowIndex();
+      if (rowIndex == ASSIGN_ROW_INDEX_IF_ABSENT) {
+        // Lazily assign a unique row index on first serialization.
+        rowIndex = rowIndexGenerator.getAndIncrement();
+        incrementalIndexRow.setRowIndex(rowIndex);
+      }
+      DirectUtils.putLong(address + TIME_STAMP_OFFSET, timestamp);
+      DirectUtils.putInt(address + DIMS_LENGTH_OFFSET, dimsLength);
+      DirectUtils.putInt(address + ROW_INDEX_OFFSET, rowIndex);
+
+      long dimsAddress = address + DIMS_OFFSET;
+      // the index for writing the int arrays of the string-dim (after all the dims' data)
+      int stringDimArraysPos = getDimOffsetInBuffer(dimsLength);
+
+      for (int dimIndex = 0; dimIndex < dimsLength; dimIndex++) {
+        Object dim = incrementalIndexRow.getDim(dimIndex);
+        ValueType dimValueType = dim != null ? getDimValueType(dimIndex) : null;
+        boolean isDimHaveValue = dimValueType != null;
+
+        int dimValueTypeID = isDimHaveValue ? dimValueType.ordinal() : NULL_DIM;
+        DirectUtils.putInt(dimsAddress + DIM_VALUE_TYPE_OFFSET, dimValueTypeID);
+
+        if (isDimHaveValue) {
+          switch (dimValueType) {
+            case FLOAT:
+              DirectUtils.putFloat(dimsAddress + DIM_DATA_OFFSET, (Float) dim);
+              break;
+            case DOUBLE:
+              DirectUtils.putDouble(dimsAddress + DIM_DATA_OFFSET, (Double) dim);
+              break;
+            case LONG:
+              DirectUtils.putLong(dimsAddress + DIM_DATA_OFFSET, (Long) dim);
+              break;
+            case STRING:
+              int[] arr = (int[]) dim;
+              int length = arr.length;
+              DirectUtils.putInt(dimsAddress + STRING_DIM_ARRAY_POS_OFFSET, stringDimArraysPos);
+              DirectUtils.putInt(dimsAddress + STRING_DIM_ARRAY_LENGTH_OFFSET, length);
+              long lengthBytes = DirectUtils.copyFromArray(arr, address + stringDimArraysPos, length);
+              stringDimArraysPos += (int) lengthBytes;
+              break;
+          }
+        }
+
+        dimsAddress += SIZE_PER_DIM;
+      }
+    }
+
+    @Override
+    public IncrementalIndexRow deserialize(OakScopedReadBuffer buffer)
+    {
+      long address = getKeyAddress(buffer);
+      return new IncrementalIndexRow(
+          getTimestamp(address),
+          getAllDims(address),
+          dimensionDescsList,
+          getRowIndex(address)
+      );
+    }
+
+    @Override
+    public int calculateSize(IncrementalIndexRow incrementalIndexRow)
+    {
+      int dimsLength = incrementalIndexRow.getDimsLength();
+      int sizeInBytes = getDimOffsetInBuffer(dimsLength);
+
+      // When the dimensionDesc's capabilities are of type ValueType.STRING,
+      // the object in timeAndDims.dims is of type int[].
+      // In this case, we need to know the array size before allocating the ByteBuffer.
+      for (int i = 0; i < dimsLength; i++) {
+        if (getDimValueType(i) != ValueType.STRING) {
+          continue;
+        }
+
+        Object dim = incrementalIndexRow.getDim(i);
+        if (dim != null) {
+          sizeInBytes += Integer.BYTES * ((int[]) dim).length;
+        }
+      }
+
+      return sizeInBytes;
+    }
+
+    @Override
+    public int calculateHash(IncrementalIndexRow incrementalIndexRow)
+    {
+      // This method should not be called.
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Compares keys in all three combinations: deserialized vs. deserialized, serialized vs.
+   * serialized, and deserialized vs. serialized, without materializing serialized keys.
+   */
+  public static class Comparator implements OakComparator<IncrementalIndexRow>
+  {
+    private final List<IncrementalIndex.DimensionDesc> dimensionDescs;
+    private final boolean rollup;
+
+    public Comparator(List<IncrementalIndex.DimensionDesc> dimensionDescs, boolean rollup)
+    {
+      this.dimensionDescs = dimensionDescs;
+      this.rollup = rollup;
+    }
+
+    @Override
+    public int compareKeys(IncrementalIndexRow lhs, IncrementalIndexRow rhs)
+    {
+      int retVal = Longs.compare(lhs.getTimestamp(), rhs.getTimestamp());
+      if (retVal != 0) {
+        return retVal;
+      }
+
+      int lhsDimsLength = lhs.getDimsLength();
+      int rhsDimsLength = rhs.getDimsLength();
+      int numComparisons = Math.min(lhsDimsLength, rhsDimsLength);
+
+      int index = 0;
+      while (retVal == 0 && index < numComparisons) {
+        final Object lhsIdxs = lhs.getDim(index);
+        final Object rhsIdxs = rhs.getDim(index);
+
+        if (lhsIdxs == null) {
+          if (rhsIdxs == null) {
+            ++index;
+            continue;
+          }
+          return -1;
+        }
+
+        if (rhsIdxs == null) {
+          return 1;
+        }
+
+        final DimensionIndexer indexer = dimensionDescs.get(index).getIndexer();
+        retVal = indexer.compareUnsortedEncodedKeyComponents(lhsIdxs, rhsIdxs);
+        ++index;
+      }
+
+      if (retVal == 0) {
+        int lengthDiff = Ints.compare(lhsDimsLength, rhsDimsLength);
+        if (lengthDiff != 0) {
+          // Extra trailing null dims do not affect ordering.
+          IncrementalIndexRow largerRow = lengthDiff > 0 ? lhs : rhs;
+          retVal = allNull(largerRow, numComparisons) ? 0 : lengthDiff;
+        }
+      }
+
+      return retVal == 0 ? rowIndexCompare(lhs.getRowIndex(), rhs.getRowIndex()) : retVal;
+    }
+
+    @Override
+    public int compareSerializedKeys(OakScopedReadBuffer lhsBuffer, OakScopedReadBuffer rhsBuffer)
+    {
+      long lhs = getKeyAddress(lhsBuffer);
+      long rhs = getKeyAddress(rhsBuffer);
+
+      int retVal = Longs.compare(getTimestamp(lhs), getTimestamp(rhs));
+      if (retVal != 0) {
+        return retVal;
+      }
+
+      int lhsDimsLength = getDimsLength(lhs);
+      int rhsDimsLength = getDimsLength(rhs);
+      int numComparisons = Math.min(lhsDimsLength, rhsDimsLength);
+
+      int index = 0;
+      while (retVal == 0 && index < numComparisons) {
+        final Object lhsIdxs = getDim(lhs, index);
+        final Object rhsIdxs = getDim(rhs, index);
+
+        if (lhsIdxs == null) {
+          if (rhsIdxs == null) {
+            ++index;
+            continue;
+          }
+          return -1;
+        }
+
+        if (rhsIdxs == null) {
+          return 1;
+        }
+
+        final DimensionIndexer indexer = dimensionDescs.get(index).getIndexer();
+        retVal = indexer.compareUnsortedEncodedKeyComponents(lhsIdxs, rhsIdxs);
+        ++index;
+      }
+
+      if (retVal == 0) {
+        int lengthDiff = Ints.compare(lhsDimsLength, rhsDimsLength);
+        if (lengthDiff != 0) {
+          // Extra trailing null dims do not affect ordering.
+          long largerRowAddress = lengthDiff > 0 ? lhs : rhs;
+          retVal = allNull(largerRowAddress, numComparisons) ? 0 : lengthDiff;
+        }
+      }
+
+      return retVal == 0 ? rowIndexCompare(getRowIndex(lhs), getRowIndex(rhs)) : retVal;
+    }
+
+    @Override
+    public int compareKeyAndSerializedKey(IncrementalIndexRow lhs, OakScopedReadBuffer rhsBuffer)
+    {
+      long rhs = getKeyAddress(rhsBuffer);
+
+      int retVal = Longs.compare(lhs.getTimestamp(), getTimestamp(rhs));
+      if (retVal != 0) {
+        return retVal;
+      }
+
+      int lhsDimsLength = lhs.getDimsLength();
+      int rhsDimsLength = getDimsLength(rhs);
+      int numComparisons = Math.min(lhsDimsLength, rhsDimsLength);
+
+      int index = 0;
+      while (retVal == 0 && index < numComparisons) {
+        final Object lhsIdxs = lhs.getDim(index);
+        final Object rhsIdxs = getDim(rhs, index);
+
+        if (lhsIdxs == null) {
+          if (rhsIdxs == null) {
+            ++index;
+            continue;
+          }
+          return -1;
+        }
+
+        if (rhsIdxs == null) {
+          return 1;
+        }
+
+        final DimensionIndexer indexer = dimensionDescs.get(index).getIndexer();
+        retVal = indexer.compareUnsortedEncodedKeyComponents(lhsIdxs, rhsIdxs);
+        ++index;
+      }
+
+      if (retVal == 0) {
+        int lengthDiff = Ints.compare(lhsDimsLength, rhsDimsLength);
+        if (lengthDiff != 0) {
+          // Extra trailing null dims do not affect ordering.
+          boolean isAllNull = lengthDiff > 0 ? allNull(lhs, numComparisons) : allNull(rhs, numComparisons);
+          retVal = isAllNull ? 0 : lengthDiff;
+        }
+      }
+
+      return retVal == 0 ? rowIndexCompare(lhs.getRowIndex(), getRowIndex(rhs)) : retVal;
+    }
+
+    private int rowIndexCompare(int lsIndex, int rsIndex)
+    {
+      if (!rollup || lsIndex == IncrementalIndexRow.EMPTY_ROW_INDEX || rsIndex == IncrementalIndexRow.EMPTY_ROW_INDEX) {
+        // If we are not in a rollup mode (plain mode), then keys should never be equal.
+        // In addition, if one of the keys has no index row (EMPTY_ROW_INDEX) it means it is a lower or upper bound key,
+        // so we must compare it.
+        return Integer.compare(lsIndex, rsIndex);
+      } else {
+        return 0;
+      }
+    }
+
+    private static boolean allNull(IncrementalIndexRow row, int startPosition)
+    {
+      int dimLength = row.getDimsLength();
+      for (int i = startPosition; i < dimLength; i++) {
+        if (!row.isDimNull(i)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private static boolean allNull(long rowAddress, int startPosition)
+    {
+      int dimLength = getDimsLength(rowAddress);
+      for (int i = startPosition; i < dimLength; i++) {
+        if (!isDimNull(rowAddress, i)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/oak-incremental-index/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 000000000000..fe90ebe7c969
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.segment.incremental.oak.OakIncrementalIndexModule
diff --git a/extensions-contrib/oak-incremental-index/src/test/java/org/apache/druid/segment/incremental/oak/OakDummyInitTest.java b/extensions-contrib/oak-incremental-index/src/test/java/org/apache/druid/segment/incremental/oak/OakDummyInitTest.java
new file mode 100644
index 000000000000..3d5a738e188d
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/test/java/org/apache/druid/segment/incremental/oak/OakDummyInitTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * This test class is a hack to initialize IncrementalIndexCreator to include OakIncrementalIndexSpec.
+ * It is needed because all @Parameterized.Parameters methods are called before any other tests' code segments.
+ * Even before @BeforeClass annotated methods.
+ */
+@RunWith(Parameterized.class)
+public class OakDummyInitTest
+{
+  @Parameterized.Parameters
+  public static Collection<Object[]> constructorFeeder()
+  {
+    // Add Oak to the available incremental indexes. Returning an empty collection
+    // means no actual test instances run; only this side effect is needed.
+    IncrementalIndexCreator.addIndexSpec(OakIncrementalIndexSpec.class, "oak");
+    return ImmutableList.of();
+  }
+}
diff --git a/extensions-contrib/oak-incremental-index/src/test/java/org/apache/druid/segment/incremental/oak/TestSuite.java b/extensions-contrib/oak-incremental-index/src/test/java/org/apache/druid/segment/incremental/oak/TestSuite.java
new file mode 100644
index 000000000000..db40688aa9fb
--- /dev/null
+++ b/extensions-contrib/oak-incremental-index/src/test/java/org/apache/druid/segment/incremental/oak/TestSuite.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental.oak;
+
+import org.apache.druid.segment.data.IncrementalIndexTest;
+import org.apache.druid.segment.incremental.IncrementalIndexAdapterTest;
+import org.apache.druid.segment.incremental.IncrementalIndexIngestionTest;
+import org.apache.druid.segment.incremental.IncrementalIndexMultiValueSpecTest;
+import org.apache.druid.segment.incremental.IncrementalIndexRowCompTest;
+import org.apache.druid.segment.incremental.IncrementalIndexRowSizeTest;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapterTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Runs all relevant tests of the incremental index on all available implementations (including Oak).
+ * OakDummyInitTest must run first so that OakIncrementalIndexSpec is registered before the
+ * parameterized suites enumerate the available index types.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    OakDummyInitTest.class,
+    IncrementalIndexTest.class,
+    org.apache.druid.segment.incremental.IncrementalIndexTest.class,
+    IncrementalIndexStorageAdapterTest.class,
+    IncrementalIndexRowSizeTest.class,
+    IncrementalIndexRowCompTest.class,
+    IncrementalIndexMultiValueSpecTest.class,
+    IncrementalIndexIngestionTest.class,
+    IncrementalIndexAdapterTest.class,
+})
+public class TestSuite
+{
+}
diff --git a/licenses.yaml b/licenses.yaml
index abf4200a432b..08f9a27e94b5 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -5027,6 +5027,16 @@ libraries:
---
+name: oak
+license_category: binary
+version: 0.2.5
+module: extensions/oak-incremental-index
+license_name: Apache License version 2.0
+libraries:
+ - com.yahoo.oak: oak
+
+---
+
# Web console modules start
name: "@babel/code-frame"
diff --git a/pom.xml b/pom.xml
index 425de34be981..f2a23a44b502 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@
extensions-contrib/aliyun-oss-extensionsextensions-contrib/prometheus-emitterextensions-contrib/opentelemetry-emitter
+        <module>extensions-contrib/oak-incremental-index</module>
diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java
index 53207b51d1f7..173469f2c809 100644
--- a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java
@@ -125,20 +125,19 @@ class IndexerDoubleColumnSelector implements DoubleColumnSelector
@Override
public boolean isNull()
{
- final Object[] dims = currEntry.get().getDims();
- return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null);
+ return hasNulls && currEntry.get().isDimNull(dimIndex);
}
@Override
public double getDouble()
{
- final Object[] dims = currEntry.get().getDims();
+ final Object dim = currEntry.get().getDim(dimIndex);
- if (dimIndex >= dims.length || dims[dimIndex] == null) {
+ if (dim == null) {
assert NullHandling.replaceWithDefault();
return 0.0;
}
- return (Double) dims[dimIndex];
+ return (Double) dim;
}
@SuppressWarnings("deprecation")
@@ -146,12 +145,12 @@ public double getDouble()
@Override
public Double getObject()
{
- final Object[] dims = currEntry.get().getDims();
+ final Object dim = currEntry.get().getDim(dimIndex);
- if (dimIndex >= dims.length || dims[dimIndex] == null) {
+ if (dim == null) {
return NullHandling.defaultDoubleValue();
}
- return (Double) dims[dimIndex];
+ return (Double) dim;
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java
index 2da428f28734..0b44f65b4144 100644
--- a/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java
@@ -126,21 +126,20 @@ class IndexerFloatColumnSelector implements FloatColumnSelector
@Override
public boolean isNull()
{
- final Object[] dims = currEntry.get().getDims();
- return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null);
+ return hasNulls && currEntry.get().isDimNull(dimIndex);
}
@Override
public float getFloat()
{
- final Object[] dims = currEntry.get().getDims();
+ final Object dim = currEntry.get().getDim(dimIndex);
- if (dimIndex >= dims.length || dims[dimIndex] == null) {
+ if (dim == null) {
assert NullHandling.replaceWithDefault();
return 0.0f;
}
- return (Float) dims[dimIndex];
+ return (Float) dim;
}
@SuppressWarnings("deprecation")
@@ -148,13 +147,13 @@ public float getFloat()
@Override
public Float getObject()
{
- final Object[] dims = currEntry.get().getDims();
+ final Object dim = currEntry.get().getDim(dimIndex);
- if (dimIndex >= dims.length || dims[dimIndex] == null) {
+ if (dim == null) {
return NullHandling.defaultFloatValue();
}
- return (Float) dims[dimIndex];
+ return (Float) dim;
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java
index 48960767dab3..c1073d7f1ca0 100644
--- a/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java
@@ -126,21 +126,20 @@ class IndexerLongColumnSelector implements LongColumnSelector
@Override
public boolean isNull()
{
- final Object[] dims = currEntry.get().getDims();
- return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null);
+ return hasNulls && currEntry.get().isDimNull(dimIndex);
}
@Override
public long getLong()
{
- final Object[] dims = currEntry.get().getDims();
+ final Object dim = currEntry.get().getDim(dimIndex);
- if (dimIndex >= dims.length || dims[dimIndex] == null) {
+ if (dim == null) {
assert NullHandling.replaceWithDefault();
return 0;
}
- return (Long) dims[dimIndex];
+ return (Long) dim;
}
@SuppressWarnings("deprecation")
@@ -148,13 +147,13 @@ public long getLong()
@Override
public Long getObject()
{
- final Object[] dims = currEntry.get().getDims();
+ final Object dim = currEntry.get().getDim(dimIndex);
- if (dimIndex >= dims.length || dims[dimIndex] == null) {
+ if (dim == null) {
return NullHandling.defaultLongValue();
}
- return (Long) dims[dimIndex];
+ return (Long) dim;
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index 589ad84fee32..ea94cdbf80f4 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -282,57 +282,62 @@ public DimensionSelector makeDimensionSelector(
class IndexerDimensionSelector implements DimensionSelector, IdLookup
{
- private final ArrayBasedIndexedInts indexedInts = new ArrayBasedIndexedInts();
+ private final ArrayBasedIndexedInts defaultIndexedInts = new ArrayBasedIndexedInts();
+
+ @Nullable
+ @MonotonicNonNull
+ private IndexedInts cachedIndexedInts;
@Nullable
@MonotonicNonNull
private int[] nullIdIntArray;
- @Override
- public IndexedInts getRow()
+ /**
+ * Tries to fetch the dimension as an IndexedInts.
+ * If the dim is null or with zero length, the value is considered null.
+ * It may be null or empty due to currEntry's rowIndex being smaller than the row's rowIndex in which this
+ * dim first appears.
+ *
+ * @return IndexedInts instance, or null if the dim is null.
+ */
+ @Nullable
+ private IndexedInts getRowOrNull()
{
- final Object[] dims = currEntry.get().getDims();
-
- int[] indices;
- if (dimIndex < dims.length) {
- indices = (int[]) dims[dimIndex];
- } else {
- indices = null;
+ IndexedInts ret = currEntry.get().getIndexedDim(dimIndex, cachedIndexedInts);
+ if (ret != null) {
+ cachedIndexedInts = ret;
+ return ret.size() > 0 ? ret : null;
}
+ return null;
+ }
- int[] row = null;
- int rowSize = 0;
-
- // usually due to currEntry's rowIndex is smaller than the row's rowIndex in which this dim first appears
- if (indices == null || indices.length == 0) {
- if (hasMultipleValues) {
- row = IntArrays.EMPTY_ARRAY;
- rowSize = 0;
- } else {
- final int nullId = getEncodedValue(null, false);
- if (nullId >= 0 && nullId < maxId) {
- // null was added to the dictionary before this selector was created; return its ID.
- if (nullIdIntArray == null) {
- nullIdIntArray = new int[]{nullId};
- }
- row = nullIdIntArray;
- rowSize = 1;
- } else {
- // null doesn't exist in the dictionary; return an empty array.
- // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism
- row = IntArrays.EMPTY_ARRAY;
- rowSize = 0;
+ private IndexedInts getDefaultIndexedInts()
+ {
+ if (hasMultipleValues) {
+ defaultIndexedInts.setValues(IntArrays.EMPTY_ARRAY, 0);
+ } else {
+ final int nullId = getEncodedValue(null, false);
+ if (nullId >= 0 && nullId < maxId) {
+ // null was added to the dictionary before this selector was created; return its ID.
+ if (nullIdIntArray == null) {
+ nullIdIntArray = new int[]{nullId};
}
+ defaultIndexedInts.setValues(nullIdIntArray, 1);
+ } else {
+ // null doesn't exist in the dictionary; return an empty array.
+ // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism
+ defaultIndexedInts.setValues(IntArrays.EMPTY_ARRAY, 0);
}
}
- if (row == null && indices != null && indices.length > 0) {
- row = indices;
- rowSize = indices.length;
- }
+ return defaultIndexedInts;
+ }
- indexedInts.setValues(row, rowSize);
- return indexedInts;
+ @Override
+ public IndexedInts getRow()
+ {
+ IndexedInts ret = getRowOrNull();
+ return ret != null ? ret : getDefaultIndexedInts();
}
@Override
@@ -346,18 +351,14 @@ public ValueMatcher makeValueMatcher(final String value)
@Override
public boolean matches()
{
- Object[] dims = currEntry.get().getDims();
- if (dimIndex >= dims.length) {
+ IndexedInts dimsInt = getRowOrNull();
+ if (dimsInt == null) {
return value == null;
}
- int[] dimsInt = (int[]) dims[dimIndex];
- if (dimsInt == null || dimsInt.length == 0) {
- return value == null;
- }
-
- for (int id : dimsInt) {
- if (id == valueId) {
+ int size = dimsInt.size();
+ for (int i = 0; i < size; i++) {
+ if (dimsInt.get(i) == valueId) {
return true;
}
}
@@ -392,17 +393,14 @@ public ValueMatcher makeValueMatcher(final Predicate predicate)
@Override
public boolean matches()
{
- Object[] dims = currEntry.get().getDims();
- if (dimIndex >= dims.length) {
- return matchNull;
- }
-
- int[] dimsInt = (int[]) dims[dimIndex];
- if (dimsInt == null || dimsInt.length == 0) {
+ IndexedInts dimsInt = getRowOrNull();
+ if (dimsInt == null) {
return matchNull;
}
- for (int id : dimsInt) {
+ int size = dimsInt.size();
+ for (int i = 0; i < size; i++) {
+ int id = dimsInt.get(i);
if (checkedIds.get(id)) {
if (matchingIds.get(id)) {
return true;
@@ -487,12 +485,12 @@ public Object getObject()
return null;
}
- Object[] dims = key.getDims();
- if (dimIndex >= dims.length) {
+ Object dim = key.getDim(dimIndex);
+ if (dim == null) {
return null;
}
- return convertUnsortedEncodedKeyComponentToActualList((int[]) dims[dimIndex]);
+ return convertUnsortedEncodedKeyComponentToActualList((int[]) dim);
}
@SuppressWarnings("deprecation")
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 484bc89fc955..7208070b5416 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -34,6 +34,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -41,6 +42,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.UnparseableColumnsParseException;
@@ -84,6 +86,7 @@
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
@@ -99,7 +102,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class IncrementalIndex extends AbstractIndex implements Iterable, Closeable, ColumnInspector
@@ -222,6 +225,36 @@ public ColumnCapabilities getColumnCapabilities(String columnName)
private final long minTimestamp;
private final Granularity gran;
private final boolean rollup;
+ protected final int maxRowCount;
+ protected final long maxBytesInMemory;
+
+ /**
+ * Flag denoting if max possible values should be used to estimate on-heap mem
+ * usage.
+ *
+ * There is one instance of Aggregator per metric per row key.
+ *
+ * <li>Aggregator: For a given metric, compute the max memory an aggregator
+ * can use and multiply that by number of aggregators (same as number of
+ * aggregated rows or number of unique row keys)
+ *
+ * <li>DimensionIndexer: For each row, encode dimension values and estimate
+ * size of original dimension values