diff --git a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java index 88bcc527ab78..f0ca93741543 100644 --- a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java +++ b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java @@ -49,6 +49,7 @@ public DefaultObjectMapper(JsonFactory factory) registerModule(new AggregatorsModule()); registerModule(new SegmentsModule()); registerModule(new StringComparatorModule()); + registerModule(new SegmentizerModule()); configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); configure(MapperFeature.AUTO_DETECT_GETTERS, false); diff --git a/processing/src/main/java/io/druid/jackson/SegmentizerModule.java b/processing/src/main/java/io/druid/jackson/SegmentizerModule.java new file mode 100644 index 000000000000..e417034ac5fa --- /dev/null +++ b/processing/src/main/java/io/druid/jackson/SegmentizerModule.java @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.jackson; + +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import io.druid.segment.loading.MMappedQueryableSegmentizerFactory; + +public class SegmentizerModule extends SimpleModule +{ + public SegmentizerModule() { + super("SegmentizerModule"); + registerSubtypes(new NamedType(MMappedQueryableSegmentizerFactory.class, "mMapSegmentFactory")); + } +} diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index b4e5bf145b98..764046f87386 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -46,6 +46,7 @@ import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.IOPeon; import io.druid.segment.data.TmpFileIOPeon; +import io.druid.segment.loading.MMappedQueryableSegmentizerFactory; import io.druid.segment.serde.ComplexColumnPartSerde; import io.druid.segment.serde.ComplexColumnSerializer; import io.druid.segment.serde.ComplexMetricSerde; @@ -59,6 +60,7 @@ import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.IntBuffer; @@ -156,6 +158,13 @@ public void close() throws IOException ); log.info("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime); + progress.progress(); + startTime = System.currentTimeMillis(); + try (FileOutputStream fos = new FileOutputStream(new File(outDir, "factory.json"))) { + mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO)); + } + log.info("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime); + progress.progress(); final Map metricsValueTypes = Maps.newTreeMap(Ordering.natural().nullsFirst()); final Map metricTypeNames = 
Maps.newTreeMap(Ordering.natural().nullsFirst()); @@ -206,7 +215,7 @@ public void close() throws IOException makeTimeColumn(v9Smoosher, progress, timeWriter); makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsValueTypes, metricTypeNames, metWriters); - for(int i = 0; i < mergedDimensions.size(); i++) { + for (int i = 0; i < mergedDimensions.size(); i++) { DimensionMergerV9 merger = (DimensionMergerV9) mergers.get(i); merger.writeIndexes(rowNumConversions, closer); if (merger.canSkip()) { diff --git a/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java b/processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java similarity index 67% rename from server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java rename to processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java index 3f095bff556b..23c9007c9ab0 100644 --- a/server/src/main/java/io/druid/segment/loading/MMappedQueryableIndexFactory.java +++ b/processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java @@ -19,35 +19,35 @@ package io.druid.segment.loading; +import com.fasterxml.jackson.annotation.JacksonInject; import com.google.common.base.Preconditions; -import com.google.inject.Inject; - import io.druid.java.util.common.logger.Logger; import io.druid.segment.IndexIO; -import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.Segment; +import io.druid.timeline.DataSegment; import java.io.File; import java.io.IOException; /** */ -public class MMappedQueryableIndexFactory implements QueryableIndexFactory +public class MMappedQueryableSegmentizerFactory implements SegmentizerFactory { - private static final Logger log = new Logger(MMappedQueryableIndexFactory.class); + private static final Logger log = new Logger(MMappedQueryableSegmentizerFactory.class); private final IndexIO indexIO; - @Inject - public 
MMappedQueryableIndexFactory(IndexIO indexIO) + public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO) { this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); } @Override - public QueryableIndex factorize(File parentDir) throws SegmentLoadingException + public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException { try { - return indexIO.loadIndex(parentDir); + return new QueryableIndexSegment(dataSegment.getIdentifier(), indexIO.loadIndex(parentDir)); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", e.getMessage()); diff --git a/server/src/main/java/io/druid/segment/loading/QueryableIndexFactory.java b/processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java similarity index 65% rename from server/src/main/java/io/druid/segment/loading/QueryableIndexFactory.java rename to processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java index 475295a21164..89aa9f002b8f 100644 --- a/server/src/main/java/io/druid/segment/loading/QueryableIndexFactory.java +++ b/processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java @@ -19,13 +19,17 @@ package io.druid.segment.loading; -import io.druid.segment.QueryableIndex; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.segment.Segment; +import io.druid.timeline.DataSegment; import java.io.File; /** + * Factory that loads segment files from the disk and creates a {@link Segment} object. */ -public interface QueryableIndexFactory +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class) +public interface SegmentizerFactory { - public QueryableIndex factorize(File parentDir) throws SegmentLoadingException; + public Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException; } diff --git a/processing/src/test/java/io/druid/segment/loading/SegmentizerFactoryTest.java 
b/processing/src/test/java/io/druid/segment/loading/SegmentizerFactoryTest.java new file mode 100644 index 000000000000..aa462b377227 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/loading/SegmentizerFactoryTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.loading; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.jackson.SegmentizerModule; +import io.druid.segment.IndexIO; +import io.druid.segment.column.ColumnConfig; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; + +public class SegmentizerFactoryTest +{ + @Test + public void testFactory() throws IOException{ + File factoryFile = Files.createTempFile("", "factory.json").toFile(); + FileOutputStream fos = new FileOutputStream(factoryFile); + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerModule(new SegmentizerModule()); + IndexIO indexIO = new IndexIO(mapper, new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 777; + } + }); + mapper.setInjectableValues( + new InjectableValues.Std().addValue( + IndexIO.class, + indexIO + ) + ); + mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO)); + fos.close(); + + SegmentizerFactory factory = mapper.readValue(factoryFile, SegmentizerFactory.class); + Assert.assertTrue(factory instanceof MMappedQueryableSegmentizerFactory); + } +} diff --git a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index 4d9d43b9d0da..65c4a9d7f125 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -30,8 +30,6 @@ import io.druid.query.DruidProcessingConfig; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.column.ColumnConfig; -import io.druid.segment.loading.MMappedQueryableIndexFactory; -import io.druid.segment.loading.QueryableIndexFactory; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.DruidNode; 
import io.druid.server.coordination.DruidServerMetadata; @@ -49,7 +47,6 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); binder.bind(NodeTypeConfig.class).toProvider(Providers.of(null)); - binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class); binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); binder.bind(QueryRunnerFactoryConglomerate.class) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 97dfb5e3fc15..cb3cbb5309fd 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -26,8 +26,7 @@ import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.guice.annotations.Json; -import io.druid.segment.QueryableIndex; -import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.IndexIO; import io.druid.segment.Segment; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; @@ -43,7 +42,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader { private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class); - private final QueryableIndexFactory factory; + private final IndexIO indexIO; private final SegmentLoaderConfig config; private final ObjectMapper jsonMapper; @@ -53,12 +52,12 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader @Inject public SegmentLoaderLocalCacheManager( - QueryableIndexFactory factory, + IndexIO indexIO, SegmentLoaderConfig config, @Json ObjectMapper mapper ) { - this.factory = factory; + this.indexIO = indexIO; this.config = config; this.jsonMapper = mapper; @@ -78,7 +77,7 @@ public int compare(StorageLocation left, 
StorageLocation right) public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config) { - return new SegmentLoaderLocalCacheManager(factory, config, jsonMapper); + return new SegmentLoaderLocalCacheManager(indexIO, config, jsonMapper); } @Override @@ -102,9 +101,21 @@ public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) public Segment getSegment(DataSegment segment) throws SegmentLoadingException { File segmentFiles = getSegmentFiles(segment); - final QueryableIndex index = factory.factorize(segmentFiles); + File factoryJson = new File(segmentFiles, "factory.json"); + final SegmentizerFactory factory; - return new QueryableIndexSegment(segment.getIdentifier(), index); + if (factoryJson.exists()) { + try { + factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "%s", e.getMessage()); + } + } else { + factory = new MMappedQueryableSegmentizerFactory(indexIO); + } + + return factory.factorize(segment, segmentFiles); } @Override @@ -142,7 +153,11 @@ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storage return loc; } catch (SegmentLoadingException e) { - log.makeAlert(e, "Failed to load segment in current location %s, try next location if any", loc.getPath().getAbsolutePath()) + log.makeAlert( + e, + "Failed to load segment in current location %s, try next location if any", + loc.getPath().getAbsolutePath() + ) .addData("location", loc.getPath().getAbsolutePath()) .emit(); diff --git a/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java index 2e4c9bb855f4..85bdc4ca8373 100644 --- a/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java +++ b/server/src/test/java/io/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java @@ -76,7 +76,7 @@ public void setUp() throws 
Exception locations.add(locationConfig); manager = new SegmentLoaderLocalCacheManager( - new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()), + TestHelper.getTestIndexIO(), new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -150,7 +150,7 @@ public void testRetrySuccessAtFirstLocation() throws Exception locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager( - new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()), + TestHelper.getTestIndexIO(), new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -203,7 +203,7 @@ public void testRetrySuccessAtSecondLocation() throws Exception locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager( - new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()), + TestHelper.getTestIndexIO(), new SegmentLoaderConfig().withLocations(locations), jsonMapper ); @@ -258,7 +258,7 @@ public void testRetryAllFail() throws Exception locations.add(locationConfig2); manager = new SegmentLoaderLocalCacheManager( - new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()), + TestHelper.getTestIndexIO(), new SegmentLoaderConfig().withLocations(locations), jsonMapper );