From 45d0e395b9ce1f775255ad9bff47756072ba0e24 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sat, 30 May 2020 02:41:53 -0700 Subject: [PATCH 1/2] support customized factory.json via IndexSpec for segment persist --- .../common/task/CompactionTaskRunTest.java | 30 ++-- .../apache/druid/segment/IndexMergerV9.java | 8 +- .../org/apache/druid/segment/IndexSpec.java | 34 ++++- .../segment/CustomSegmentizerFactoryTest.java | 139 ++++++++++++++++++ 4 files changed, 192 insertions(+), 19 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index cf2bb2a255df..2a5b02e1db17 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -19,10 +19,10 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.IndexingServiceClient; @@ -45,6 +45,7 @@ import org.apache.druid.indexing.common.task.CompactionTask.Builder; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -57,6 +58,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.indexing.DataSchema; @@ -84,6 +86,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -125,19 +128,7 @@ public class CompactionTaskRunTest extends IngestionTestBase false, 0 ); - private static final CompactionState DEFAULT_COMPACTION_STATE = new CompactionState( - new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), - ImmutableMap.of( - "bitmap", - ImmutableMap.of("type", "roaring", "compressRunOnSerialization", true), - "dimensionCompression", - "lz4", - "metricCompression", - "lz4", - "longEncoding", - "longs" - ) - ); + private static CompactionState DEFAULT_COMPACTION_STATE; private static final List TEST_ROWS = ImmutableList.of( "2014-01-01T00:00:10Z,a,1\n", @@ -195,6 +186,17 @@ public Collection fetchUsedSegmentsInDataSourceForIntervals( this.lockGranularity = lockGranularity; } + @BeforeClass + public static void setupClass() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + + DEFAULT_COMPACTION_STATE = new CompactionState( + new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), + mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class) + ); + } + @Before public void setup() throws IOException { diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index e1a92f7d1a36..bdf0bccf672a 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -49,6 +49,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexAdapter; import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory; +import org.apache.druid.segment.loading.SegmentizerFactory; import org.apache.druid.segment.serde.ColumnPartSerde; import org.apache.druid.segment.serde.ComplexColumnPartSerde; import org.apache.druid.segment.serde.ComplexMetricSerde; @@ -152,7 +153,12 @@ private File makeIndexFiles( progress.progress(); startTime = System.currentTimeMillis(); try (FileOutputStream fos = new FileOutputStream(new File(outDir, "factory.json"))) { - mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO)); + SegmentizerFactory customSegmentLoader = indexSpec.getSegmentLoader(); + if (customSegmentLoader != null) { + mapper.writeValue(fos, customSegmentLoader); + } else { + mapper.writeValue(fos, new MMappedQueryableSegmentizerFactory(indexIO)); + } } log.debug("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime); diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java index 6edc33686f23..13101de94db0 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java @@ -21,12 +21,14 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import org.apache.druid.segment.data.BitmapSerde; import org.apache.druid.segment.data.BitmapSerdeFactory; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.loading.SegmentizerFactory; import javax.annotation.Nullable; import java.util.Arrays; @@ -62,13 +64,26 @@ public class IndexSpec private final CompressionStrategy metricCompression; private final CompressionFactory.LongEncodingStrategy longEncoding; + @Nullable + private final SegmentizerFactory segmentLoader; /** * Creates an IndexSpec with default parameters */ public IndexSpec() { - this(null, null, null, null); + this(null, null, null, null, null); + } + + @VisibleForTesting + public IndexSpec( + @Nullable BitmapSerdeFactory bitmapSerdeFactory, + @Nullable CompressionStrategy dimensionCompression, + @Nullable CompressionStrategy metricCompression, + @Nullable CompressionFactory.LongEncodingStrategy longEncoding + ) + { + this(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, null); } /** @@ -93,7 +108,8 @@ public IndexSpec( @JsonProperty("bitmap") @Nullable BitmapSerdeFactory bitmapSerdeFactory, @JsonProperty("dimensionCompression") @Nullable CompressionStrategy dimensionCompression, @JsonProperty("metricCompression") @Nullable CompressionStrategy metricCompression, - @JsonProperty("longEncoding") @Nullable CompressionFactory.LongEncodingStrategy longEncoding + @JsonProperty("longEncoding") @Nullable CompressionFactory.LongEncodingStrategy longEncoding, + @JsonProperty("segmentLoader") @Nullable SegmentizerFactory segmentLoader ) { Preconditions.checkArgument(dimensionCompression == null || DIMENSION_COMPRESSION.contains(dimensionCompression), @@ -111,6 +127,7 @@ public IndexSpec( this.dimensionCompression = dimensionCompression == null ? DEFAULT_DIMENSION_COMPRESSION : dimensionCompression; this.metricCompression = metricCompression == null ? DEFAULT_METRIC_COMPRESSION : metricCompression; this.longEncoding = longEncoding == null ? DEFAULT_LONG_ENCODING : longEncoding; + this.segmentLoader = segmentLoader; } @JsonProperty("bitmap") @@ -137,6 +154,13 @@ public CompressionFactory.LongEncodingStrategy getLongEncoding() return longEncoding; } + @JsonProperty + @Nullable + public SegmentizerFactory getSegmentLoader() + { + return segmentLoader; + } + @Override public boolean equals(Object o) { @@ -150,13 +174,14 @@ public boolean equals(Object o) return Objects.equals(bitmapSerdeFactory, indexSpec.bitmapSerdeFactory) && dimensionCompression == indexSpec.dimensionCompression && metricCompression == indexSpec.metricCompression && - longEncoding == indexSpec.longEncoding; + longEncoding == indexSpec.longEncoding && + Objects.equals(segmentLoader, indexSpec.segmentLoader); } @Override public int hashCode() { - return Objects.hash(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding); + return Objects.hash(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, segmentLoader); } @Override @@ -167,6 +192,7 @@ public String toString() ", dimensionCompression=" + dimensionCompression + ", metricCompression=" + metricCompression + ", longEncoding=" + longEncoding + + ", segmentLoader=" + segmentLoader + '}'; } } diff --git a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java new file mode 100644 index 000000000000..8b4a8d3f5313 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.jackson.SegmentizerModule; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentizerFactory; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class CustomSegmentizerFactoryTest extends InitializedNullHandlingTest +{ + private static ObjectMapper JSON_MAPPER; + private static IndexIO INDEX_IO; + private static IndexMerger INDEX_MERGER; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() + { + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerModule(new SegmentizerModule()); + mapper.registerSubtypes(new NamedType(CustomSegmentizerFactory.class, "customSegmentFactory")); + final IndexIO indexIO = new IndexIO(mapper, () -> 0); + + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(IndexIO.class, indexIO) + .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT) + ); + + JSON_MAPPER = mapper; + INDEX_IO = indexIO; + INDEX_MERGER = new IndexMergerV9(mapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + } + + @Test + public void testDefaultSegmentizerPersist() throws IOException + { + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv"); + File segment = new File(temporaryFolder.newFolder(), "segment"); + File persisted = INDEX_MERGER.persist( + data, + Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"), + segment, + new IndexSpec( + null, + null, + null, + null, + null + ), + null + ); + + File factoryJson = new File(persisted, "factory.json"); + Assert.assertTrue(factoryJson.exists()); + SegmentizerFactory factory = JSON_MAPPER.readValue(factoryJson, SegmentizerFactory.class); + Assert.assertTrue(factory instanceof MMappedQueryableSegmentizerFactory); + } + + @Test + public void testCustomSegmentizerPersist() throws IOException + { + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv"); + File segment = new File(temporaryFolder.newFolder(), "segment"); + File persisted = INDEX_MERGER.persist( + data, + Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"), + segment, + new IndexSpec( + null, + null, + null, + null, + new CustomSegmentizerFactory() + ), + null + ); + + File factoryJson = new File(persisted, "factory.json"); + Assert.assertTrue(factoryJson.exists()); + SegmentizerFactory factory = JSON_MAPPER.readValue(factoryJson, SegmentizerFactory.class); + Assert.assertTrue(factory instanceof CustomSegmentizerFactory); + } + + private static class CustomSegmentizerFactory implements SegmentizerFactory + { + @Override + public Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException + { + try { + return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy), segment.getId()); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "%s", e.getMessage()); + } + } + } +} From 02babe51348ec6c2178d3b339afdedd6f4215db6 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 1 Jun 2020 13:57:23 -0700 Subject: [PATCH 2/2] equals verifier --- .../test/java/org/apache/druid/segment/IndexSpecTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java b/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java index 4e03668f68ba..e9197ac8647a 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment; import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; @@ -65,4 +66,10 @@ public void testDefaults() Assert.assertEquals(CompressionStrategy.LZ4, spec.getMetricCompression()); Assert.assertEquals(CompressionFactory.LongEncodingStrategy.LONGS, spec.getLongEncoding()); } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(IndexSpec.class).usingGetClass().verify(); + } }