Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public DefaultObjectMapper(JsonFactory factory)
registerModule(new AggregatorsModule());
registerModule(new SegmentsModule());
registerModule(new StringComparatorModule());
registerModule(new SegmentizerModule());

configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);
Expand Down
32 changes: 32 additions & 0 deletions processing/src/main/java/io/druid/jackson/SegmentizerModule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.jackson;

import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.druid.segment.loading.MMappedQueryableSegmentizerFactory;

public class SegmentizerModule extends SimpleModule
{
public SegmentizerModule() {
super("SegmentizerModule");
registerSubtypes(new NamedType(MMappedQueryableSegmentizerFactory.class, "mMapSegmentFactory"));
}
}
11 changes: 10 additions & 1 deletion processing/src/main/java/io/druid/segment/IndexMergerV9.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.loading.MMappedQueryableSegmentizerFactory;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnSerializer;
import io.druid.segment.serde.ComplexMetricSerde;
Expand All @@ -59,6 +60,7 @@
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
Expand Down Expand Up @@ -156,6 +158,13 @@ public void close() throws IOException
);
log.info("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime);

progress.progress();
startTime = System.currentTimeMillis();
try (FileOutputStream fos = new FileOutputStream(new File(outDir, "factory.json"))) {
mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO));
}
log.info("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime);

progress.progress();
final Map<String, ValueType> metricsValueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final Map<String, String> metricTypeNames = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
Expand Down Expand Up @@ -206,7 +215,7 @@ public void close() throws IOException
makeTimeColumn(v9Smoosher, progress, timeWriter);
makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsValueTypes, metricTypeNames, metWriters);

for(int i = 0; i < mergedDimensions.size(); i++) {
for (int i = 0; i < mergedDimensions.size(); i++) {
DimensionMergerV9 merger = (DimensionMergerV9) mergers.get(i);
merger.writeIndexes(rowNumConversions, closer);
if (merger.canSkip()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,35 @@

package io.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;

import io.druid.java.util.common.logger.Logger;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;

/**
*/
public class MMappedQueryableIndexFactory implements QueryableIndexFactory
public class MMappedQueryableSegmentizerFactory implements SegmentizerFactory
{
private static final Logger log = new Logger(MMappedQueryableIndexFactory.class);
private static final Logger log = new Logger(MMappedQueryableSegmentizerFactory.class);

private final IndexIO indexIO;

@Inject
public MMappedQueryableIndexFactory(IndexIO indexIO)
public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO)
{
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
}

@Override
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException
public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException
{
try {
return indexIO.loadIndex(parentDir);
return new QueryableIndexSegment(dataSegment.getIdentifier(), indexIO.loadIndex(parentDir));
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package io.druid.segment.loading;

import io.druid.segment.QueryableIndex;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;

import java.io.File;

/**
* Factory that loads segment files from the disk and creates {@link Segment} object
*/
public interface QueryableIndexFactory
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class)
public interface SegmentizerFactory
{
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException;
public Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment.loading;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.jackson.SegmentizerModule;
import io.druid.segment.IndexIO;
import io.druid.segment.column.ColumnConfig;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

public class SegmentizerFactoryTest
{
@Test
public void testFactory() throws IOException{
File factoryFile = Files.createTempFile("", "factory.json").toFile();
FileOutputStream fos = new FileOutputStream(factoryFile);
ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerModule(new SegmentizerModule());
IndexIO indexIO = new IndexIO(mapper, new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 777;
}
});
mapper.setInjectableValues(
new InjectableValues.Std().addValue(
IndexIO.class,
indexIO
)
);
mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO));
fos.close();

SegmentizerFactory factory = mapper.readValue(factoryFile, SegmentizerFactory.class);
Assert.assertTrue(factory instanceof MMappedQueryableSegmentizerFactory);
}
}
3 changes: 0 additions & 3 deletions server/src/main/java/io/druid/guice/StorageNodeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import io.druid.query.DruidProcessingConfig;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.loading.MMappedQueryableIndexFactory;
import io.druid.segment.loading.QueryableIndexFactory;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
Expand All @@ -49,7 +47,6 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);

binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);

binder.bind(QueryRunnerFactoryConglomerate.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Json;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.Segment;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
Expand All @@ -43,7 +42,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);

private final QueryableIndexFactory factory;
private final IndexIO indexIO;
private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;

Expand All @@ -53,12 +52,12 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader

@Inject
public SegmentLoaderLocalCacheManager(
QueryableIndexFactory factory,
IndexIO indexIO,
SegmentLoaderConfig config,
@Json ObjectMapper mapper
)
{
this.factory = factory;
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;

Expand All @@ -78,7 +77,7 @@ public int compare(StorageLocation left, StorageLocation right)

public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
{
return new SegmentLoaderLocalCacheManager(factory, config, jsonMapper);
return new SegmentLoaderLocalCacheManager(indexIO, config, jsonMapper);
}

@Override
Expand All @@ -102,9 +101,21 @@ public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
File segmentFiles = getSegmentFiles(segment);
final QueryableIndex index = factory.factorize(segmentFiles);
File factoryJson = new File(segmentFiles, "factory.json");
final SegmentizerFactory factory;

return new QueryableIndexSegment(segment.getIdentifier(), index);
if (factoryJson.exists()) {
try {
factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
}
} else {
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}

return factory.factorize(segment, segmentFiles);
}

@Override
Expand Down Expand Up @@ -142,7 +153,11 @@ private StorageLocation loadSegmentWithRetry(DataSegment segment, String storage
return loc;
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment in current location %s, try next location if any", loc.getPath().getAbsolutePath())
log.makeAlert(
e,
"Failed to load segment in current location %s, try next location if any",
loc.getPath().getAbsolutePath()
)
.addData("location", loc.getPath().getAbsolutePath())
.emit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setUp() throws Exception
locations.add(locationConfig);

manager = new SegmentLoaderLocalCacheManager(
new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
Expand Down Expand Up @@ -150,7 +150,7 @@ public void testRetrySuccessAtFirstLocation() throws Exception
locations.add(locationConfig2);

manager = new SegmentLoaderLocalCacheManager(
new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
Expand Down Expand Up @@ -203,7 +203,7 @@ public void testRetrySuccessAtSecondLocation() throws Exception
locations.add(locationConfig2);

manager = new SegmentLoaderLocalCacheManager(
new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
Expand Down Expand Up @@ -258,7 +258,7 @@ public void testRetryAllFail() throws Exception
locations.add(locationConfig2);

manager = new SegmentLoaderLocalCacheManager(
new MMappedQueryableIndexFactory(TestHelper.getTestIndexIO()),
TestHelper.getTestIndexIO(),
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
Expand Down