diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 3e54abfce8b4..f33fc78838cb 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -423,7 +423,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| `CannotParseExternalData` | A worker task could not parse data from an external datasource. | `errorMessage`: More details on why parsing failed. |
| `ColumnNameRestricted` | The query uses a restricted column name. | `columnName`: The restricted column name. |
| `ColumnTypeNotSupported` | The column type is not supported. This can be because:
- Writing to or reading from a particular column type is not supported.
- The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.
| `columnName`: The column name with an unsupported type.
`columnType`: The unknown column type. |
-| `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:
- Attempting to mix different granularities in the same intervals of the same datasource.
- Prior ingestions that used non-extendable shard specs.
| `dataSource`
`interval`: The interval for the attempted new segment allocation. |
+| `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:
- Attempting to mix different granularities in the same intervals of the same datasource.
- Prior ingestions that used non-extendable shard specs.
Use REPLACE to overwrite the existing data. Alternatively, if the error contains `allocatedInterval`, rerun the INSERT job with the granularity mentioned in the error to append to the existing data. Note that appending to the existing data using INSERT is not always possible; it can only be done if `allocatedInterval` is present. | `dataSource`
`interval`: The interval for the attempted new segment allocation.
`allocatedInterval`: The incorrect interval allocated by the overlord. It can be null. |
| `InsertCannotBeEmpty` | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| `InsertLockPreempted` | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| `InsertTimeNull` | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.
This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.|
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index dd163d406d3a..db5e69971791 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -940,7 +940,22 @@ private List generateSegmentIdsWithShardSpecsForAppend(
throw new MSQException(
new InsertCannotAllocateSegmentFault(
task.getDataSource(),
- segmentGranularity.bucket(timestamp)
+ segmentGranularity.bucket(timestamp),
+ null
+ )
+ );
+ }
+
+ // Even when the allocation isn't null, the overlord only makes a best-effort attempt to allocate a segment with the
+ // given segmentGranularity. A mismatch is commonly seen when a coarser segment already exists in the interval where
+ // the requested segment would go and that segment completely overlaps the request. Throw an error if the allocated
+ // interval doesn't align with the requested granularity.
+ if (!IntervalUtils.isAligned(allocation.getInterval(), segmentGranularity)) {
+ throw new MSQException(
+ new InsertCannotAllocateSegmentFault(
+ task.getDataSource(),
+ segmentGranularity.bucket(timestamp),
+ allocation.getInterval()
)
);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
index 403af37d9bf0..f632ae677362 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java
@@ -23,8 +23,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.DurationGranularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.Objects;
@JsonTypeName(InsertCannotAllocateSegmentFault.CODE)
@@ -35,15 +39,20 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault
private final String dataSource;
private final Interval interval;
+ @Nullable
+ private final Interval allocatedInterval;
+
@JsonCreator
public InsertCannotAllocateSegmentFault(
@JsonProperty("dataSource") final String dataSource,
- @JsonProperty("interval") final Interval interval
+ @JsonProperty("interval") final Interval interval,
+ @Nullable @JsonProperty("allocatedInterval") final Interval allocatedInterval
)
{
- super(CODE, "Cannot allocate segment for dataSource [%s], interval [%s]", dataSource, interval);
+ super(CODE, getErrorMessage(dataSource, interval, allocatedInterval));
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
+ this.allocatedInterval = allocatedInterval;
}
@JsonProperty
@@ -58,6 +67,57 @@ public Interval getInterval()
return interval;
}
+ @Nullable
+ @JsonProperty
+ public Interval getAllocatedInterval()
+ {
+ return allocatedInterval;
+ }
+
+ private static String getErrorMessage(
+ final String dataSource,
+ final Interval interval,
+ @Nullable final Interval allocatedInterval
+ )
+ {
+ String errorMessage;
+ if (allocatedInterval == null) {
+ errorMessage = StringUtils.format(
+ "Cannot allocate segment for dataSource [%s], interval [%s]. This can happen if the prior ingestion "
+ + "uses non-extendable shard specs or if the partitioned by granularity is different from the granularity of the "
+ + "pre-existing segments. Check the granularities of the pre-existing segments or re-run the ingestion with REPLACE "
+ + "to overwrite over the existing data",
+ dataSource,
+ interval
+ );
+ } else {
+ errorMessage = StringUtils.format(
+ "Requested segment for dataSource [%s], interval [%s], but got [%s] interval instead. "
+ + "This happens when an overlapping segment is already present with a coarser granularity for the requested interval. "
+ + "Either set the partition granularity for the INSERT to [%s] to append to existing data or use REPLACE to "
+ + "overwrite over the pre-existing segment",
+ dataSource,
+ interval,
+ allocatedInterval,
+ convertIntervalToGranularityString(allocatedInterval)
+ );
+ }
+ return errorMessage;
+ }
+
+ /**
+ * Converts the given interval to a more user-friendly string that represents its granularity.
+ */
+ private static String convertIntervalToGranularityString(final Interval interval)
+ {
+ try {
+ return GranularityType.fromPeriod(interval.toPeriod()).name();
+ }
+ catch (Exception e) {
+ return new DurationGranularity(interval.toDurationMillis(), null).toString();
+ }
+ }
+
@Override
public boolean equals(Object o)
{
@@ -71,12 +131,14 @@ public boolean equals(Object o)
return false;
}
InsertCannotAllocateSegmentFault that = (InsertCannotAllocateSegmentFault) o;
- return Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval);
+ return Objects.equals(dataSource, that.dataSource)
+ && Objects.equals(interval, that.interval)
+ && Objects.equals(allocatedInterval, that.allocatedInterval);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), dataSource, interval);
+ return Objects.hash(super.hashCode(), dataSource, interval, allocatedInterval);
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
index 43a844b5a6f1..328a0c0b74d7 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java
@@ -19,13 +19,16 @@
package org.apache.druid.msq.util;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
/**
- * Things that would make sense in {@link org.apache.druid.java.util.common.Intervals} if this were not an extension.
+ * Things that would make sense in {@link Intervals} if this were not an extension.
*/
public class IntervalUtils
{
@@ -61,4 +64,21 @@ public static List difference(final List list1, final List= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1")
+ "insert into foo1"
+ + " select __time, dim1 , count(*) as cnt"
+ + " from foo"
+ + " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ + " group by 1, 2"
+ + " PARTITIONED by day"
+ + " clustered by dim1"
+ )
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(
new InsertCannotAllocateSegmentFault(
"foo1",
- Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")
+ Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
+ null
)
)
.verifyResults();
}
+ @Test
+ public void testInsertCannotAllocateSegmentFaultWhenInvalidAllocation()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG).build();
+
+ // Simulate the task action client returning a segment allocated for a coarser (month-long) interval than the requested DAY granularity.
+ Mockito.doReturn(new SegmentIdWithShardSpec(
+ "foo1",
+ Intervals.of("2000-01-01/2000-02-01"),
+ "test",
+ new LinearShardSpec(2)
+ )).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));
+
+ testIngestQuery().setSql(
+ "insert into foo1"
+ + " select __time, dim1 , count(*) as cnt"
+ + " from foo"
+ + " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ + " group by 1, 2"
+ + " PARTITIONED by day"
+ + " clustered by dim1"
+ )
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedMSQFault(
+ new InsertCannotAllocateSegmentFault(
+ "foo1",
+ Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
+ Intervals.of("2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z")
+ )
+ )
+ .verifyResults();
+ }
+
+
@Test
public void testInsertCannotBeEmptyFault()
{
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 484989e7badf..add533fa41bd 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -54,7 +54,12 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", null));
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", ColumnType.STRING_ARRAY));
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
- assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY));
+ assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY, null));
+ assertFaultSerde(new InsertCannotAllocateSegmentFault(
+ "the datasource",
+ Intervals.of("2000-01-01/2002-01-01"),
+ Intervals.ETERNITY
+ ));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
index 6dd47313e7b1..03371f8b90fb 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java
@@ -19,8 +19,12 @@
package org.apache.druid.msq.util;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Interval;
+import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@@ -84,6 +88,101 @@ public void test_difference()
);
}
+ @Test
+ public void test_doesIntervalMatchesGranularity_withStandardGranularities()
+ {
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(Intervals.ETERNITY, Granularities.ALL)
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2001-01-01"), Granularities.YEAR
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-04-01"), Granularities.QUARTER
+ )
+ );
+
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-02-01"), Granularities.MONTH
+ )
+ );
+
+ // With the way WEEK granularities work, this needs to be aligned to an actual week
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("1999-12-27/2000-01-03"), Granularities.WEEK
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-01-02"), Granularities.DAY
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T08:00:00.000"), Granularities.EIGHT_HOUR
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T01:00:00.000"), Granularities.HOUR
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:01:00.000"), Granularities.MINUTE
+ )
+ );
+
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:00:01.000"), Granularities.SECOND
+ )
+ );
+
+ Assert.assertFalse(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2002-01-01"), Granularities.YEAR
+ )
+ );
+
+ Assert.assertFalse(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2002-01-08"), Granularities.YEAR
+ )
+ );
+ }
+
+ @Test
+ public void test_doesIntervalMatchesGranularity_withPeriodGranularity()
+ {
+ Assert.assertTrue(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-01-04"),
+ new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-01"), null)
+ )
+ );
+
+ Assert.assertFalse(
+ IntervalUtils.isAligned(
+ Intervals.of("2000-01-01/2000-01-04"),
+ new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-02"), null)
+ )
+ );
+ }
+
public static List intervals(final String... intervals)
{
return Arrays.stream(intervals).map(Intervals::of).collect(Collectors.toList());
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
index 812538ae6e05..6f5d9fb61e1b 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java
@@ -145,7 +145,8 @@ public static List granularitiesFinerThan(final Granularity gran0)
public abstract DateTime toDate(String filePath, Formatter formatter);
/**
- * Return true if time chunks populated by this granularity includes the given interval time chunk.
+ * Return true only if the time chunks populated by this granularity include the given interval time chunk. The
+ * interval must fit exactly into the scheme of the granularity for this to return true.
*/
public abstract boolean isAligned(Interval interval);