diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningFirehoseFactoryIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningFirehoseFactoryIndexTest.java
new file mode 100644
index 000000000000..3a0583144bf5
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningFirehoseFactoryIndexTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.function.Function;
+
+/**
+ * Integration test for batch ingestion through the {@code combining} firehose.
+ *
+ * <p>A first index task loads the base wikipedia data into {@code INDEX_DATASOURCE}. A second index task then uses
+ * a combining firehose to ingest a local file together with the segments of that first datasource, and is handed
+ * the combining query resource for verification.
+ */
+@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITCombiningFirehoseFactoryIndexTest extends AbstractITBatchIndexTest
+{
+  private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
+  private static final String COMBINING_INDEX_TASK = "/indexer/wikipedia_combining_firehose_index_task.json";
+  private static final String COMBINING_QUERIES_RESOURCE = "/indexer/wikipedia_combining_firehose_index_queries.json";
+  private static final String COMBINING_INDEX_DATASOURCE = "wikipedia_comb_index_test";
+
+  /**
+   * Runs the base index task, then the combining index task that reads back the base datasource. The unloader
+   * handles of both datasources are closed when the try block exits, whether or not the test passed.
+   *
+   * @throws Exception if indexing, querying or closing an unloader fails
+   */
+  @Test
+  public void testIndexData() throws Exception
+  {
+    final String suffix = config.getExtraDatasourceNameSuffix();
+    final String baseDatasource = INDEX_DATASOURCE + suffix;
+    try (
+        final Closeable ignored1 = unloader(baseDatasource);
+        final Closeable ignored2 = unloader(COMBINING_INDEX_DATASOURCE + suffix)
+    ) {
+      // Point the task spec's %%COMBINING_DATASOURCE%% placeholder at the datasource built by the first task.
+      final Function<String, String> combiningFirehoseSpecTransform =
+          spec -> StringUtils.replace(spec, "%%COMBINING_DATASOURCE%%", baseDatasource);
+
+      doIndexTest(
+          INDEX_DATASOURCE,
+          INDEX_TASK,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+      doIndexTest(
+          COMBINING_INDEX_DATASOURCE,
+          COMBINING_INDEX_TASK,
+          combiningFirehoseSpecTransform,
+          COMBINING_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+    }
+  }
+}
diff --git
a/integration-tests/src/test/resources/indexer/wikipedia_combining_firehose_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_combining_firehose_index_queries.json new file mode 100644 index 000000000000..302d2fea284e --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_combining_firehose_index_queries.json @@ -0,0 +1,141 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query": { + "queryType": "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults": [ + { + "timestamp": "2013-08-31T01:02:33.000Z", + "result": { + "minTime": "2013-08-31T01:02:33.000Z", + "maxTime": "2013-09-01T18:22:39.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query": { + "queryType": "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity": "day", + "intervals": [ + "2013-09-01T00:00/2013-09-02T00:00" + ], + "filter": null, + "aggregations": [ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type": "thetaSketch", + "name": "approxCountTheta", + "fieldName": "thetaSketch", + "size": 16384, + "shouldFinalize": true, + "isInputThetaSketch": false, + "errorBoundsStdDev": null + }, + { + "type": "quantilesDoublesSketch", + "name": "quantilesSketch", + "fieldName": "quantilesDoublesSketch", + "k": 128 + } + ] + }, + "expectedResults": [ + { + "timestamp": "2013-09-01T00:00:00.000Z", + "result": { + "quantilesSketch": 6, + "approxCountTheta": 6.0, + "approxCountHLL": 6 + } + } + ] + }, + { + "description": "having spec on post aggregation", + "query": { + "queryType": "groupBy", + "dataSource": "%%DATASOURCE%%", + "granularity": "day", + "dimensions": [ + "page" + ], + "filter": { + "type": "selector", + "dimension": "language", + "value": "zh" + }, + "aggregations": [ + { + "type": "count", + "name": "rows" + }, + { + "type": "longSum", + "fieldName": "added", + "name": 
"added_count" + } + ], + "postAggregations": [ + { + "type": "arithmetic", + "name": "added_count_times_ten", + "fn": "*", + "fields": [ + { + "type": "fieldAccess", + "name": "added_count", + "fieldName": "added_count" + }, + { + "type": "constant", + "name": "const", + "value": 10 + } + ] + } + ], + "having": { + "type": "greaterThan", + "aggregation": "added_count_times_ten", + "value": 9000 + }, + "intervals": [ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults": [ + { + "version": "v1", + "timestamp": "2013-08-31T00:00:00.000Z", + "event": { + "added_count_times_ten": 9050.0, + "page": "Crimson Typhoon", + "added_count": 905, + "rows": 1 + } + }, + { + "version": "v1", + "timestamp": "2013-08-31T00:00:00.000Z", + "event": { + "added_count_times_ten": 9770.0, + "page": "Gypsy Danger", + "added_count": 977, + "rows": 1 + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_combining_firehose_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_combining_firehose_index_task.json new file mode 100644 index 000000000000..1e7deffc6b5b --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_combining_firehose_index_task.json @@ -0,0 +1,95 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + 
"queryGranularity": "second", + "intervals" : [ "2013-08-31/2013-09-02" ] + }, + "parser": { + "parseSpec": { + "format" : "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + {"type": "string", "name": "language", "createBitmapIndex": false}, + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + } + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "combining", + "delegates": [ + { + "type": "local", + "baseDir": "/resources/indexer", + "filter": "wikipedia_combining_index_data.json" + }, + { + "type": "ingestSegment", + "dataSource": "%%COMBINING_DATASOURCE%%", + "interval": "2013-08-31/2013-09-02" + } + ] + } + }, + "tuningConfig": { + "type": "index", + "maxRowsPerSegment": 3 + } + } +} diff --git a/integration-tests/src/test/resources/indexer/wikipedia_combining_index_data.json b/integration-tests/src/test/resources/indexer/wikipedia_combining_index_data.json new file mode 100644 index 000000000000..fb7ca6ef70d8 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_combining_index_data.json @@ -0,0 +1,3 @@ +{"timestamp": "2013-08-31T05:18:22Z", "page": "Gypsy Danger", "language" : "zh", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 977, "deleted": 200, "delta": -143} +{"timestamp": "2013-08-31T17:57:01Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"timestamp": "2013-09-01T18:22:39Z", "page": "Cherno Alpha", "language" : "ru", "user" : 
"masterYo", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java index fb640a2e765b..6e61e19a1ea5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactory.java @@ -22,9 +22,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -33,11 +36,12 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.stream.Stream; /** * Creates firehose that combines data from different Firehoses. Useful for ingesting data from multiple sources. 
*/
-public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
+public class CombiningFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<FirehoseFactory>>
 {
   private static final EmittingLogger log = new EmittingLogger(CombiningFirehoseFactory.class);
 
@@ -64,6 +68,48 @@ public List<FirehoseFactory> getDelegateFactoryList()
     return delegateFactoryList;
   }
 
+  /**
+   * Always {@code false}: the delegate factories are kept together in a single split rather than divided up.
+   */
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  /**
+   * Returns a stream with exactly one split whose payload is the complete list of delegate factories.
+   *
+   * @param splitHintSpec ignored by this implementation
+   */
+  @Override
+  public Stream<InputSplit<List<FirehoseFactory>>> getSplits(@Nullable SplitHintSpec splitHintSpec)
+  {
+    return Stream.of(new InputSplit<>(delegateFactoryList));
+  }
+
+  /**
+   * Returns {@code 1}, matching the single split produced by {@link #getSplits(SplitHintSpec)}.
+   *
+   * @param splitHintSpec ignored by this implementation
+   */
+  @Override
+  public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
+  {
+    return 1;
+  }
+
+  /**
+   * Creates a new combining factory over the delegate factories carried by {@code split}.
+   *
+   * @param split split whose payload is the list of delegate factories to combine
+   */
+  @Override
+  public FiniteFirehoseFactory<InputRowParser, List<FirehoseFactory>> withSplit(InputSplit<List<FirehoseFactory>> split)
+  {
+    return new CombiningFirehoseFactory(split.get());
+  }
+
   class CombiningFirehose implements Firehose
   {
     private final InputRowParser parser;
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
index cb2be4fe40db..14114701f2ec 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
@@ -19,15 +19,18 @@
 
 package org.apache.druid.segment.realtime.firehose;
 
+import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.joda.time.DateTime;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@
-38,22 +41,30 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
 public class CombiningFirehoseFactoryTest
 {
+  private CombiningFirehoseFactory combiningFirehoseFactory;
+  private List<FirehoseFactory> delegateFirehoses;
+
+  /**
+   * Builds the factory under test from two delegates created from in-memory rows 1-2 and rows 3-5.
+   */
+  @Before
+  public void setUp()
+  {
+    delegateFirehoses = Arrays.asList(
+        new ListFirehoseFactory(Arrays.asList(makeRow(1, 1), makeRow(2, 2))),
+        new ListFirehoseFactory(Arrays.asList(makeRow(3, 3), makeRow(4, 4), makeRow(5, 5)))
+    );
+    combiningFirehoseFactory = new CombiningFirehoseFactory(delegateFirehoses);
+  }
 
   @Test
   public void testCombiningfirehose() throws IOException
   {
-    List<InputRow> list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2));
-    List<InputRow> list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4), makeRow(5, 5));
-    FirehoseFactory combiningFactory = new CombiningFirehoseFactory(
-        Arrays.asList(
-            new ListFirehoseFactory(list1),
-            new ListFirehoseFactory(list2)
-        )
-    );
-    final Firehose firehose = combiningFactory.connect(null, null);
+    final Firehose firehose = combiningFirehoseFactory.connect(null, null);
     for (int i = 1; i < 6; i++) {
       Assert.assertTrue(firehose.hasMore());
       final InputRow inputRow = firehose.nextRow();
@@ -63,6 +74,26 @@ public void testCombiningfirehose() throws IOException
     Assert.assertFalse(firehose.hasMore());
   }
 
+  /**
+   * The combining factory must report itself as non-splittable and expose exactly one split which, when applied
+   * through withSplit, yields a combining factory over the same delegates.
+   */
+  @Test
+  public void testFirehoseNotParallelizable()
+  {
+    Assert.assertFalse(combiningFirehoseFactory.isSplittable());
+    Assert.assertEquals(1, combiningFirehoseFactory.getNumSplits(null));
+
+    final Optional<InputSplit<List<FirehoseFactory>>> maybeSplit =
+        combiningFirehoseFactory.getSplits(null).findFirst();
+    Assert.assertTrue(maybeSplit.isPresent());
+
+    final FiniteFirehoseFactory<InputRowParser, List<FirehoseFactory>> firehoseWithSplit =
+        combiningFirehoseFactory.withSplit(maybeSplit.get());
+    Assert.assertTrue(firehoseWithSplit instanceof CombiningFirehoseFactory);
+    Assert.assertEquals(delegateFirehoses, ((CombiningFirehoseFactory) firehoseWithSplit).getDelegateFactoryList());
+  }
+
   private InputRow makeRow(final long timestamp, final float metricValue)
   {
     return new InputRow()