Skip to content

Segments sorted by non-time columns.#16849

Merged
gianm merged 29 commits intoapache:masterfrom
gianm:segment-explicit-sort-order
Aug 23, 2024
Merged

Segments sorted by non-time columns.#16849
gianm merged 29 commits intoapache:masterfrom
gianm:segment-explicit-sort-order

Conversation

@gianm
Copy link
Copy Markdown
Contributor

@gianm gianm commented Aug 6, 2024

Summary

Currently, segments are always sorted by __time, followed by the sort order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting by __time enables efficient execution of queries involving time-ordering or granularity. Time-ordering is a simple matter of reading the rows in stored order, and granular cursors can be generated in streaming fashion.

However, for various workloads, it's better for storage footprint and query performance to sort by arbitrary orders that do not start with __time. With this patch, users can sort segments by such orders.

API

For spec-based ingestion, users add forceSegmentSortByTime: false to dimensionsSpec. The dimensions list determines the sort order. To define a sort order that includes __time, users explicitly include a dimension named __time.

For SQL-based ingestion, users set the context parameter forceSegmentSortByTime: false. The CLUSTERED BY clause is then used as the explicit segment sort order.

In both cases, when the new forceSegmentSortByTime parameter is true (the default), __time is implicitly prepended to the sort order, as it always was prior to this patch.

The new parameter is experimental for two main reasons. First, such segments can cause errors when loaded by older servers, due to violating their expectations that timestamps are always monotonically increasing. Second, even on newer servers, not all queries can run on non-time-sorted segments. Scan queries involving time-ordering and any query involving granularity will not run. (To partially mitigate this, a currently-undocumented SQL feature sqlUseGranularity is provided. When set to false the SQL planner avoids using granularity.)

Main changes

Changes on the write path:

  1. DimensionsSpec can now optionally contain a __time dimension, which controls the placement of __time in the sort order. If not present, __time is considered to be first in the sort order, as it has always been.

  2. IncrementalIndex and IndexMerger are updated to sort facts more flexibly; not always by time first.

  3. Metadata (stored in metadata.drd) gains a sortOrder field.

  4. MSQ can generate range-based shard specs even when not all columns are singly-valued strings. It merely stops accepting new clustering key fields when it encounters the first one that isn't a singly-valued string. This is useful because it enables range shard specs on someDim to be created for clauses like CLUSTERED BY someDim, __time.

  5. Auto-compaction respects and propagates sort orders that don't start with __time.

Changes on the read path:

  1. Update cursor holders for QueryableIndex and IncrementalIndex to return the ordering of the underlying index, so query engines can tell how a segment is sorted.

  2. Update CursorGranularizer and VectorCursorGranularizer to throw errors when using granularities on non-time-ordered segments.

  3. Update ScanQueryEngine to throw an error when using the time-ordering order parameter on non-time-ordered segments.

  4. Update TimeBoundaryQueryRunnerFactory to perform a segment scan when running on a non-time-ordered segment.

  5. Add sqlUseGranularity context parameter that causes the SQL planner to avoid using granularities other than ALL. This is undocumented, because it's hopefully a short-term hack. The more ideal thing would be to have all the native queries work properly on these segments.

  6. Move getMinTime, getMaxTime, and getMaxIngestedEventTime from StorageAdapter to TimeBoundaryInspector and MaxIngestedEventTimeInspector. This is mainly necessary for timeBoundary queries to be able to tell whether they can use getMinTime and getMaxTime, vs. needing to use a cursor. Previously, timeBoundary assumed that an adapter backing a TableDataSource was guaranteed to have an exact getMinTime and getMaxTime. But after this patch, that isn't necessarily going to be the case (in particular: a non-time-sorted segment won't be able to know its exact min/max time without a full scan.)

Other changes:

  1. Rename DimensionsSpec#hasCustomDimensions to hasFixedDimensions and change the meaning subtly: it now returns true if the DimensionsSpec represents an unchanging list of dimensions, or false if there is some discovery happening. This is what call sites had expected anyway.

  2. Removed descending from Joinable#makeJoinMatcher. Now that descending has more or less been phased out in favor of ordering: [__time DESC], the joinable order no longer needs to be reversed. (All joined rows arising from the same left-hand row have the same value.)

