Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.joda.time.Interval;

import java.util.Map;
import java.util.Objects;

public class DimensionDistributionReport implements SubTaskReport
{
Expand Down Expand Up @@ -65,4 +66,24 @@ public String toString()
", intervalToDistribution=" + intervalToDistribution +
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DimensionDistributionReport that = (DimensionDistributionReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(intervalToDistribution, that.intervalToDistribution);
}

@Override
public int hashCode()
{
return Objects.hash(taskId, intervalToDistribution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;

/**
* Counts approximate frequencies of strings.
Expand Down Expand Up @@ -137,6 +138,40 @@ public String toString()
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringSketch that = (StringSketch) o;

// ParallelIndexPhaseRunner.collectReport() uses equals() to check subtasks send identical reports if they retry.
// However, ItemsSketch does not override equals(): https://github.com/apache/incubator-datasketches-java/issues/140
//
// Since ItemsSketch has built-in non-determinism, only rely on ItemsSketch properties that are deterministic. This
// check is best-effort as it is possible for it to return true for sketches that contain different values.
return delegate.getK() == that.delegate.getK() &&
delegate.getN() == that.delegate.getN() &&
Objects.equals(delegate.getMaxValue(), that.delegate.getMaxValue()) &&
Objects.equals(delegate.getMinValue(), that.delegate.getMinValue());
}

@Override
public int hashCode()
{
// See comment in equals() regarding ItemsSketch.
return Objects.hash(
delegate.getK(),
delegate.getN(),
delegate.getMaxValue(),
delegate.getMinValue()
);
}

ItemsSketch<String> getDelegate()
{
return delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,17 @@ Set<DataSegment> runTestTask(
Interval interval,
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
) throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
parseSpec,
interval,
inputDir,
filter,
partitionsSpec
partitionsSpec,
maxNumConcurrentSubTasks
);

actionClient = createActionClient(task);
Expand All @@ -137,7 +139,8 @@ private ParallelIndexSupervisorTask newTask(
Interval interval,
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
Expand All @@ -163,7 +166,7 @@ private ParallelIndexSupervisorTask newTask(
null,
null,
null,
2,
maxNumConcurrentSubTasks,
null,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.Intervals;
Expand Down Expand Up @@ -52,4 +53,12 @@ public void serializesDeserializes()
{
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}

@Test
public void abidesEqualsContract()
{
EqualsVerifier.forClass(DimensionDistributionReport.class)
.usingGetClass()
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
false,
0
);
private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;

@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
Expand Down Expand Up @@ -129,7 +130,8 @@ public void testRun() throws Exception
Intervals.of("2017/2018"),
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"))
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
MAX_NUM_CONCURRENT_SUB_TASKS
);
assertHashedPartition(publishedSegments);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,30 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
0
);

@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, true}
new Object[]{LockGranularity.TIME_CHUNK, false, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2},
new Object[]{LockGranularity.SEGMENT, true, 2},
new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor
);
}

private File inputDir;
private SetMultimap<Interval, String> intervalToDim1;

public RangePartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
private final int maxNumConcurrentSubTasks;

public RangePartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}

@Override
Expand Down Expand Up @@ -169,7 +177,8 @@ public void createsCorrectRangePartitions() throws Exception
null,
DIM1,
false
)
),
maxNumConcurrentSubTasks
);
assertRangePartitions(publishedSegments);
}
Expand Down Expand Up @@ -362,7 +371,6 @@ private TestPartialRangeSegmentGenerateRunner(
}
}


private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner
extends PartialGenericSegmentMergeParallelIndexTaskRunner
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel.distribution;

import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -69,6 +70,15 @@ public void serializesDeserializes()
target.put(MAX_STRING);
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}

@Test
public void abidesEqualsContract()
{
EqualsVerifier.forClass(StringSketch.class)
.usingGetClass()
.withNonnullFields("delegate")
.verify();
}
}

public static class PutTest
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<version>3.1.10</version>
<version>3.1.11</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down