From d5c8ed493b71df3bd755d419a95ffe81ac54542f Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Wed, 21 Sep 2016 14:50:36 -0700 Subject: [PATCH 1/4] adding default offline appenderator --- .../appenderator/AppenderatorFactory.java | 3 +- .../realtime/appenderator/Appenderators.java | 6 +- .../DefaultOfflineAppenderatorFactory.java | 60 +++++++++++++++++++ ...> DefaultRealtimeAppenderatorFactory.java} | 4 +- 4 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java rename server/src/main/java/io/druid/segment/realtime/appenderator/{DefaultAppenderatorFactory.java => DefaultRealtimeAppenderatorFactory.java} (97%) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java index 039fb8995bc7..5e21a64f10a5 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java @@ -28,7 +28,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "default", value = DefaultAppenderatorFactory.class) + @JsonSubTypes.Type(name = "default", value = DefaultRealtimeAppenderatorFactory.class), + @JsonSubTypes.Type(name = "offline", value = DefaultOfflineAppenderatorFactory.class) }) public interface AppenderatorFactory { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java index 7ec9e566e07c..73746b01492a 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java @@ -23,11 +23,11 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.server.coordination.DataSegmentAnnouncer; @@ -124,8 +124,8 @@ public boolean isAnnounced(DataSegment segment) null, indexIO, indexMerger, - null, - null + MapCache.create(500000), + new CacheConfig() ); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java new file mode 100644 index 000000000000..0b3ceeecfec6 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMerger; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.FireDepartmentMetrics; + + +public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory +{ + private final DataSegmentPusher dataSegmentPusher; + private final ObjectMapper objectMapper; + private final IndexIO indexIO; + private final IndexMerger indexMerger; + + @Inject + public DefaultOfflineAppenderatorFactory( + @JacksonInject DataSegmentPusher dataSegmentPusher, + @JacksonInject ObjectMapper objectMapper, + @JacksonInject IndexIO indexIO, + @JacksonInject IndexMerger indexMerger + ) { + this.dataSegmentPusher = dataSegmentPusher; + this.objectMapper = objectMapper; + this.indexIO = indexIO; + this.indexMerger = indexMerger; + } + + @Override + public Appenderator build( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return Appenderators.createOffline(schema, config, metrics, dataSegmentPusher, objectMapper, indexIO, indexMerger); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java similarity index 97% rename from server/src/main/java/io/druid/segment/realtime/appenderator/DefaultAppenderatorFactory.java rename to server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index a7b8d06bd261..6be9b1b355fe 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultAppenderatorFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -39,7 +39,7 @@ import java.io.File; import java.util.concurrent.ExecutorService; -public class DefaultAppenderatorFactory implements AppenderatorFactory +public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory { private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; @@ -52,7 +52,7 @@ public class DefaultAppenderatorFactory implements AppenderatorFactory private final Cache cache; private final CacheConfig cacheConfig; - public DefaultAppenderatorFactory( + public DefaultRealtimeAppenderatorFactory( @JacksonInject ServiceEmitter emitter, @JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, From 8e721925342f14b2d8b46d9b43a4b94aaea3ff77 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Wed, 21 Sep 2016 15:44:48 -0700 Subject: [PATCH 2/4] adding test --- ...DefaultOfflineAppenderatorFactoryTest.java | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java new file mode 100644 index 000000000000..1fb7907544e2 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.metamx.common.Granularity; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.guice.GuiceInjectors; +import io.druid.initialization.Initialization; +import io.druid.query.DruidProcessingConfig; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.realtime.FireDepartmentMetrics; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Map; + +public class DefaultOfflineAppenderatorFactoryTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testBuild() throws IOException + { + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of(new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); + binder.bind(DruidProcessingConfig.class).toInstance( + new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return "processing-%s"; + } + + @Override + public int intermediateComputeSizeBytes() + { + return 100 * 1024 * 1024; + } + + @Override + public int getNumThreads() + { + return 1; + } + + @Override + public int columnCacheSizeBytes() + { + return 25 * 1024 * 1024; + } + } + ); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); + } + } + ) + ); + DefaultOfflineAppenderatorFactory defaultOfflineAppenderatorFactory = injector.getInstance( + DefaultOfflineAppenderatorFactory.class); + ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + final Map parserMap = objectMapper.convertValue( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(null, null, null), + null, + null + ) + ), + Map.class + ); + DataSchema schema = new DataSchema( + "dataSourceName", + parserMap, + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("met", "met") + }, + new UniformGranularitySpec(Granularity.MINUTE, QueryGranularities.NONE, null), + objectMapper + ); + + RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + 75000, + null, + null, + temporaryFolder.newFolder(), + null, + null, + null, + null, + null, + null, + 0, + 0, + null, + null + ); + + Appenderator appenderator = defaultOfflineAppenderatorFactory.build( + schema, + tuningConfig, + new FireDepartmentMetrics() + ); + Assert.assertEquals(null, appenderator.startJob()); + Assert.assertEquals("dataSourceName", appenderator.getDataSource()); + appenderator.close(); + } + +} \ No newline at end of file From 6ca2a568b1556057e8bc7df83e61015137b34781 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 13 Oct 2016 10:53:59 -0700 Subject: [PATCH 3/4] fix comments --- .../appenderator/AppenderatorImpl.java | 8 +++--- .../realtime/appenderator/Appenderators.java | 6 ++--- ...DefaultOfflineAppenderatorFactoryTest.java | 27 +++++++++++++------ 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index f2588a9a2e0a..6f59a8de6180 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -143,7 +143,7 @@ public AppenderatorImpl( this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer"); this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO"); this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger"); - this.cache = Preconditions.checkNotNull(cache, "cache"); + this.cache = cache; this.texasRanger = conglomerate == null ? null : new SinkQuerySegmentWalker( schema.getDataSource(), sinkTimeline, @@ -151,7 +151,7 @@ public AppenderatorImpl( emitter, conglomerate, queryExecutorService, - cache, + Preconditions.checkNotNull(cache, "cache"), cacheConfig ); @@ -912,7 +912,9 @@ public Object apply(@Nullable Object input) identifier.getShardSpec().createChunk(sink) ); for (FireHydrant hydrant : sink) { - cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + if (cache != null) { + cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + } } if (removeOnDiskData) { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java index 73746b01492a..f978221e9558 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java @@ -23,7 +23,6 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.client.cache.MapCache; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; @@ -124,10 +123,9 @@ public boolean isAnnounced(DataSegment segment) null, indexIO, indexMerger, - MapCache.create(500000), - new CacheConfig() + null, + null ); } - } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index 1fb7907544e2..f007caf9e5ff 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Injector; @@ -42,6 +43,9 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.plumber.Committers; +import io.druid.timeline.partition.LinearShardSpec; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -56,7 +60,7 @@ public class DefaultOfflineAppenderatorFactoryTest public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testBuild() throws IOException + public void testBuild() throws IOException, SegmentNotWritableException { Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), @@ -142,14 +146,21 @@ public int columnCacheSizeBytes() null ); - Appenderator appenderator = defaultOfflineAppenderatorFactory.build( + try(Appenderator appenderator = defaultOfflineAppenderatorFactory.build( schema, tuningConfig, new FireDepartmentMetrics() - ); - Assert.assertEquals(null, appenderator.startJob()); - Assert.assertEquals("dataSourceName", appenderator.getDataSource()); - appenderator.close(); + )){ + Assert.assertEquals("dataSourceName", appenderator.getDataSource()); + Assert.assertEquals(null, appenderator.startJob()); + SegmentIdentifier identifier = new SegmentIdentifier("dataSourceName", new Interval("2000/2001"), "A", new LinearShardSpec(0)); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(identifier, AppenderatorTest.IR("2000", "bar", 1), Suppliers.ofInstance(Committers.nil())); + Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(identifier, AppenderatorTest.IR("2000", "baz", 1), Suppliers.ofInstance(Committers.nil())); + Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } } - -} \ No newline at end of file +} From c62aaebaeaee8d6d4b48edb19e6ac513c9f9350a Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Tue, 18 Oct 2016 10:42:14 -0700 Subject: [PATCH 4/4] fix comments --- .../DefaultOfflineAppenderatorFactory.java | 4 ++-- .../DefaultRealtimeAppenderatorFactory.java | 3 ++- .../DefaultOfflineAppenderatorFactoryTest.java | 16 +++++++++++----- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java index 0b3ceeecfec6..2b81e4f4d850 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java @@ -20,8 +20,8 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.indexing.DataSchema; @@ -37,7 +37,7 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory private final IndexIO indexIO; private final IndexMerger indexMerger; - @Inject + @JsonCreator public DefaultOfflineAppenderatorFactory( @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject ObjectMapper objectMapper, diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index 6be9b1b355fe..9ff9c8edf001 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -39,7 +39,8 @@ import java.io.File; import java.util.concurrent.ExecutorService; -public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory +public class +DefaultRealtimeAppenderatorFactory implements AppenderatorFactory { private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index f007caf9e5ff..53c8c177a379 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -104,9 +104,10 @@ public int columnCacheSizeBytes() } ) ); - DefaultOfflineAppenderatorFactory defaultOfflineAppenderatorFactory = injector.getInstance( - DefaultOfflineAppenderatorFactory.class); ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + AppenderatorFactory defaultOfflineAppenderatorFactory = objectMapper.reader(AppenderatorFactory.class) + .readValue("{\"type\":\"offline\"}"); + final Map parserMap = objectMapper.convertValue( new MapInputRowParser( new JSONParseSpec( @@ -146,14 +147,19 @@ public int columnCacheSizeBytes() null ); - try(Appenderator appenderator = defaultOfflineAppenderatorFactory.build( + try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build( schema, tuningConfig, new FireDepartmentMetrics() - )){ + )) { Assert.assertEquals("dataSourceName", appenderator.getDataSource()); Assert.assertEquals(null, appenderator.startJob()); - SegmentIdentifier identifier = new SegmentIdentifier("dataSourceName", new Interval("2000/2001"), "A", new LinearShardSpec(0)); + SegmentIdentifier identifier = new SegmentIdentifier( + "dataSourceName", + new Interval("2000/2001"), + "A", + new LinearShardSpec(0) + ); Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); appenderator.add(identifier, AppenderatorTest.IR("2000", "bar", 1), Suppliers.ofInstance(Committers.nil())); Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());