Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
Expand All @@ -45,6 +45,7 @@
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
Expand All @@ -57,6 +58,7 @@
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -84,6 +86,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -125,19 +128,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
false,
0
);
private static final CompactionState DEFAULT_COMPACTION_STATE = new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "roaring", "compressRunOnSerialization", true),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
)
);
private static CompactionState DEFAULT_COMPACTION_STATE;

private static final List<String> TEST_ROWS = ImmutableList.of(
"2014-01-01T00:00:10Z,a,1\n",
Expand Down Expand Up @@ -195,6 +186,17 @@ public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
this.lockGranularity = lockGranularity;
}

@BeforeClass
public static void setupClass() throws JsonProcessingException
{
  // Jackson mapper used only to derive the expected IndexSpec map form below.
  ObjectMapper mapper = new DefaultObjectMapper();

  // Build the expected post-compaction state once for all test instances.
  // The IndexSpec portion is produced by round-tripping a default IndexSpec
  // through JSON (serialize, then read back as a Map) so the expected value
  // always tracks IndexSpec's actual serialized shape instead of a
  // hand-maintained literal map. NOTE(review): readValue with raw Map.class
  // is unchecked — presumably yields Map<String, Object>; confirm against
  // what CompactionState expects.
  DEFAULT_COMPACTION_STATE = new CompactionState(
      // DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows) — matches the
      // compaction defaults asserted by these tests.
      new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
      mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class)
  );
}