Currently, segments are always sorted by __time, followed by the sort
order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting
by __time enables efficient execution of queries involving time-ordering
or granularity. Time-ordering is a simple matter of reading the rows in
stored order, and granular cursors can be generated in streaming fashion.

However, for various workloads, it's better for storage footprint and
query performance to sort by arbitrary orders that do not start with __time.
With this patch, users can sort segments by such orders.

For spec-based ingestion, users add "useExplicitSegmentSortOrder: true" to
dimensionsSpec. The "dimensions" list determines the sort order. To
define a sort order that includes "__time", users explicitly
include a dimension named "__time".

For SQL-based ingestion, users set the context parameter
"useExplicitSegmentSortOrder: true". The CLUSTERED BY clause is then
used as the explicit segment sort order.

In both cases, when the new "useExplicitSegmentSortOrder" parameter is
false (the default), __time is implicitly prepended to the sort order,
as it always was prior to this patch.

The new parameter is experimental for two main reasons. First, such
segments can cause errors when loaded by older servers, due to violating
their expectations that timestamps are always monotonically increasing.
Second, even on newer servers, not all queries can run on non-time-sorted
segments. Scan queries involving time-ordering and any query involving
granularity will not run. (To partially mitigate this, a currently-undocumented
SQL feature "sqlUseGranularity" is provided. When set to false the SQL planner
avoids using "granularity".)

Changes on the write path:

1) DimensionsSpec can now optionally contain a __time dimension, which
   controls the placement of __time in the sort order. If not present,
   __time is considered to be first in the sort order, as it has always
   been.

2) IncrementalIndex and IndexMerger are updated to sort facts more
   flexibly; not always by time first.

3) Metadata (stored in metadata.drd) gains a "sortOrder" field.

4) MSQ can generate range-based shard specs even when not all columns are
   singly-valued strings. It merely stops accepting new clustering key
   fields when it encounters the first one that isn't a singly-valued
   string. This is useful because it enables range shard specs on
   "someDim" to be created for clauses like "CLUSTERED BY someDim, __time".

Changes on the read path:

1) Add StorageAdapter#getSortOrder so query engines can tell how a
   segment is sorted.

2) Update QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter,
   and VectorCursorGranularizer to throw errors when using granularities
   on non-time-ordered segments.

3) Update ScanQueryEngine to throw an error when using the time-ordering
  "order" parameter on non-time-ordered segments.

4) Update TimeBoundaryQueryRunnerFactory to perform a segment scan when
   running on a non-time-ordered segment.

5) Add "sqlUseGranularity" context parameter that causes the SQL planner
   to avoid using granularities other than ALL.

Other changes:

