From ac366219d5813f7b45b182079a342853281e6235 Mon Sep 17 00:00:00 2001 From: Liran Funaro Date: Mon, 5 Oct 2020 16:12:34 +0300 Subject: [PATCH] Move common configurations to TuningConfig * Move common methods that are used in HadoopTuningConfig and in AppenderatorConfig to TuningConfig * Rename rowFlushBoundary in HadoopTuningConfig to maxRowsInMemory to match TuningConfig API --- .../MaterializedViewSupervisorSpec.java | 4 +-- .../druid/indexer/HadoopTuningConfig.java | 25 +++++++++++-------- .../druid/indexer/IndexGeneratorJob.java | 2 +- .../druid/indexer/HadoopTuningConfigTest.java | 2 +- .../druid/segment/indexing/TuningConfig.java | 13 ++++++++++ .../appenderator/AppenderatorConfig.java | 13 ---------- 6 files changed, 31 insertions(+), 28 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 5388388498f2..db63a7316d1f 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -182,7 +182,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List> DEFAULT_SHARD_SPECS = ImmutableMap.of(); private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); - private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final boolean DEFAULT_USE_COMBINER = false; private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0; @@ -58,7 +57,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_INDEX_SPEC, DEFAULT_INDEX_SPEC, DEFAULT_APPENDABLE_INDEX, - DEFAULT_ROW_FLUSH_BOUNDARY, + DEFAULT_MAX_ROWS_IN_MEMORY, 0L, false, true, @@ -86,7 +85,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final IndexSpec indexSpec; private final IndexSpec indexSpecForIntermediatePersists; private final AppendableIndexSpec appendableIndexSpec; - private final int rowFlushBoundary; + private final int maxRowsInMemory; private final long maxBytesInMemory; private final boolean leaveIntermediate; private final boolean cleanupOnFailure; @@ -141,8 +140,8 @@ public HadoopTuningConfig( this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? this.indexSpec : indexSpecForIntermediatePersists; - this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null - ? DEFAULT_ROW_FLUSH_BOUNDARY + this.maxRowsInMemory = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null + ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemoryCOMPAT : maxRowsInMemory; this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; // initializing this to 0, it will be lazily initialized to a value @@ -192,6 +191,7 @@ public String getVersion() return version; } + @Override @JsonProperty public DimensionBasedPartitionsSpec getPartitionsSpec() { @@ -204,12 +204,14 @@ public Map> getShardSpecs() return shardSpecs; } + @Override @JsonProperty public IndexSpec getIndexSpec() { return indexSpec; } + @Override @JsonProperty public IndexSpec getIndexSpecForIntermediatePersists() { @@ -223,10 +225,11 @@ public AppendableIndexSpec getAppendableIndexSpec() return appendableIndexSpec; } - @JsonProperty("maxRowsInMemory") - public int getRowFlushBoundary() + @Override + @JsonProperty + public int getMaxRowsInMemory() { - return rowFlushBoundary; + return maxRowsInMemory; } @JsonProperty @@ -341,7 +344,7 @@ public HadoopTuningConfig withWorkingPath(String path) indexSpec, indexSpecForIntermediatePersists, appendableIndexSpec, - rowFlushBoundary, + maxRowsInMemory, maxBytesInMemory, leaveIntermediate, cleanupOnFailure, @@ -372,7 +375,7 @@ public HadoopTuningConfig withVersion(String ver) indexSpec, indexSpecForIntermediatePersists, appendableIndexSpec, - rowFlushBoundary, + maxRowsInMemory, maxBytesInMemory, leaveIntermediate, cleanupOnFailure, @@ -403,7 +406,7 @@ public HadoopTuningConfig withShardSpecs(Map> specs indexSpec, indexSpecForIntermediatePersists, appendableIndexSpec, - rowFlushBoundary, + maxRowsInMemory, maxBytesInMemory, leaveIntermediate, cleanupOnFailure, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index abc0b3bc96f5..be72da01bb10 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -304,7 +304,7 @@ private static IncrementalIndex makeIncrementalIndex( // Build the incremental-index according to the spec that was chosen by the user IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder() .setIndexSchema(indexSchema) - .setMaxRowCount(tuningConfig.getRowFlushBoundary()) + .setMaxRowCount(tuningConfig.getMaxRowsInMemory()) .setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault()) .build(); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index fce828b64be1..277f590cecc7 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -75,7 +75,7 @@ public void testSerde() throws Exception Assert.assertEquals(ImmutableMap.>of(), actual.getShardSpecs()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists()); - Assert.assertEquals(100, actual.getRowFlushBoundary()); + Assert.assertEquals(100, actual.getMaxRowsInMemory()); Assert.assertEquals(true, actual.isLeaveIntermediate()); Assert.assertEquals(true, actual.isCleanupOnFailure()); Assert.assertEquals(true, actual.isOverwriteFiles()); diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java index e3a4e1fadd05..760494e8b703 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; @@ -43,6 +45,11 @@ public interface TuningConfig */ AppendableIndexSpec getAppendableIndexSpec(); + /** + * Maximum number of rows in memory before persisting to local storage + */ + int getMaxRowsInMemory(); + /** * Maximum number of bytes (estimated) to store in memory before persisting to local storage */ @@ -66,4 +73,10 @@ default long getMaxBytesInMemoryOrDefault() return Long.MAX_VALUE; } } + + PartitionsSpec getPartitionsSpec(); + + IndexSpec getIndexSpec(); + + IndexSpec getIndexSpecForIntermediatePersists(); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 259e58c400a6..3fc8164cdf18 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -19,8 +19,6 @@ package org.apache.druid.segment.realtime.appenderator; -import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; @@ -32,11 +30,6 @@ public interface AppenderatorConfig extends TuningConfig { boolean isReportParseExceptions(); - /** - * Maximum number of rows in memory before persisting to local storage - */ - int getMaxRowsInMemory(); - int getMaxPendingPersists(); /** @@ -57,17 +50,11 @@ default Long getMaxTotalRows() throw new UnsupportedOperationException("maxTotalRows is not implemented."); } - PartitionsSpec getPartitionsSpec(); - /** * Period that sets frequency to persist to local storage if no other thresholds are met */ Period getIntermediatePersistPeriod(); - IndexSpec getIndexSpec(); - - IndexSpec getIndexSpecForIntermediatePersists(); - File getBasePersistDirectory(); AppenderatorConfig withBasePersistDirectory(File basePersistDirectory);