Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4dc9c5b
Support segmentGranularity for auto-compaction
maytasm Jan 26, 2021
43df955
Support segmentGranularity for auto-compaction
maytasm Jan 26, 2021
77d953c
Support segmentGranularity for auto-compaction
maytasm Jan 26, 2021
064a43f
Support segmentGranularity for auto-compaction
maytasm Feb 3, 2021
4182bd5
resolve conflict
maytasm Feb 3, 2021
7c6d17a
resolve conflict
maytasm Feb 3, 2021
7786851
Support segmentGranularity for auto-compaction
maytasm Feb 4, 2021
ac12903
Support segmentGranularity for auto-compaction
maytasm Feb 4, 2021
4994ca7
fix tests
maytasm Feb 4, 2021
46e3416
fix more tests
maytasm Feb 4, 2021
9a0fb8b
fix checkstyle
maytasm Feb 4, 2021
5a22162
add unit tests
maytasm Feb 5, 2021
092868f
fix checkstyle
maytasm Feb 5, 2021
ace4281
fix checkstyle
maytasm Feb 5, 2021
5873318
fix checkstyle
maytasm Feb 5, 2021
a09deea
add unit tests
maytasm Feb 5, 2021
9ed2c61
add integration tests
maytasm Feb 5, 2021
020566a
fix checkstyle
maytasm Feb 5, 2021
4f031b6
fix checkstyle
maytasm Feb 5, 2021
2dfdcd4
fix failing tests
maytasm Feb 5, 2021
ad11616
address comments
maytasm Feb 6, 2021
94ee532
address comments
maytasm Feb 6, 2021
343a07b
fix tests
maytasm Feb 10, 2021
a506287
fix tests
maytasm Feb 10, 2021
da018dc
fix test
maytasm Feb 10, 2021
5dd9e50
fix test
maytasm Feb 10, 2021
d9783b4
fix test
maytasm Feb 10, 2021
4405fc6
fix test
maytasm Feb 10, 2021
06060f9
fix test
maytasm Feb 10, 2021
e6e9030
fix test
maytasm Feb 10, 2021
578d433
fix test
maytasm Feb 11, 2021
7784f70
fix test
maytasm Feb 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void setup()
null,
null,
null,
null,
null
)
);
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/org/apache/druid/timeline/CompactionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,20 @@ public class CompactionState
// org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;
// org.apache.druid.segment.indexing.granularity.GranularitySpec cannot be used here because it's in the
// 'server' module which has a dependency on the 'core' module where this class is.
private final Map<String, Object> granularitySpec;

@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@JsonProperty("indexSpec") Map<String, Object> indexSpec
@JsonProperty("indexSpec") Map<String, Object> indexSpec,
@JsonProperty("granularitySpec") Map<String, Object> granularitySpec
)
{
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec;
this.granularitySpec = granularitySpec;
}

@JsonProperty
Expand All @@ -66,6 +71,12 @@ public Map<String, Object> getIndexSpec()
return indexSpec;
}

@JsonProperty
public Map<String, Object> getGranularitySpec()
{
return granularitySpec;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -77,13 +88,14 @@ public boolean equals(Object o)
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(indexSpec, that.indexSpec);
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(granularitySpec, that.granularitySpec);
}

@Override
public int hashCode()
{
return Objects.hash(partitionsSpec, indexSpec);
return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
}

@Override
Expand All @@ -92,6 +104,7 @@ public String toString()
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
", granularitySpec=" + granularitySpec +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public interface PartitionChunk<T> extends Comparable<PartitionChunk<T>>
* Returns true if this chunk is the end of the partition. Most commonly, that means it represents the range
* [X, infinity] for some concrete X.
*
* @return true if the chunk is the beginning of the partition
* @return true if the chunk is the end of the partition
*/
boolean isEnd();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void testV1Serialization() throws Exception
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
ImmutableMap.of(),
ImmutableMap.of()
),
TEST_VERSION,
Expand Down Expand Up @@ -231,7 +232,8 @@ public void testWithLastCompactionState()
{
final CompactionState compactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
Collections.singletonMap("test", "map")
Collections.singletonMap("test", "map"),
Collections.singletonMap("test2", "map2")
);
final DataSegment segment1 = DataSegment.builder()
.dataSource("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,14 @@ public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConf
public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
boolean storeCompactionState,
TaskToolbox toolbox,
IndexTuningConfig tuningConfig
IndexTuningConfig tuningConfig,
GranularitySpec granularitySpec
)
{
if (storeCompactionState) {
final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap);
final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper());
final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final GranularitySpec granularitySpec;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove this since it is no longer used

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is no longer used?