1) Rename DimensionsSpec "hasCustomDimensions" to "hasFixedDimensions"
   and change the meaning subtly: it now returns true if the DimensionsSpec
   represents an unchanging list of dimensions, or false if there is
   some discovery happening. This is what call sites had expected anyway.

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDim(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDimThenTimeExplicitSort(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDimThenTimeError(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByDimThenTimeError2(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByTimeThenDimExplicitSort(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.
Comment on lines +293 to +311
DataSchema schema = new DataSchema(
IdUtilsTest.VALID_ID_CHARS,
new TimestampSpec("time", "auto", null),
DimensionsSpec.builder()
.setDimensions(
ImmutableList.of(
new StringDimensionSchema("__time"),
new StringDimensionSchema("dimA"),
new StringDimensionSchema("dimB")
)
)
.setDimensionExclusions(ImmutableList.of("dimC"))
.build(),
null,
new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
null,
null,
jsonMapper
);

Check notice

Code scanning / CodeQL

Unread local variable

Variable 'DataSchema schema' is never read.
ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap = descending ? subMap.descendingMap() : subMap;
return rangeMap.keySet();
} else {
return Iterables.filter(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we need this right now, but i'm not sure we will need this after #16533. timeRangeIterable is primarily used to support query granularity buckets in topN (in my branch to support mark/resetToMark to move the cursor in the facts table to the correct granularity bucket without having to advance the cursor directly). i think we could just use iterator or expose an alternative iteralble for incremental index cursor if we aren't requesting time ordering

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's also used for intervals filtering even for non-granular cursors. It seems like it'd be useful for that at least.

}

@Override
public Iterator<IncrementalIndexRow> iterator(boolean descending)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

descending is pretty tightly coupled with time ordering, but i guess is harmless to implement

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed to make more sense to implement it here rather than throw an error. I suppose in theory there could be a use case, in the future, for situations like ORDER BY userId DESC when the segment is sorted by userId.

{
final List<String> baseSortOrder = baseAdapter.getSortOrder();

// Sorted the same way as the base segment, unless the unnested column shadows one of the base columns.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we will basically always be reading the unnested column, i guess the main expected utility of this will be if ordering by one of the columns that is not being unnested?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figure the common case is that the unnested column will not be first in the sort order. So we'll return whatever prefix of the order is there up to the unnested column.

Comment on lines +149 to +154
List<String> getSortOrder();

default boolean isTimeOrdered()
{
return ColumnHolder.TIME_COLUMN_NAME.equals(Iterables.getFirst(getSortOrder(), null));
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the context of #16533, this should probably live on CursorHolder instead of StorageAdapter, though it will require a bunch of tests to make and dispose of a cursor holder to check if their query against the sort order, but otherwise shouldn't be very disruptive. I wonder if we should include a direction similar to in that PR though, maybe re-using scan query ordering so that these two changes are compatible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a good idea, although in the interests of minimizing conflicts, it'd probably be good to do this after #16533 is merged (since it moves OrderBy around).

Comment on lines +118 to +122
public List<String> getSortOrder()
{
return sortOrder;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know it cannot be specified during ingest with the current mechanisms, but it seems like we should include the direction of the ordering as well, maybe re-use the scan query ordering?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if (sortOrdersToMerge.stream().anyMatch(Objects::isNull)) {
return null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be an error if they aren't all null? seems like it should be pretty consistent across indexable adapters...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it lenient for two reasons:

  • Merging of the other parts of the Metadata is also lenient
  • It isn't documented that this method requires that all Metadatas are sourced from segments created with the same ingestion spec + the same version of the software

String column = null;

for (final List<String> sortOrder : sortOrdersToMerge) {
if (mergedSortOrder.size() >= sortOrder.size()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this happen from columns with all null values or something? maybe this method could use some comments to make the rationale behind the decisions clearer

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments and also fixed a problem: null is now treated as [__time] (which makes sense, since that was the only possible sort order prior to the sortOrder field being added). Due to this change the method is no longer @Nullable.

Comment on lines +174 to +176
// It's possibly incorrect in some cases for sort order to be SORTED_BY_TIME_ONLY here, but for historical reasons,
// we're keeping this in place for now. The handling of "interval" in "makeCursors", which has been in place for
// some time, suggests we think the data is always sorted by time.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this referring to the RowWalker with its skipToDateTime?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

incrementalIndexSchema.getTimestampSpec(),
this.gran,
this.rollup,
ColumnHolder.TIME_COLUMN_NAME.equals(Iterables.getFirst(dimensionOrder, null)) ? null : dimensionOrder
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should always write this out instead of leaving it null? i guess it is done like this to make it easier to fill in older segments and new segments ordered by time first?

Copy link
Copy Markdown
Contributor Author

@gianm gianm Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I don't totally remember why this is conditional. It doesn't really make sense to be conditional IMO. I updated it to always write out.

if (index.timePosition == 0) {
return Metadata.SORTED_BY_TIME_ONLY;
} else {
return Collections.emptyList();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i suppose depending on the type of facts holder we could actually report something here (rollup should be ordered i think?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense. I added a comment about this being a possible change in the future.

@gianm
Copy link
Copy Markdown
Contributor Author

gianm commented Aug 8, 2024

From the discussion in #16849 (comment), after #16533 is merged we should revisit this one to (a) resolve conflict (b) replace List<String> with List<OrderBy> in the sort order field.

@vogievetsky vogievetsky added the Needs web console change Backend API changes that would benefit from frontend support in the web console label Aug 15, 2024
}

ColumnSelectorFactory selectorFactory = table.makeColumnSelectorFactory(joinableOffset, descending, closer);
ColumnSelectorFactory selectorFactory = table.makeColumnSelectorFactory(joinableOffset, closer);

Check notice

Code scanning / CodeQL

Possible confusion of local and field

Potentially confusing name: [IndexedTableJoinMatcher](1) also refers to field [selectorFactory](2) (as this.selectorFactory).
@gianm
Copy link
Copy Markdown
Contributor Author

gianm commented Aug 20, 2024

Pushed up a commit resolving conflicts with #16533. Main changes:

  • Changed frame cursors to report "no ordering" rather than time ordering.
  • Removed descending from Joinable#makeJoinMatcher. Now that descending has more or less been phased out in favor of ordering: [__time DESC], the joinable order no longer needs to be reversed. (All joined rows arising from the same left-hand row have the same value.)
  • Moved granularity validations (the ones that make the message Cannot use granularity[%s] on non-time-sorted data) into the granularizers.
  • Moved getMinTime, getMaxTime, and getMaxIngestedEventTime from StorageAdapter to TimeBoundaryInspector and MaxIngestedEventTimeInspector. This is mainly necessary for timeBoundary queries to be able to tell whether they can use getMinTime and getMaxTime, vs. needing to use a cursor. Previously, timeBoundary assumed that an adapter backing a TableDataSource was guaranteed to have an exact getMinTime and getMaxTime. But after this patch, that isn't necessarily going to be the case (in particular: a non-time-sorted segment won't be able to know its exact min/max time without a full scan.)

gianm added a commit to gianm/druid that referenced this pull request Aug 26, 2024
It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.

This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.

Fixes a bug introduced in apache#16849.
gianm added a commit that referenced this pull request Aug 27, 2024
* Place __time in signatures according to sort order.

Updates a variety of places to put __time in row signatures according
to its position in the sort order, rather than always first, including:

- InputSourceSampler.
- ScanQueryEngine (in the default signature when "columns" is empty).
- Various StorageAdapters, which also have the effect of reordering
  the column order in segmentMetadata queries, and therefore in SQL
  schemas as well.

Follow-up to #16849.

* Fix compilation.

* Additional fixes.

* Fix.

* Fix style.

* Omit nonexistent columns from the row signature.

* Fix tests.
gianm added a commit that referenced this pull request Aug 27, 2024
* MSQ: Fix validation of time position in collations.

It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.

This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.

Fixes a bug introduced in #16849.

* Fix test. Better warning message.
hevansDev pushed a commit to hevansDev/druid that referenced this pull request Aug 29, 2024
* Segments primarily sorted by non-time columns.

Currently, segments are always sorted by __time, followed by the sort
order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting
by __time enables efficient execution of queries involving time-ordering
or granularity. Time-ordering is a simple matter of reading the rows in
stored order, and granular cursors can be generated in streaming fashion.

However, for various workloads, it's better for storage footprint and
query performance to sort by arbitrary orders that do not start with __time.
With this patch, users can sort segments by such orders.

For spec-based ingestion, users add "useExplicitSegmentSortOrder: true" to
dimensionsSpec. The "dimensions" list determines the sort order. To
define a sort order that includes "__time", users explicitly
include a dimension named "__time".

For SQL-based ingestion, users set the context parameter
"useExplicitSegmentSortOrder: true". The CLUSTERED BY clause is then
used as the explicit segment sort order.

In both cases, when the new "useExplicitSegmentSortOrder" parameter is
false (the default), __time is implicitly prepended to the sort order,
as it always was prior to this patch.

The new parameter is experimental for two main reasons. First, such
segments can cause errors when loaded by older servers, due to violating
their expectations that timestamps are always monotonically increasing.
Second, even on newer servers, not all queries can run on non-time-sorted
segments. Scan queries involving time-ordering and any query involving
granularity will not run. (To partially mitigate this, a currently-undocumented
SQL feature "sqlUseGranularity" is provided. When set to false the SQL planner
avoids using "granularity".)

Changes on the write path:

1) DimensionsSpec can now optionally contain a __time dimension, which
   controls the placement of __time in the sort order. If not present,
   __time is considered to be first in the sort order, as it has always
   been.

2) IncrementalIndex and IndexMerger are updated to sort facts more
   flexibly; not always by time first.

3) Metadata (stored in metadata.drd) gains a "sortOrder" field.

4) MSQ can generate range-based shard specs even when not all columns are
   singly-valued strings. It merely stops accepting new clustering key
   fields when it encounters the first one that isn't a singly-valued
   string. This is useful because it enables range shard specs on
   "someDim" to be created for clauses like "CLUSTERED BY someDim, __time".

Changes on the read path:

1) Add StorageAdapter#getSortOrder so query engines can tell how a
   segment is sorted.

2) Update QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter,
   and VectorCursorGranularizer to throw errors when using granularities
   on non-time-ordered segments.

3) Update ScanQueryEngine to throw an error when using the time-ordering
  "order" parameter on non-time-ordered segments.

4) Update TimeBoundaryQueryRunnerFactory to perform a segment scan when
   running on a non-time-ordered segment.

5) Add "sqlUseGranularity" context parameter that causes the SQL planner
   to avoid using granularities other than ALL.

Other changes:

1) Rename DimensionsSpec "hasCustomDimensions" to "hasFixedDimensions"
   and change the meaning subtly: it now returns true if the DimensionsSpec
   represents an unchanging list of dimensions, or false if there is
   some discovery happening. This is what call sites had expected anyway.

* Fixups from CI.

* Fixes.

* Fix missing arg.

* Additional changes.

* Fix logic.

* Fixes.

* Fix test.

* Adjust test.

* Remove throws.

* Fix styles.

* Fix javadocs.

* Cleanup.

* Smoother handling of null ordering.

* Fix tests.

* Missed a spot on the merge.

* Fixups.

* Avoid needless Filters.and.

* Add timeBoundaryInspector to test.

* Fix tests.

* Fix FrameStorageAdapterTest.

* Fix various tests.

* Use forceSegmentSortByTime instead of useExplicitSegmentSortOrder.

* Pom fix.

* Fix doc.
hevansDev pushed a commit to hevansDev/druid that referenced this pull request Aug 29, 2024
* Place __time in signatures according to sort order.

Updates a variety of places to put __time in row signatures according
to its position in the sort order, rather than always first, including:

- InputSourceSampler.
- ScanQueryEngine (in the default signature when "columns" is empty).
- Various StorageAdapters, which also have the effect of reordering
  the column order in segmentMetadata queries, and therefore in SQL
  schemas as well.

Follow-up to apache#16849.

* Fix compilation.

* Additional fixes.

* Fix.

* Fix style.

* Omit nonexistent columns from the row signature.

* Fix tests.
hevansDev pushed a commit to hevansDev/druid that referenced this pull request Aug 29, 2024
* MSQ: Fix validation of time position in collations.

It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.

This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.

Fixes a bug introduced in apache#16849.

* Fix test. Better warning message.
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
* Segments primarily sorted by non-time columns.

Currently, segments are always sorted by __time, followed by the sort
order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting
by __time enables efficient execution of queries involving time-ordering
or granularity. Time-ordering is a simple matter of reading the rows in
stored order, and granular cursors can be generated in streaming fashion.

However, for various workloads, it's better for storage footprint and
query performance to sort by arbitrary orders that do not start with __time.
With this patch, users can sort segments by such orders.

For spec-based ingestion, users add "useExplicitSegmentSortOrder: true" to
dimensionsSpec. The "dimensions" list determines the sort order. To
define a sort order that includes "__time", users explicitly
include a dimension named "__time".

For SQL-based ingestion, users set the context parameter
"useExplicitSegmentSortOrder: true". The CLUSTERED BY clause is then
used as the explicit segment sort order.

In both cases, when the new "useExplicitSegmentSortOrder" parameter is
false (the default), __time is implicitly prepended to the sort order,
as it always was prior to this patch.

The new parameter is experimental for two main reasons. First, such
segments can cause errors when loaded by older servers, due to violating
their expectations that timestamps are always monotonically increasing.
Second, even on newer servers, not all queries can run on non-time-sorted
segments. Scan queries involving time-ordering and any query involving
granularity will not run. (To partially mitigate this, a currently-undocumented
SQL feature "sqlUseGranularity" is provided. When set to false the SQL planner
avoids using "granularity".)

Changes on the write path:

1) DimensionsSpec can now optionally contain a __time dimension, which
   controls the placement of __time in the sort order. If not present,
   __time is considered to be first in the sort order, as it has always
   been.

