2 changes: 1 addition & 1 deletion docs/multi-stage-query/reference.md
Expand Up @@ -423,7 +423,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_CannotParseExternalData">`CannotParseExternalData`</a> | A worker task could not parse data from an external datasource. | `errorMessage`: More details on why parsing failed. |
| <a name="error_ColumnNameRestricted">`ColumnNameRestricted`</a> | The query uses a restricted column name. | `columnName`: The restricted column name. |
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The column type is not supported. This can be because:<br /> <br /><ul><li>Support for writing or reading from a particular column type is not supported.</li><li>The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.</li></ul> | `columnName`: The column name with an unsupported type.<br /> <br />`columnType`: The unknown column type. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to a conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul> <br /> <br /> Use REPLACE to overwrite the existing data. Alternatively, if the error contains `allocatedInterval`, rerun the INSERT job with the granularity named in the error message to append to the existing data. Appending with INSERT is only possible when `allocatedInterval` is present. | `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. <br /> <br /> `allocatedInterval`: The incorrect interval allocated by the overlord. It can be null. |
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.|
Expand Up @@ -940,7 +940,22 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
throw new MSQException(
new InsertCannotAllocateSegmentFault(
task.getDataSource(),
segmentGranularity.bucket(timestamp)
segmentGranularity.bucket(timestamp),
null
)
);
}

// Even if the allocation isn't null, the overlord only makes a best-effort attempt to allocate a segment with the
// given segmentGranularity. A mismatch commonly occurs when a coarser segment already exists in the requested
// interval and completely overlaps the request. Throw an error if the allocated interval doesn't match the
// requested granularity.
if (!IntervalUtils.isAligned(allocation.getInterval(), segmentGranularity)) {
throw new MSQException(
new InsertCannotAllocateSegmentFault(
task.getDataSource(),
segmentGranularity.bucket(timestamp),
allocation.getInterval()
)
);
}
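
The two failure paths in this hunk (a null allocation, and a non-null allocation whose interval does not match the requested granularity) can be illustrated with a minimal, dependency-free sketch. It assumes DAY granularity in UTC and uses `java.time` in place of the Druid and Joda types; `AllocationCheckSketch`, `Span`, `dayBucket`, and `check` are hypothetical names that do not appear in the PR.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

// Hypothetical illustration only: not Druid code, and DAY (UTC) is the only granularity modeled.
final class AllocationCheckSketch
{
  /** Half-open interval [start, end); a stand-in for Joda's Interval. */
  static final class Span
  {
    final Instant start;
    final Instant end;

    Span(Instant start, Instant end)
    {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString()
    {
      return start + "/" + end;
    }
  }

  /** Returns the UTC day containing the timestamp, playing the role of segmentGranularity.bucket(timestamp). */
  static Span dayBucket(Instant timestamp)
  {
    Instant start = timestamp.truncatedTo(ChronoUnit.DAYS);
    return new Span(start, start.plus(1, ChronoUnit.DAYS));
  }

  /** Returns a fault description, or empty if the allocation is usable for a DAY-partitioned INSERT. */
  static Optional<String> check(Instant timestamp, Span allocated)
  {
    Span requested = dayBucket(timestamp);
    if (allocated == null) {
      // First path: nothing could be allocated, so there is no allocated interval to report.
      return Optional.of("requested [" + requested + "], allocated nothing");
    }
    boolean startsOnDay = allocated.start.equals(allocated.start.truncatedTo(ChronoUnit.DAYS));
    boolean spansOneDay = allocated.end.equals(allocated.start.plus(1, ChronoUnit.DAYS));
    if (!(startsOnDay && spansOneDay)) {
      // Second path: an allocation came back, but for a coarser interval than requested.
      return Optional.of("requested [" + requested + "], but got [" + allocated + "]");
    }
    return Optional.empty();
  }
}
```

With a timestamp on 2000-01-02 and an allocated span of 2000-01-01 to 2000-02-01, `check` reports the second kind of fault, which corresponds to the case where the PR passes `allocation.getInterval()` into the fault.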
Expand Up @@ -23,8 +23,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.DurationGranularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Objects;

@JsonTypeName(InsertCannotAllocateSegmentFault.CODE)
Expand All @@ -35,15 +39,20 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault
private final String dataSource;
private final Interval interval;

@Nullable
private final Interval allocatedInterval;

@JsonCreator
public InsertCannotAllocateSegmentFault(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") final Interval interval
@JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("allocatedInterval") final Interval allocatedInterval
)
{
super(CODE, "Cannot allocate segment for dataSource [%s], interval [%s]", dataSource, interval);
super(CODE, getErrorMessage(dataSource, interval, allocatedInterval));
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
this.allocatedInterval = allocatedInterval;
}

@JsonProperty
Expand All @@ -58,6 +67,57 @@ public Interval getInterval()
return interval;
}

@Nullable
@JsonProperty
public Interval getAllocatedInterval()
{
return allocatedInterval;
}

private static String getErrorMessage(
final String dataSource,
final Interval interval,
@Nullable final Interval allocatedInterval
)
{
String errorMessage;
if (allocatedInterval == null) {
errorMessage = StringUtils.format(
"Cannot allocate segment for dataSource [%s], interval [%s]. This can happen if the prior ingestion "
+ "uses non-extendable shard specs or if the partitioned by granularity is different from the granularity of the "
+ "pre-existing segments. Check the granularities of the pre-existing segments or re-run the ingestion with REPLACE "
+ "to overwrite the existing data",
dataSource,
interval
);
} else {
errorMessage = StringUtils.format(
"Requested segment for dataSource [%s], interval [%s], but got [%s] interval instead. "
+ "This happens when an overlapping segment is already present with a coarser granularity for the requested interval. "
+ "Either set the partition granularity for the INSERT to [%s] to append to existing data or use REPLACE to "
+ "overwrite the pre-existing segment",
dataSource,
interval,
allocatedInterval,
convertIntervalToGranularityString(allocatedInterval)
);
}
return errorMessage;
}

/**
* Converts the given interval to a string representing the granularity which is more user-friendly.
*/
private static String convertIntervalToGranularityString(final Interval interval)
{
try {
return GranularityType.fromPeriod(interval.toPeriod()).name();
}
catch (Exception e) {
return new DurationGranularity(interval.toDurationMillis(), null).toString();
}
}

@Override
public boolean equals(Object o)
{
Expand All @@ -71,12 +131,14 @@ public boolean equals(Object o)
return false;
}
InsertCannotAllocateSegmentFault that = (InsertCannotAllocateSegmentFault) o;
return Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval);
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(allocatedInterval, that.allocatedInterval);
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), dataSource, interval);
return Objects.hash(super.hashCode(), dataSource, interval, allocatedInterval);
}
}
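
The idea behind `convertIntervalToGranularityString` (name the interval by a well-known granularity when possible, otherwise fall back to describing its raw length) can be sketched without Druid or Joda. The version below is an assumption-laden illustration using `java.time`: `GranularityNameSketch` and `describe` are hypothetical names, only four named periods are recognized, and the fallback format is invented for the sketch rather than taken from `DurationGranularity`.

```java
import java.time.LocalDate;
import java.time.Period;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration only: the PR itself relies on GranularityType.fromPeriod and DurationGranularity.
final class GranularityNameSketch
{
  /** Names the calendar period covered by [start, endExclusive), or falls back to a day count. */
  static String describe(LocalDate start, LocalDate endExclusive)
  {
    Period period = Period.between(start, endExclusive);
    if (period.equals(Period.ofYears(1))) {
      return "YEAR";
    }
    if (period.equals(Period.ofMonths(1))) {
      return "MONTH";
    }
    if (period.equals(Period.ofDays(7))) {
      return "WEEK";
    }
    if (period.equals(Period.ofDays(1))) {
      return "DAY";
    }
    // Fallback for intervals that match no named period; the output format here is made up for the sketch.
    return ChronoUnit.DAYS.between(start, endExclusive) + " days";
  }
}
```

Naming the granularity in the message gives the user something they can use directly in a `PARTITIONED BY` clause, which is why the fault prefers a name and only falls back to a duration when no name fits.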
Expand Up @@ -19,13 +19,16 @@

package org.apache.druid.msq.util;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;

/**
* Things that would make sense in {@link org.apache.druid.java.util.common.Intervals} if this were not an extension.
* Things that would make sense in {@link Intervals} if this were not an extension.
*/
public class IntervalUtils
{
Expand Down Expand Up @@ -61,4 +64,21 @@ public static List<Interval> difference(final List<Interval> list1, final List<I

return retVal;
}

/**
Contributor

Maybe add this method to Intervals class instead, as it could have uses outside the MSQ extension too.

Contributor Author

Left this method in the interval utils since the top-level Javadoc for IntervalUtils mentions that the methods are specific to MSQ for now, and would make more sense to move to Intervals if MSQ wasn't an extension

* Checks whether the provided interval is aligned with the granularity. For {@link AllGranularity}, the interval
* is considered aligned only if it is {@link Intervals#ETERNITY}. This is used to check that the interval allocated
* by the overlord matches the granularity requested in the SQL query.
*/
public static boolean isAligned(
final Interval interval,
final Granularity granularity
)
{
// AllGranularity needs special handling since AllGranularity#isAligned always returns false
Contributor

I suppose we could fix AllGranularity.isAligned() to return true when the interval is eternity.

Contributor Author

@LakshSingla LakshSingla Jun 26, 2023

I wasn't confident that this wouldn't break any pre-existing logic. Also, the Javadoc states:

  /**
   * No interval is aligned with all granularity since it's infinite.
   */

Therefore, I found it suitable to extract the logic into a helper method.

if (granularity instanceof AllGranularity) {
return Intervals.isEternity(interval);
}
return granularity.isAligned(interval);
}
}
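
The alignment rule discussed in this thread can be sketched in isolation. The example below is a simplified stand-in, not the Druid implementation: it models only fixed-length buckets anchored at the epoch (so calendar granularities such as MONTH or YEAR, time zones, and custom origins are out of scope), it treats an interval as aligned only if it covers exactly one bucket, and it special-cases ALL so that only the eternity interval counts as aligned. `AlignmentSketch`, the `ALL` sentinel, and the eternity bounds are assumptions made for the illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only: uses java.time instead of Druid's Granularity and Joda's Interval.
final class AlignmentSketch
{
  /** Sentinel bucket size standing in for ALL granularity (an assumption of this sketch). */
  static final Duration ALL = Duration.ZERO;

  // Assumed bounds for an "eternity" interval; chosen so that end - start fits in a long.
  static final Instant ETERNITY_START = Instant.ofEpochMilli(Long.MIN_VALUE / 2);
  static final Instant ETERNITY_END = Instant.ofEpochMilli(Long.MAX_VALUE / 2);

  /** Returns true if [start, end) is exactly one bucket, or is eternity when the bucket is ALL. */
  static boolean isAligned(Instant start, Instant end, Duration bucket)
  {
    if (bucket.equals(ALL)) {
      // Special case: no finite bucket exists for ALL, so only eternity is accepted.
      return start.equals(ETERNITY_START) && end.equals(ETERNITY_END);
    }
    long bucketMillis = bucket.toMillis();
    boolean startsOnBoundary = Math.floorMod(start.toEpochMilli(), bucketMillis) == 0;
    boolean spansOneBucket = end.toEpochMilli() - start.toEpochMilli() == bucketMillis;
    return startsOnBoundary && spansOneBucket;
  }
}
```

Keeping the ALL check in a helper, rather than changing the granularity class itself, matches the trade-off described in the replies above: the existing behavior stays untouched for other callers.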
Expand Up @@ -36,6 +36,8 @@
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Test;
import org.mockito.Mockito;

Expand All @@ -51,7 +53,7 @@
public class MSQFaultsTest extends MSQTestBase
{
@Test
public void testInsertCannotAllocateSegmentFault()
public void testInsertCannotAllocateSegmentFaultWhenNullAllocation()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
Expand All @@ -62,18 +64,64 @@ public void testInsertCannotAllocateSegmentFault()
Mockito.doReturn(null).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));

testIngestQuery().setSql(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1")
"insert into foo1"
+ " select __time, dim1 , count(*) as cnt"
+ " from foo"
+ " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ " group by 1, 2"
+ " PARTITIONED by day"
+ " clustered by dim1"
)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(
new InsertCannotAllocateSegmentFault(
"foo1",
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
null
)
)
.verifyResults();
}

@Test
public void testInsertCannotAllocateSegmentFaultWhenInvalidAllocation()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG).build();

// Simulate the overlord allocating a coarser (MONTH) segment instead of the requested DAY granularity.
Mockito.doReturn(new SegmentIdWithShardSpec(
"foo1",
Intervals.of("2000-01-01/2000-02-01"),
"test",
new LinearShardSpec(2)
)).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));

testIngestQuery().setSql(
"insert into foo1"
+ " select __time, dim1 , count(*) as cnt"
+ " from foo"
+ " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ " group by 1, 2"
+ " PARTITIONED by day"
+ " clustered by dim1"
)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(
new InsertCannotAllocateSegmentFault(
"foo1",
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
Intervals.of("2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z")
)
)
.verifyResults();
}


@Test
public void testInsertCannotBeEmptyFault()
{
Expand Up @@ -54,7 +54,12 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", null));
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", ColumnType.STRING_ARRAY));
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY, null));
assertFaultSerde(new InsertCannotAllocateSegmentFault(
"the datasource",
Intervals.of("2000-01-01/2002-01-01"),
Intervals.ETERNITY
));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
Expand Up @@ -19,8 +19,12 @@

package org.apache.druid.msq.util;

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -84,6 +88,101 @@ public void test_difference()
);
}

@Test
public void test_isAligned_withStandardGranularities()
{

Assert.assertTrue(
IntervalUtils.isAligned(Intervals.ETERNITY, Granularities.ALL)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2001-01-01"), Granularities.YEAR
)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2000-04-01"), Granularities.QUARTER
)
);


Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2000-02-01"), Granularities.MONTH
)
);

// With the way WEEK granularities work, this needs to be aligned to an actual week
Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("1999-12-27/2000-01-03"), Granularities.WEEK
)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2000-01-02"), Granularities.DAY
)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01T00:00:00.000/2000-01-01T08:00:00.000"), Granularities.EIGHT_HOUR
)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01T00:00:00.000/2000-01-01T01:00:00.000"), Granularities.HOUR
)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:01:00.000"), Granularities.MINUTE
)
);

Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:00:01.000"), Granularities.SECOND
)
);

Assert.assertFalse(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2002-01-01"), Granularities.YEAR
)
);

Assert.assertFalse(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2002-01-08"), Granularities.YEAR
)
);
}

@Test
public void test_isAligned_withPeriodGranularity()
{
Assert.assertTrue(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2000-01-04"),
new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-01"), null)
)
);

Assert.assertFalse(
IntervalUtils.isAligned(
Intervals.of("2000-01-01/2000-01-04"),
new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-02"), null)
)
);
}

public static List<Interval> intervals(final String... intervals)
{
return Arrays.stream(intervals).map(Intervals::of).collect(Collectors.toList());