@Nullable
private final ParallelIndexTuningConfig tuningConfig;
@JsonIgnore
private final SegmentProvider segmentProvider;
Expand Down Expand Up @@ -172,7 +174,8 @@ public CompactionTask(
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("segmentGranularity") @Deprecated @Nullable final Granularity segmentGranularity,
@JsonProperty("granularitySpec") @Nullable final GranularitySpec granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
Expand Down Expand Up @@ -202,6 +205,16 @@ public CompactionTask(
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new UniformGranularitySpec(
segmentGranularity,
null,
null,
null
);
} else {
this.granularitySpec = granularitySpec;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if both segmentGranularity & granularitySpec are non-null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

granularitySpec takes priority since segmentGranularity is deprecated.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we have a consistent stance on this but I think several places throw an exception if both the deprecated and new property are specified (such as Checks.checkOneNotNullOrEmpty usages).

I don't feel strongly about it, so I'm just mentioning this in case you feel that's a better approach.

}
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
Expand Down Expand Up @@ -288,7 +301,14 @@ public AggregatorFactory[] getMetricsSpec()
@Override
public Granularity getSegmentGranularity()
{
return segmentGranularity;
return granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
}

@JsonProperty
@Nullable
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}

@Nullable
Expand Down Expand Up @@ -348,7 +368,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
segmentGranularity,
getSegmentGranularity(),
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
retryPolicyFactory
Expand Down Expand Up @@ -892,6 +912,8 @@ public static class Builder
@Nullable
private Granularity segmentGranularity;
@Nullable
private GranularitySpec granularitySpec;
@Nullable
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
Expand Down Expand Up @@ -941,6 +963,12 @@ public Builder segmentGranularity(Granularity segmentGranularity)
return this;
}

public Builder granularitySpec(GranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
return this;
}

public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
Expand All @@ -966,6 +994,7 @@ public CompactionTask build()
dimensionsSpec,
metricsSpec,
segmentGranularity,
granularitySpec,
tuningConfig,
context,
segmentLoaderFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ private TaskStatus generateAndPublishSegments(
compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig()
ingestionSchema.getTuningConfig(),
ingestionSchema.getDataSchema().getGranularitySpec()
);

// Probably we can publish atomicUpdateGroup along with segments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,8 @@ private void publishSegments(
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
ingestionSchema.getTuningConfig()
ingestionSchema.getTuningConfig(),
ingestionSchema.getDataSchema().getGranularitySpec()
);
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingServiceClient;
Expand All @@ -45,11 +46,13 @@
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
Expand Down Expand Up @@ -113,6 +116,7 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
1000,
100
),
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
ImmutableMap.of("key", "value")
);

Expand Down Expand Up @@ -186,6 +190,18 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException
query.getTuningConfig().getTotalNumMergeTasks().intValue(),
task.getTuningConfig().getTotalNumMergeTasks()
);
Assert.assertEquals(
query.getGranularitySpec().getQueryGranularity(),
task.getGranularitySpec().getQueryGranularity()
);
Assert.assertEquals(
query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(
query.getGranularitySpec().isRollup(),
task.getGranularitySpec().isRollup()
);
Assert.assertEquals(query.getContext(), task.getContext());
}

Expand Down Expand Up @@ -243,6 +259,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
null
)
)
.granularitySpec(new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, null))
.build();

final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
Expand Down Expand Up @@ -284,6 +301,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
1000,
100
),
new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
new HashMap<>()
);

Expand Down
Loading