2) IncrementalIndex and IndexMerger are updated to sort facts more
   flexibly; not always by time first.

3) Metadata (stored in metadata.drd) gains a "sortOrder" field.

4) MSQ can generate range-based shard specs even when not all columns are
   singly-valued strings. It merely stops accepting new clustering key
   fields when it encounters the first one that isn't a singly-valued
   string. This is useful because it enables range shard specs on
   "someDim" to be created for clauses like "CLUSTERED BY someDim, __time".

Changes on the read path:

1) Add StorageAdapter#getSortOrder so query engines can tell how a
   segment is sorted.

2) Update QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter,
   and VectorCursorGranularizer to throw errors when using granularities
   on non-time-ordered segments.

3) Update ScanQueryEngine to throw an error when using the time-ordering
  "order" parameter on non-time-ordered segments.

4) Update TimeBoundaryQueryRunnerFactory to perform a segment scan when
   running on a non-time-ordered segment.

5) Add "sqlUseGranularity" context parameter that causes the SQL planner
   to avoid using granularities other than ALL.

Other changes:

1) Rename DimensionsSpec "hasCustomDimensions" to "hasFixedDimensions"
   and change the meaning subtly: it now returns true if the DimensionsSpec
   represents an unchanging list of dimensions, or false if there is
   some discovery happening. This is what call sites had expected anyway.

