diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 871486657164..19397787a854 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -272,6 +272,11 @@
assertj-core
test
+
+ nl.jqno.equalsverifier
+ equalsverifier
+ test
+
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
index a2e6dd0c476d..a7aea29a2846 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
@@ -25,6 +25,7 @@
import org.joda.time.Interval;
import java.util.Map;
+import java.util.Objects;
public class DimensionDistributionReport implements SubTaskReport
{
@@ -65,4 +66,24 @@ public String toString()
", intervalToDistribution=" + intervalToDistribution +
'}';
}
+
+  /** Equal only to a report of the exact same class carrying an equal taskId and intervalToDistribution. */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    final DimensionDistributionReport rhs = (DimensionDistributionReport) o;
+    return Objects.equals(taskId, rhs.taskId) && Objects.equals(intervalToDistribution, rhs.intervalToDistribution);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(taskId, intervalToDistribution); // the same fields equals() compares
+  }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
index bba16cc46628..dc4dbe1e06a8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.Comparator;
+import java.util.Objects;
/**
* Counts approximate frequencies of strings.
@@ -137,6 +138,40 @@ public String toString()
'}';
}
+  /**
+   * Best-effort equality over the deterministic properties of the underlying ItemsSketch.
+   *
+   * ParallelIndexPhaseRunner.collectReport() uses equals() to check that subtasks send identical reports if they
+   * retry, but ItemsSketch does not override equals():
+   * https://github.com/apache/incubator-datasketches-java/issues/140
+   *
+   * Since ItemsSketch has built-in non-determinism, only K, N, the max value and the min value are compared, so
+   * this can return true for sketches that contain different values.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    final StringSketch rhs = (StringSketch) o;
+    if (delegate.getK() != rhs.delegate.getK() || delegate.getN() != rhs.delegate.getN()) {
+      return false;
+    }
+    return Objects.equals(delegate.getMaxValue(), rhs.delegate.getMaxValue())
+           && Objects.equals(delegate.getMinValue(), rhs.delegate.getMinValue());
+  }
+
+  /** Hashes the same ItemsSketch properties that equals() compares, keeping the two methods consistent. */
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(delegate.getK(), delegate.getN(), delegate.getMaxValue(), delegate.getMinValue());
+  }
+
ItemsSketch getDelegate()
{
return delegate;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index 880d8065e174..a7c4396b780f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -107,7 +107,8 @@ Set runTestTask(
Interval interval,
File inputDir,
String filter,
- DimensionBasedPartitionsSpec partitionsSpec
+ DimensionBasedPartitionsSpec partitionsSpec,
+ int maxNumConcurrentSubTasks
) throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
@@ -115,7 +116,8 @@ Set runTestTask(
interval,
inputDir,
filter,
- partitionsSpec
+ partitionsSpec,
+ maxNumConcurrentSubTasks
);
actionClient = createActionClient(task);
@@ -137,7 +139,8 @@ private ParallelIndexSupervisorTask newTask(
Interval interval,
File inputDir,
String filter,
- DimensionBasedPartitionsSpec partitionsSpec
+ DimensionBasedPartitionsSpec partitionsSpec,
+ int maxNumConcurrentSubTasks
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
@@ -163,7 +166,7 @@ private ParallelIndexSupervisorTask newTask(
null,
null,
null,
- 2,
+ maxNumConcurrentSubTasks,
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
index c23362f3e9c3..e82041be814c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.Intervals;
@@ -52,4 +53,12 @@ public void serializesDeserializes()
{
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}
+
+  @Test
+  public void abidesEqualsContract()
+  {
+    // usingGetClass(): DimensionDistributionReport.equals() compares getClass() instead of using instanceof.
+    final Class<DimensionDistributionReport> reportClass = DimensionDistributionReport.class;
+    EqualsVerifier.forClass(reportClass).usingGetClass().verify();
+  }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index dffd9d52e8b2..7219f16ef88b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
false,
0
);
+ private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable