Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The column type is not supported. This can be because:<br /> <br /><ul><li>Support for writing or reading from a particular column type is not supported.</li><li>The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.</li></ul> | `columnName`: The column name with an unsupported type.<br /> <br />`columnType`: The unknown column type. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. |
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| <a name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a> | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` |
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.|
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the interval you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
Expand All @@ -449,3 +448,4 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. |
| <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
| <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` |
| <a name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a> | Deprecated. An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. The query returns a `ValidationException` instead of the fault. | `columnName` |
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
Expand Down Expand Up @@ -1843,10 +1842,6 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
// Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
// within an individual segment.
for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
if (clusterByColumn.order() == KeyOrder.DESCENDING) {
throw new MSQException(new InsertCannotOrderByDescendingFault(clusterByColumn.columnName()));
}

final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName());
for (final int outputColumn : outputColumns) {
outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
Expand Down Expand Up @@ -113,7 +112,6 @@ public class MSQIndexingModule implements DruidModule
DurableStorageConfigurationFault.class,
InsertCannotAllocateSegmentFault.class,
InsertCannotBeEmptyFault.class,
InsertCannotOrderByDescendingFault.class,
InsertLockPreemptedFault.class,
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
Expand Down Expand Up @@ -92,23 +91,6 @@ public void testInsertCannotBeEmptyFault()
.verifyResults();
}

@Test
public void testInsertCannotOrderByDescendingFault()
{
  final RowSignature expectedSignature =
      RowSignature.builder()
                  .add("__time", ColumnType.LONG)
                  .add("dim1", ColumnType.STRING)
                  .add("cnt", ColumnType.LONG)
                  .build();

  // A DESC column in CLUSTERED BY is not allowed; the fault reports the
  // query-internal column name ("d1"), not the SQL-level alias.
  testIngestQuery()
      .setSql(
          "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '2000-01-02 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1 DESC")
      .setExpectedDataSource("foo1")
      .setExpectedRowSignature(expectedSignature)
      .setExpectedMSQFault(new InsertCannotOrderByDescendingFault("d1"))
      .verifyResults();
}

@Test
public void testInsertTimeOutOfBoundsFault()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,25 @@ public void testRollUpOnFoo1WithTimeFunction()

}

@Test
public void testInsertWithClusteredByDescendingThrowsException()
{
  // A DESC column in CLUSTERED BY must be rejected at planning time with a
  // ValidationException (surfaced as SqlPlanningException), not an MSQ runtime fault.
  // Note the trailing space after "GROUP BY 1, 2 ": without it, string
  // concatenation produced the malformed SQL "GROUP BY 1, 2PARTITIONED BY DAY",
  // which would fail parsing for the wrong reason and invalidate the test.
  testIngestQuery().setSql("INSERT INTO foo1 "
                           + "SELECT __time, dim1 , count(*) as cnt "
                           + "FROM foo "
                           + "GROUP BY 1, 2 "
                           + "PARTITIONED BY DAY "
                           + "CLUSTERED BY dim1 DESC"
                   )
                   .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
                       CoreMatchers.instanceOf(SqlPlanningException.class),
                       ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
                           "[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."))
                   ))
                   .verifyPlanningErrors();
}

@Test
public void testRollUpOnFoo1WithTimeFunctionComplexCol()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ public void testReplaceTimeChunksLargerThanData()
}

@Test
public void testInsertOnFoo1Range()
public void testReplaceOnFoo1Range()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
Expand Down Expand Up @@ -731,6 +731,25 @@ public void testReplaceSegmentsInsertIntoNewTable()
.verifyResults();
}

@Test
public void testReplaceWithClusteredByDescendingThrowsException()
{
  // Descending CLUSTERED BY columns are invalid and must fail during planning.
  final String sql = " REPLACE INTO foobar "
                     + "OVERWRITE ALL "
                     + "SELECT __time, m1, m2 "
                     + "FROM foo "
                     + "PARTITIONED BY ALL TIME "
                     + "CLUSTERED BY m2, m1 DESC";

  testIngestQuery()
      .setSql(sql)
      .setExpectedValidationErrorMatcher(CoreMatchers.allOf(
          CoreMatchers.instanceOf(SqlPlanningException.class),
          ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
              "[`m1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."))
      ))
      .verifyPlanningErrors();
}

@Test
public void testReplaceTombstonesOverPartiallyOverlappingSegments()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(new InsertCannotOrderByDescendingFault("the column"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
Expand Down Expand Up @@ -256,19 +257,22 @@ public static List<String> validateQueryAndConvertToIntervals(
* @param query sql query
* @param clusteredByList List of clustered by columns
* @return SqlOrderBy node containing the clusteredByList information
* @throws ValidationException if any of the clustered by columns contain DESCENDING order.
*/
public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList)
throws ValidationException
{
validateClusteredByColumns(clusteredByList);
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
// SqlOrderBy node
SqlNode offset = null;
SqlNode fetch = null;

if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
// BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
// query represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1 ORDER BY dim1 FETCH 30 OFFSET 10",
// this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
Expand All @@ -283,6 +287,29 @@ public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList cl
);
}

/**
 * Validates that none of the CLUSTERED BY columns are sorted in descending order.
 *
 * <p>Druid's segment generation only supports ascending sort order, so a descending
 * clustered-by column is rejected at planning time rather than surfacing later as a
 * runtime fault.
 *
 * @param clusteredByNodes columns from the CLUSTERED BY clause; {@code null} is
 *                         treated as "no clustering" and is valid
 * @throws ValidationException if any of the clustered by columns contain DESCENDING order.
 */
public static void validateClusteredByColumns(final SqlNodeList clusteredByNodes) throws ValidationException
{
  if (clusteredByNodes == null) {
    return;
  }

  // Hoisted out of the loop: the kind set is loop-invariant, so there is no need
  // to allocate a fresh ImmutableSet per column.
  final ImmutableSet<SqlKind> descendingKind = ImmutableSet.of(SqlKind.DESCENDING);

  for (final SqlNode clusteredByNode : clusteredByNodes.getList()) {
    if (clusteredByNode.isA(descendingKind)) {
      throw new ValidationException(
          StringUtils.format(
              "[%s] is invalid. CLUSTERED BY columns cannot be sorted in descending order.",
              clusteredByNode
          )
      );
    }
  }
}

/**
* This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query.
* It takes the timezone as a separate parameter, as Sql timestamps don't contain that information. Supported functions
Expand Down
Loading