* Fixups from CI.

* Fixes.

* Fix missing arg.

* Additional changes.

* Fix logic.

* Fixes.

* Fix test.

* Adjust test.

* Remove throws.

* Fix styles.

* Fix javadocs.

* Cleanup.

* Smoother handling of null ordering.

* Fix tests.

* Missed a spot on the merge.

* Fixups.

* Avoid needless Filters.and.

* Add timeBoundaryInspector to test.

* Fix tests.

* Fix FrameStorageAdapterTest.

* Fix various tests.

* Use forceSegmentSortByTime instead of useExplicitSegmentSortOrder.

* Pom fix.

* Fix doc.
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
* Place __time in signatures according to sort order.

Updates a variety of places to put __time in row signatures according
to its position in the sort order, rather than always first, including:

- InputSourceSampler.
- ScanQueryEngine (in the default signature when "columns" is empty).
- Various StorageAdapters, which also have the effect of reordering
  the column order in segmentMetadata queries, and therefore in SQL
  schemas as well.

Follow-up to apache#16849.

* Fix compilation.

* Additional fixes.

* Fix.

* Fix style.

* Omit nonexistent columns from the row signature.

* Fix tests.
edgar2020 pushed a commit to edgar2020/druid that referenced this pull request Sep 5, 2024
* MSQ: Fix validation of time position in collations.

