From 09d367866736cc67d0015d606e184ad534061197 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 22 Jan 2016 22:54:01 -0600 Subject: [PATCH] adding single threaded indexing and querying test for IncrementalIndex --- .../segment/data/IncrementalIndexTest.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 3a174876331a..b769c6ba3fbf 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -22,6 +22,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -221,6 +222,106 @@ public void testCaseSensitivity() throws Exception Assert.assertEquals(timestamp, row.getTimestampFromEpoch()); Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1")); Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2")); + + index.close(); + } + + @Test + public void testSingleThreadedIndexingAndQuery() throws Exception + { + final int dimensionCount = 5; + final ArrayList ingestAggregatorFactories = new ArrayList<>(); + ingestAggregatorFactories.add(new CountAggregatorFactory("rows")); + for (int i = 0; i < dimensionCount; ++i) { + ingestAggregatorFactories.add( + new LongSumAggregatorFactory( + String.format("sumResult%s", i), + String.format("Dim_%s", i) + ) + ); + ingestAggregatorFactories.add( + new DoubleSumAggregatorFactory( + String.format("doubleSumResult%s", i), + String.format("Dim_%s", i) + ) + ); + } + + final IncrementalIndex index = indexCreator.createIndex( + ingestAggregatorFactories.toArray( + new AggregatorFactory[ingestAggregatorFactories.size()] + ) + ); + final long timestamp = System.currentTimeMillis(); + + final int rows = 50; + + //ingesting same data twice to have some merging happening + for (int i = 0; i < rows; i++) { + index.add(getLongRow(timestamp + i, i, dimensionCount)); + } + + for (int i = 0; i < rows; i++) { + index.add(getLongRow(timestamp + i, i, dimensionCount)); + } + + //run a timeseries query on the index and verify results + final ArrayList queryAggregatorFactories = new ArrayList<>(); + queryAggregatorFactories.add(new CountAggregatorFactory("rows")); + for (int i = 0; i < dimensionCount; ++i) { + queryAggregatorFactories.add( + new LongSumAggregatorFactory( + String.format("sumResult%s", i), + String.format("sumResult%s", i) + ) + ); + queryAggregatorFactories.add( + new DoubleSumAggregatorFactory( + String.format("doubleSumResult%s", i), + String.format("doubleSumResult%s", i) + ) + ); + } + + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("xxx") + .granularity(QueryGranularity.ALL) + .intervals(ImmutableList.of(new Interval("2000/2030"))) + .aggregators(queryAggregatorFactories) + .build(); + + final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null); + final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + final QueryRunner> runner = new FinalizeResultsQueryRunner>( + factory.createRunner(incrementalIndexSegment), + factory.getToolchest() + ); + + + List> results = Sequences.toList( + runner.run(query, new HashMap()), + new LinkedList>() + ); + Result result = Iterables.getOnlyElement(results); + Assert.assertEquals(rows, result.getValue().getLongMetric("rows").intValue()); + for (int i = 0; i < dimensionCount; ++i) { + Assert.assertEquals( + String.format("Failed long sum on dimension %d", i), + 2*rows, + result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue() + ); + Assert.assertEquals( + String.format("Failed double sum on dimension %d", i), + 2*rows, + result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue() + ); + } + + index.close(); } @Test(timeout = 60_000L) @@ -436,6 +537,8 @@ public Double[] accumulate( ); } } + + index.close(); } @Test @@ -481,6 +584,8 @@ public void run() curr++; } Assert.assertEquals(elementsPerThread, curr); + + index.close(); } @Test @@ -507,5 +612,7 @@ public void testgetDimensions() 1000000 ); Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); + + incrementalIndex.close(); } }