@Before
public void setup() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.SegmentizerFactory;
import org.apache.druid.segment.serde.ColumnPartSerde;
import org.apache.druid.segment.serde.ComplexColumnPartSerde;
import org.apache.druid.segment.serde.ComplexMetricSerde;
Expand Down Expand Up @@ -152,7 +153,12 @@ private File makeIndexFiles(
progress.progress();
startTime = System.currentTimeMillis();
try (FileOutputStream fos = new FileOutputStream(new File(outDir, "factory.json"))) {
mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO));
SegmentizerFactory customSegmentLoader = indexSpec.getSegmentLoader();
if (customSegmentLoader != null) {
mapper.writeValue(fos, customSegmentLoader);
} else {
mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO));
}
}
log.debug("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime);

Expand Down
34 changes: 30 additions & 4 deletions processing/src/main/java/org/apache/druid/segment/IndexSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.druid.segment.data.BitmapSerde;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.loading.SegmentizerFactory;

import javax.annotation.Nullable;
import java.util.Arrays;
Expand Down Expand Up @@ -62,13 +64,26 @@ public class IndexSpec
private final CompressionStrategy metricCompression;
private final CompressionFactory.LongEncodingStrategy longEncoding;

@Nullable
private final SegmentizerFactory segmentLoader;

/**
 * Creates an IndexSpec with default parameters.
 *
 * <p>Delegates to the full constructor with all-null arguments, which the
 * full constructor replaces with the documented defaults (and a null
 * {@code segmentLoader}, meaning the standard mmap segmentizer is used).
 */
public IndexSpec()
{
  // Diff artifact fix: only the 5-arg delegation may remain — a Java
  // constructor can contain exactly one this(...) call, and the 5th (null)
  // argument is the new optional segmentLoader.
  this(null, null, null, null, null);
}

/**
 * Backward-compatible 4-argument constructor retained for tests that predate
 * the optional {@code segmentLoader} parameter. Delegates to the full
 * constructor with a null segment loader, meaning the default segmentizer
 * will be used for segments written with this spec.
 *
 * <p>All parameters may be null, in which case the full constructor
 * substitutes its documented defaults.
 */
@VisibleForTesting
public IndexSpec(
    @Nullable BitmapSerdeFactory bitmapSerdeFactory,
    @Nullable CompressionStrategy dimensionCompression,
    @Nullable CompressionStrategy metricCompression,
    @Nullable CompressionFactory.LongEncodingStrategy longEncoding
)
{
  this(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, null);
}

/**
Expand All @@ -93,7 +108,8 @@ public IndexSpec(
@JsonProperty("bitmap") @Nullable BitmapSerdeFactory bitmapSerdeFactory,
@JsonProperty("dimensionCompression") @Nullable CompressionStrategy dimensionCompression,
@JsonProperty("metricCompression") @Nullable CompressionStrategy metricCompression,
@JsonProperty("longEncoding") @Nullable CompressionFactory.LongEncodingStrategy longEncoding
@JsonProperty("longEncoding") @Nullable CompressionFactory.LongEncodingStrategy longEncoding,
@JsonProperty("segmentLoader") @Nullable SegmentizerFactory segmentLoader
)
{
Preconditions.checkArgument(dimensionCompression == null || DIMENSION_COMPRESSION.contains(dimensionCompression),
Expand All @@ -111,6 +127,7 @@ public IndexSpec(
this.dimensionCompression = dimensionCompression == null ? DEFAULT_DIMENSION_COMPRESSION : dimensionCompression;
this.metricCompression = metricCompression == null ? DEFAULT_METRIC_COMPRESSION : metricCompression;
this.longEncoding = longEncoding == null ? DEFAULT_LONG_ENCODING : longEncoding;
this.segmentLoader = segmentLoader;
}

@JsonProperty("bitmap")
Expand All @@ -137,6 +154,13 @@ public CompressionFactory.LongEncodingStrategy getLongEncoding()
return longEncoding;
}

/**
 * Returns the optional custom {@link SegmentizerFactory} to persist into a
 * segment's {@code factory.json}, or null if the default segmentizer should
 * be used. Serialized under the {@code "segmentLoader"} JSON property.
 */
@JsonProperty
@Nullable
public SegmentizerFactory getSegmentLoader()
{
  return segmentLoader;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -150,13 +174,14 @@ public boolean equals(Object o)
return Objects.equals(bitmapSerdeFactory, indexSpec.bitmapSerdeFactory) &&
dimensionCompression == indexSpec.dimensionCompression &&
metricCompression == indexSpec.metricCompression &&
longEncoding == indexSpec.longEncoding;
longEncoding == indexSpec.longEncoding &&
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be useful to add an EqualsVerifier test to IndexSpecTest for this.

Objects.equals(segmentLoader, indexSpec.segmentLoader);
}

// Hash must cover exactly the fields compared in equals(), including the new
// segmentLoader. Diff artifact fix: the stale 4-field return made the 5-field
// return unreachable (a compile error in Java); only the 5-field hash remains.
@Override
public int hashCode()
{
  return Objects.hash(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, segmentLoader);
}

@Override
Expand All @@ -167,6 +192,7 @@ public String toString()
", dimensionCompression=" + dimensionCompression +
", metricCompression=" + metricCompression +
", longEncoding=" + longEncoding +
", segmentLoader=" + segmentLoader +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.jackson.SegmentizerModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentizerFactory;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;

/**
 * Verifies that {@code IndexMerger.persist} writes the correct segmentizer
 * factory into a segment's {@code factory.json}: the default
 * {@link MMappedQueryableSegmentizerFactory} when the IndexSpec has no custom
 * segment loader, and the user-supplied {@link SegmentizerFactory} when one is
 * configured via the IndexSpec's new {@code segmentLoader} property.
 */
public class CustomSegmentizerFactoryTest extends InitializedNullHandlingTest
{
  // Shared across tests; initialized once in setup() so all tests use the
  // same mapper/IO/merger wiring.
  private static ObjectMapper JSON_MAPPER;
  private static IndexIO INDEX_IO;
  private static IndexMerger INDEX_MERGER;

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  @BeforeClass
  public static void setup()
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    // SegmentizerModule registers the serde for the built-in segmentizer
    // factories; the custom test factory is registered under its own type name
    // so it can round-trip through factory.json.
    mapper.registerModule(new SegmentizerModule());
    mapper.registerSubtypes(new NamedType(CustomSegmentizerFactory.class, "customSegmentFactory"));
    final IndexIO indexIO = new IndexIO(mapper, () -> 0);

    // Injectables needed when deserializing factory.json back into a
    // SegmentizerFactory (e.g. MMappedQueryableSegmentizerFactory takes an
    // injected IndexIO).
    mapper.setInjectableValues(
        new InjectableValues.Std()
            .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
            .addValue(ObjectMapper.class.getName(), mapper)
            .addValue(IndexIO.class, indexIO)
            .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
    );

    JSON_MAPPER = mapper;
    INDEX_IO = indexIO;
    INDEX_MERGER = new IndexMergerV9(mapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
  }

  @Test
  public void testDefaultSegmentizerPersist() throws IOException
  {
    IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
    File segment = new File(temporaryFolder.newFolder(), "segment");
    // All-null IndexSpec args: no custom segment loader configured.
    File persisted = INDEX_MERGER.persist(
        data,
        Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"),
        segment,
        new IndexSpec(
            null,
            null,
            null,
            null,
            null
        ),
        null
    );

    // Without a custom loader, persist must fall back to the mmap factory.
    File factoryJson = new File(persisted, "factory.json");
    Assert.assertTrue(factoryJson.exists());
    SegmentizerFactory factory = JSON_MAPPER.readValue(factoryJson, SegmentizerFactory.class);
    Assert.assertTrue(factory instanceof MMappedQueryableSegmentizerFactory);
  }

  @Test
  public void testCustomSegmentizerPersist() throws IOException
  {
    IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
    File segment = new File(temporaryFolder.newFolder(), "segment");
    // Same as above but with a custom segment loader in the IndexSpec.
    File persisted = INDEX_MERGER.persist(
        data,
        Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"),
        segment,
        new IndexSpec(
            null,
            null,
            null,
            null,
            new CustomSegmentizerFactory()
        ),
        null
    );

    // factory.json must now round-trip to the custom factory type.
    File factoryJson = new File(persisted, "factory.json");
    Assert.assertTrue(factoryJson.exists());
    SegmentizerFactory factory = JSON_MAPPER.readValue(factoryJson, SegmentizerFactory.class);
    Assert.assertTrue(factory instanceof CustomSegmentizerFactory);
  }

  /**
   * Minimal custom segmentizer used only to prove that a non-default factory
   * is persisted and deserialized; loads the segment via the shared IndexIO.
   */
  private static class CustomSegmentizerFactory implements SegmentizerFactory
  {
    @Override
    public Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException
    {
      try {
        return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy), segment.getId());
      }
      catch (IOException e) {
        throw new SegmentLoadingException(e, "%s", e.getMessage());
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment;

import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
Expand Down Expand Up @@ -65,4 +66,10 @@ public void testDefaults()
Assert.assertEquals(CompressionStrategy.LZ4, spec.getMetricCompression());
Assert.assertEquals(CompressionFactory.LongEncodingStrategy.LONGS, spec.getLongEncoding());
}

@Test
public void testEquals()
{
  // Verifies the full equals/hashCode contract (reflexivity, symmetry, field
  // coverage — including the newly added segmentLoader field) via
  // EqualsVerifier. usingGetClass() matches the getClass()-based comparison
  // in IndexSpec.equals rather than an instanceof check.
  EqualsVerifier.forClass(IndexSpec.class).usingGetClass().verify();
}
}