It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.

This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.

Fixes a bug introduced in apache#16849.

* Fix test. Better warning message.
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
kfaraz pushed a commit that referenced this pull request Nov 27, 2024
This patch supports sorting segments by non-time columns (added in #16849) to MSQ compaction.
Specifically, if `forceSegmentSortByTime` is set in the data schema, either via the user-supplied
compaction config or in the inferred schema, the following steps are taken:
- Skip adding `__time` explicitly as the first column to the dimension schema since it already comes
as part of the schema
- Ensure column mappings propagate `__time` in the order specified by the schema
- Set `forceSegmentSortByTime` in the MSQ context.
@maytasm
Copy link
Copy Markdown
Contributor

maytasm commented Jan 29, 2025

@clintropolis @gianm
I think this PR broke dataSourceMetadata query.
Seems like MaxIngestedEventTimeInspector in DataSourceMetadataQueryRunner will always be null.
This can be reproduced with the wikipedia dataset.
Just run

{
"queryType" : "dataSourceMetadata",
"dataSource" : "wikipedia"
}

The "maxIngestedEventTime" returned is -146136543-09-08T08:23:32.096Z is wrong.

Doesn't look like this code in QueryableIndexSegment:

  @SuppressWarnings("unchecked")
  @Nullable
  @Override
  public <T> T as(@Nonnull Class<T> clazz)
  {
    final Function<QueryableIndexSegment, ?> fn = AS_MAP.get(clazz);
    if (fn != null) {
      final T fnApply = (T) fn.apply(this);
      if (fnApply != null) {
        return fnApply;
      }
    }

    if (TimeBoundaryInspector.class.equals(clazz)) {
      return (T) timeBoundaryInspector;
    } else if (Metadata.class.equals(clazz)) {
      return (T) index.getMetadata();
    } else if (PhysicalSegmentInspector.class.equals(clazz)) {
      return (T) new QueryableIndexPhysicalSegmentInspector(index);
    } else if (TopNOptimizationInspector.class.equals(clazz)) {
      return (T) new SimpleTopNOptimizationInspector(true);
    }

    return Segment.super.as(clazz);
  }

Is handling MaxIngestedEventTimeInspector?

@clintropolis
Copy link
Copy Markdown
Member

I think this PR broke dataSourceMetadata query.
Seems like MaxIngestedEventTimeInspector in DataSourceMetadataQueryRunner will always be null.

The intention of the new MaxIngestedEventTimeInspector is that it is only for use with realtime data, because every other implementation on the old StorageAdapter of getMaxIngestedEventTime() was just calling getMaxTime() which gives the end interval of the segment. Could use timeBoundary instead if there is no realtime data.

It does seem odd we return min datetime, i would have expected null. I don't think it makes much sense to have an implementation of MaxIngestedEventTimeInspector for published segments given that timeBoundary exists, but the docs could probably more clearly spell this out. Apologies for the behavior change.

@maytasm
Copy link
Copy Markdown
Contributor

maytasm commented Jan 30, 2025

@clintropolis
Thanks for the response.

  • Im a little confused on when you said that getMaxTime() gives the end interval of the segment. I believe that this query (dataSourceMetadata) was previously returning something like 2025-01-21T08:23:32.096Z for our non-realtime datasource. The code for getMaxTime() in QueryableIndexStorageAdapter looks something like:
  private void populateMinMaxTime()
  {
    // Compute and cache minTime, maxTime.
    final ColumnHolder columnHolder = index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME);
    try (NumericColumn column = (NumericColumn) columnHolder.getColumn()) {
      this.minTime = DateTimes.utc(column.getLongSingleValueRow(0));
      this.maxTime = DateTimes.utc(column.getLongSingleValueRow(column.length() - 1));
    }
  }

