Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
Expand Down Expand Up @@ -177,7 +178,7 @@ public class CachingClusteredClientBenchmark
new Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), JSON_MAPPER)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());

@Param("100")
private int numDataSources;
Expand All @@ -72,7 +73,7 @@ public class NewestSegmentFirstPolicyBenchmark
private int numPartitionsPerDayInterval;

@Param("800000000")
private long targetCompactionSizeBytes;
private long inputSegmentSizeBytes;

@Param("1000000")
private long segmentSizeBytes;
Expand All @@ -94,8 +95,7 @@ public void setup()
new DataSourceCompactionConfig(
dataSource,
0,
targetCompactionSizeBytes,
targetCompactionSizeBytes,
inputSegmentSizeBytes,
null,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@
*/
public class DynamicPartitionsSpec implements PartitionsSpec
{
/**
* Default maxTotalRows for most task types except compaction task.
*/
public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
static final String NAME = "dynamic";

private final int maxRowsPerSegment;
private final long maxTotalRows;
@Nullable
private final Long maxTotalRows;

@JsonCreator
public DynamicPartitionsSpec(
Expand All @@ -45,7 +49,7 @@ public DynamicPartitionsSpec(
this.maxRowsPerSegment = PartitionsSpec.isEffectivelyNull(maxRowsPerSegment)
? DEFAULT_MAX_ROWS_PER_SEGMENT
: maxRowsPerSegment;
this.maxTotalRows = PartitionsSpec.isEffectivelyNull(maxTotalRows) ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
this.maxTotalRows = maxTotalRows;
}

@Override
Expand All @@ -55,12 +59,22 @@ public Integer getMaxRowsPerSegment()
return maxRowsPerSegment;
}

@Nullable
@JsonProperty
public long getMaxTotalRows()
public Long getMaxTotalRows()
{
return maxTotalRows;
}

/**
* Get the given maxTotalRows or the default.
* The default can be different depending on the caller.
*/
public long getMaxTotalRowsOr(long defaultMaxTotalRows)
{
return PartitionsSpec.isEffectivelyNull(maxTotalRows) ? defaultMaxTotalRows : maxTotalRows;
}

@Override
public boolean needsDeterminePartitions(boolean useForHadoopTask)
{
Expand All @@ -78,7 +92,7 @@ public boolean equals(Object o)
}
DynamicPartitionsSpec that = (DynamicPartitionsSpec) o;
return maxRowsPerSegment == that.maxRowsPerSegment &&
maxTotalRows == that.maxTotalRows;
Objects.equals(maxTotalRows, that.maxTotalRows);
}

@Override
Expand Down
97 changes: 97 additions & 0 deletions core/src/main/java/org/apache/druid/timeline/CompactionState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.PartitionsSpec;

import java.util.Map;
import java.util.Objects;

/**
* This class describes what compaction task spec was used to create a given segment.
* The compaction task is a task that reads Druid segments and overwrites them with new ones. Since this task always
* reads segments in the same order, the same task spec will always create the same set of segments
* (not same segment ID, but same content).
*
* Note that this class doesn't include all fields in the compaction task spec. Only the configurations that can
* affect the content of segment should be included.
*
* @see DataSegment#lastCompactionState
*/
public class CompactionState
Copy link
Copy Markdown
Contributor

@himanshug himanshug Oct 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add javadoc for this class describing what this is, why it has the fields it does ...(I know there is some discussion in the proposal, but it would be very non-obvious for someone reading the code) and what guarantees it provides ... e.g. something like: if a CompactionTask is run with parameters matching those here, then the row distribution in the created segments would be exactly the same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added javadoc. Please take a look if it's enough.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

{
private final PartitionsSpec partitionsSpec;
// org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;

@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@JsonProperty("indexSpec") Map<String, Object> indexSpec
)
{
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec;
}

@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}

@JsonProperty
public Map<String, Object> getIndexSpec()
{
return indexSpec;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
Objects.equals(indexSpec, that.indexSpec);
}

@Override
public int hashCode()
{
return Objects.hash(partitionsSpec, indexSpec);
}

@Override
public String toString()
{
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
'}';
}
}
Loading