which does seems to return the actual time (min and max) of the rows/data (not the end interval of the segment)... unless I am missing something.

@maytasm
Copy link
Copy Markdown
Contributor

maytasm commented Jan 30, 2025

  • Ah, the timeBoundary does return the actual time (min and max) of the rows/data which matches with the previous dataSourceMetadata. We can switch to the timeBoundary query as a workaround. Thanks!

@clintropolis
Copy link
Copy Markdown
Member

clintropolis commented Jan 30, 2025

Im a little confused on when you said that getMaxTime() gives the end interval of the segment.

Oh sorry, i meant the max timestamp within the segment interval, that code is the same (ish), its just on TimeBoundaryInspector now, https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/QueryableIndexTimeBoundaryInspector.java#L85

gianm added a commit to gianm/druid that referenced this pull request Jan 30, 2025
PR apache#16849 changed the behavior such that maxIngestedEventTime is not
updated for non-real-time data. This patch restores the old behavior for
non-real-time data by using a TimeBoundaryInspector when
MaxIngestedEventTimeInspector is not present.
@gianm
Copy link
Copy Markdown
Contributor Author

gianm commented Jan 30, 2025

Oops, the behavior change was unintentional (at least I didn't intend it, and this was my PR). This should restore the old behavior: #17686

gianm added a commit that referenced this pull request Jan 30, 2025
…7686)

PR #16849 changed the behavior such that maxIngestedEventTime is not
updated for non-real-time data. This patch restores the old behavior for
non-real-time data by using a TimeBoundaryInspector when
MaxIngestedEventTimeInspector is not present.
317brian pushed a commit to 317brian/druid that referenced this pull request Feb 3, 2025
…ache#17686)

PR apache#16849 changed the behavior such that maxIngestedEventTime is not
updated for non-real-time data. This patch restores the old behavior for
non-real-time data by using a TimeBoundaryInspector when
MaxIngestedEventTimeInspector is not present.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - Dependencies Area - Documentation Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Querying Area - Segment Format and Ser/De Needs web console change Backend API changes that would benefit from frontend support in the web console

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants