Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean a
IncrementalIndexSchema.builder()
.withDimensionsSpec(dimensionsSpec)
.withRollup(false)
.withMinTimestamp(CursorFactoryProjectionTest.TIMESTAMP.getMillis())
.withMinTimestamp(CursorFactoryProjectionTest.UTC_MIDNIGHT.getMillis())
.withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS)
.build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTimeZone;

import javax.annotation.Nullable;
import java.util.Arrays;
Expand All @@ -47,7 +49,7 @@

/**
* API type to specify an aggregating projection on {@link org.apache.druid.segment.incremental.IncrementalIndexSchema}
*
* <p>
* Decorated with {@link JsonTypeInfo} annotations as a future-proofing mechanism in the event we add other types of
* projections and need to extract out a base interface from this class.
*/
Expand Down Expand Up @@ -208,7 +210,10 @@ public String toString()
'}';
}

private static ProjectionOrdering computeOrdering(VirtualColumns virtualColumns, List<DimensionSchema> groupingColumns)
private static ProjectionOrdering computeOrdering(
VirtualColumns virtualColumns,
List<DimensionSchema> groupingColumns
)
{
if (groupingColumns.isEmpty()) {
// no ordering since there is only 1 row for this projection
Expand All @@ -218,24 +223,30 @@ private static ProjectionOrdering computeOrdering(VirtualColumns virtualColumns,

String timeColumnName = null;
Granularity granularity = null;
// try to find the __time column equivalent, which might be a time_floor expression to model granularity
// bucketing. The time column is decided as the finest granularity on __time detected. If the projection does
// not have a time-like column, the granularity will be handled as ALL for the projection and all projection
// rows will use a synthetic timestamp of the minimum timestamp of the incremental index

// determine the granularity and time column name for the projection, based on the finest time-like grouping column.
for (final DimensionSchema dimension : groupingColumns) {
ordering.add(OrderBy.ascending(dimension.getName()));
if (ColumnHolder.TIME_COLUMN_NAME.equals(dimension.getName())) {
timeColumnName = dimension.getName();
granularity = Granularities.NONE;
// already found exact __time grouping, skip assigning, granularity = Granularities.NONE;
break;
} else {
final VirtualColumn vc = virtualColumns.getVirtualColumn(dimension.getName());
final Granularity maybeGranularity = Granularities.fromVirtualColumn(vc);
if (granularity == null && maybeGranularity != null) {
granularity = maybeGranularity;
if (maybeGranularity == null || maybeGranularity.equals(Granularities.ALL)) {
// no __time in inputs or not supported, skip
} else if (Granularities.NONE.equals(maybeGranularity)) {
timeColumnName = dimension.getName();
} else if (granularity != null && maybeGranularity != null && maybeGranularity.isFinerThan(granularity)) {
granularity = maybeGranularity;
// already found exact __time grouping, skip assigning, granularity = Granularities.NONE;
break;
} else if (maybeGranularity.getClass().equals(PeriodGranularity.class)
&& maybeGranularity.getTimeZone().equals(DateTimeZone.UTC)
&& ((PeriodGranularity) maybeGranularity).getOrigin() == null
&& (granularity == null || maybeGranularity.isFinerThan(granularity))) {
// found a finer period granularity than the existing granularity, or it's the first one
timeColumnName = dimension.getName();
granularity = maybeGranularity;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.chrono.ISOChronology;

import javax.annotation.Nullable;
import java.util.Arrays;
Expand Down Expand Up @@ -145,10 +144,11 @@ public static CursorBuildSpec decorateCursorBuildSpec(Query<?> query, CursorBuil

/**
* Translates a {@link Granularity} to a {@link ExpressionVirtualColumn} on {@link ColumnHolder#TIME_COLUMN_NAME} of
* the equivalent grouping column. If granularity is {@link #ALL}, this method returns null since we are not grouping
* on time. If granularity is a {@link PeriodGranularity} with UTC timezone and no origin, this method returns a
* virtual column with {@link TimestampFloorExprMacro.TimestampFloorExpr} of the specified period. If granularity is
* {@link #NONE}, or any other kind of granularity (duration, period with non-utc timezone or origin) this method
* the equivalent grouping column.
* <ul>
* <li>If granularity is {@link #ALL}, this method returns null since we are not grouping on time.
* <li>If granularity is a {@link PeriodGranularity}, we'd map it to {@link TimestampFloorExprMacro.TimestampFloorExpr}.
* <li>If granularity is {@link #NONE}, or any other kind of granularity (duration, period with non-utc timezone or origin) this method
* returns a virtual column with {@link org.apache.druid.math.expr.IdentifierExpr} specifying
* {@link ColumnHolder#TIME_COLUMN_NAME} directly.
*/
Expand All @@ -158,16 +158,14 @@ public static ExpressionVirtualColumn toVirtualColumn(Granularity granularity, S
if (ALL.equals(granularity)) {
return null;
}

final String expression;
if (NONE.equals(granularity) || granularity instanceof DurationGranularity) {
expression = ColumnHolder.TIME_COLUMN_NAME;
} else {
if (granularity instanceof PeriodGranularity) {
PeriodGranularity period = (PeriodGranularity) granularity;
if (!ISOChronology.getInstanceUTC().getZone().equals(period.getTimeZone()) || period.getOrigin() != null) {
expression = ColumnHolder.TIME_COLUMN_NAME;
} else {
expression = TimestampFloorExprMacro.forQueryGranularity(period.getPeriod());
}
expression = TimestampFloorExprMacro.forQueryGranularity(period);
} else {
// DurationGranularity or any other granularity that is not a PeriodGranularity
expression = ColumnHolder.TIME_COLUMN_NAME;
}

return new ExpressionVirtualColumn(
Expand All @@ -194,14 +192,26 @@ public static ExpressionVirtualColumn toVirtualColumn(Granularity granularity, S
public static Granularity fromVirtualColumn(VirtualColumn virtualColumn)
{
  // Only expression virtual columns can encode a time grouping; the granularity
  // extraction itself is delegated to fromExpr on the parsed expression.
  if (virtualColumn instanceof ExpressionVirtualColumn) {
    return fromExpr(((ExpressionVirtualColumn) virtualColumn).getParsedExpression().get());
  }
  // not an expression virtual column - no granularity can be recovered
  return null;
}

@Nullable
private static Granularity fromExpr(Expr expr)
{
String identifier = expr.getBindingIfIdentifier();
if (identifier != null) {
// If the grouping is based on __time directly, return None.
// Otherwise, grouping based on non-time columns, return ALL.
return identifier.equals(ColumnHolder.TIME_COLUMN_NAME)
? Granularities.NONE
: Granularities.ALL;
Comment thread
cecemei marked this conversation as resolved.
}

if (expr instanceof TimestampFloorExprMacro.TimestampFloorExpr) {
final TimestampFloorExprMacro.TimestampFloorExpr gran = (TimestampFloorExprMacro.TimestampFloorExpr) expr;
return gran.getGranularity();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneOffsetTransition;
import java.time.zone.ZoneRules;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* PeriodGranularity buckets data based on any custom time period
Expand Down Expand Up @@ -216,6 +225,169 @@ public String toString()
'}';
}

/**
* Returns true if this granularity can be mapped to the target granularity. A granularity can be mapped when each
* interval of the source fits entirely within a single interval of the target under the given time zone.
*
* <p>Examples:
* <ul>
* <li>{@code Period("PT1H")} in UTC can be mapped to {@code Period("P1D")} in UTC,
* since every hourly interval is fully contained within some day interval.</li>
* <li>{@code Period("PT1H")} in {@code America/Los_Angeles} can be mapped to
* {@code Period("PT1H")} in UTC, since each hour in local time still fits inside
* a corresponding hour in UTC (even though offsets can differ due to daylight saving).</li>
* <li>{@code Period("P1D")} in {@code America/Los_Angeles} cannot be mapped to
* {@code Period("P1D")} in UTC, since local day boundaries may cross UTC days and
* are not fully contained within a single UTC day.</li>
* <li>{@code Period("PT1H")} in {@code Asia/Kolkata} cannot be mapped to
* {@code Period("PT1H")} in UTC, since the 30-minute offset causes local hour
* intervals to straddle two UTC hour intervals.</li>
* </ul>
*
* @param target the target granularity to check against
* @return {@code true} if this granularity is fully contained within the target granularity; {@code false} otherwise
*/
public boolean canBeMappedTo(PeriodGranularity target)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

admittedly i'm still digesting how exactly this method works, but it seems kind of expensive to do to match for every projection we consider of every segment when the timezones don't match (at least getUtcMappablePeriodSecondsOrThrow seems expensive). Perhaps we should make a cache of these conversions so we can re-use the work we've done since its likely going to be a lot of the same checks over and over?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also these methods feel like they could use some additional comments to make it clearer what is going on and why this works

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some additional comments. also added PERIOD_GRAN_CACHE in Projections.java.

{
if (hasOrigin || target.hasOrigin) {
return false;
}

if (getTimeZone().equals(target.getTimeZone())) {
int periodMonths = period.getYears() * 12 + period.getMonths();
int targetMonths = target.period.getYears() * 12 + target.period.getMonths();
if (targetMonths == 0 && periodMonths != 0) {
// cannot map if target has no month, but period has month, e.x. P1M cannot be mapped to P1D or P1W
return false;
}

Optional<Long> periodStandardSeconds = getStandardSeconds(period.withYears(0).withMonths(0));
if (periodStandardSeconds.isEmpty()) {
// millisecond precision period is not supported
return false;
}
Optional<Long> targetStandardSeconds = getStandardSeconds(target.period.withYears(0).withMonths(0));
if (targetStandardSeconds.isEmpty()) {
// millisecond precision period is not supported
return false;
}
if (targetMonths == 0 && periodMonths == 0) {
// both periods have zero months, we only need to check standard seconds
// e.x. PT1H can be mapped to PT3H, PT15M can be mapped to PT1H
return targetStandardSeconds.get() % periodStandardSeconds.get() == 0;
}
// if we reach here, targetMonths != 0
if (periodMonths == 0) {
// can map if 1.target not have week/day/hour/minute/second, and 2.period can be mapped to day
// e.x PT3H can be mapped to P1M
return targetStandardSeconds.get() == 0 && (3600 * 24) % periodStandardSeconds.get() == 0;
} else {
// can map if 1.target&period not have week/day/hour/minute/second, and 2.period month can be mapped to target month
// e.x. P1M can be mapped to P3M, P1M can be mapped to P1Y
return targetMonths % periodMonths == 0
&& targetStandardSeconds.get() == 0
&& periodStandardSeconds.get() == 0;
}
}

// different time zones, we'd map to UTC first, then check if the target can cover the UTC-mapped period
Optional<Long> standardSeconds = getStandardSeconds(period);
if (standardSeconds.isEmpty()) {
// must be in whole seconds, i.e. no years, months, or milliseconds.
return false;
}
Optional<Long> utcMappablePeriodSeconds = getUtcMappablePeriodSeconds();
if (utcMappablePeriodSeconds.isEmpty()) {
return false;
}
if (!standardSeconds.get().equals(utcMappablePeriodSeconds.get())) {
// the period cannot be mapped to UTC with the same period, e.x. PT1H in Asia/Kolkata cannot be mapped to PT1H in UTC
return false;
}
if (target.period.getYears() == 0 && target.period.getMonths() == 0) {
Optional<Long> targetUtcMappablePeriodSeconds = target.getUtcMappablePeriodSeconds();
if (targetUtcMappablePeriodSeconds.isEmpty()) {
return false;
}
// both periods have zero months, we only need to check standard seconds
// e.x. PT30M in Asia/Kolkata can be mapped to PT1H in America/Los_Angeles
return targetUtcMappablePeriodSeconds.get() % standardSeconds.get() == 0;
} else {
// can map if 1.target not have week/day/hour/minute/second, and 2.period can be mapped to day
// e.x PT1H in America/Los_Angeles can be mapped to P1M in Asia/Shanghai
Optional<Long> targetStandardSecondsIgnoringMonth = getStandardSeconds(target.period.withYears(0).withMonths(0));
return targetStandardSecondsIgnoringMonth.isPresent()
&& targetStandardSecondsIgnoringMonth.get() == 0
&& (3600 * 24) % standardSeconds.get() == 0;
}
}

/**
 * Returns the maximum possible period seconds that this granularity can be mapped to UTC.
 * <p>
 * A period in a non-UTC zone only aligns with UTC buckets when every UTC offset that zone has ever used divides
 * the period evenly; otherwise this falls back to the coarsest unit (hour, 30 minutes, minute, second) that all
 * offsets do divide.
 * <p>
 * Returns empty if the period cannot be mapped to whole seconds, i.e. it has years or months, or milliseconds.
 */
private Optional<Long> getUtcMappablePeriodSeconds()
{
// years/months/milliseconds have no fixed whole-second length, so nothing to map
Optional<Long> periodSeconds = PeriodGranularity.getStandardSeconds(period);
if (periodSeconds.isEmpty()) {
return Optional.empty();
}

// UTC has no offset transitions, so the period maps to itself unchanged
if (ISOChronology.getInstanceUTC().getZone().equals(getTimeZone())) {
return periodSeconds;
}
// collect every UTC offset (in seconds) this zone has used: the current standard offset plus the
// pre-transition offset of each historical transition
ZoneRules rules = ZoneId.of(getTimeZone().getID()).getRules();
Set<Integer> offsets = Stream.concat(
Stream.of(rules.getStandardOffset(Instant.now())),
rules.getTransitions()
.stream()
.filter(t -> t.getInstant().isAfter(Instant.EPOCH)) // timezone transitions before epoch are patchy
.map(ZoneOffsetTransition::getOffsetBefore)
).map(ZoneOffset::getTotalSeconds).collect(Collectors.toSet());

if (offsets.isEmpty()) {
// no offsets
return periodSeconds;
}

if (offsets.stream().allMatch(o -> o % periodSeconds.get() == 0)) {
// all offsets are multiples of the period, e.g. PT8H and PT2H in Asia/Shanghai
return periodSeconds;
} else if (periodSeconds.get() % 3600 == 0 && offsets.stream().allMatch(o -> o % 3600 == 0)) {
// fall back to hour if period is a multiple of hour and all offsets are multiples of hour, e.g. PT1H in America/Los_Angeles
return Optional.of(3600L);
} else if (periodSeconds.get() % 1800 == 0 && offsets.stream().allMatch(o -> o % 1800 == 0)) {
// fall back to 30 minutes if period is a multiple of 30 minutes and all offsets are multiples of 30 minutes, e.g. PT30M in Asia/Kolkata
return Optional.of(1800L);
} else if (periodSeconds.get() % 60 == 0 && offsets.stream().allMatch(o -> o % 60 == 0)) {
// fall back to minute if period is a multiple of minute and all offsets are multiples of minute
return Optional.of(60L);
} else {
// default to second
return Optional.of(1L);
}
}

/**
 * Converts a period to its fixed length in whole seconds, if it has one.
 * <p>
 * Returns empty when the period has no fixed whole-second length, i.e. one of the following applies:
 * <ul>
 * <li>it has years or months (variable-length units)
 * <li>it has a sub-second (millisecond) component
 * </ul>
 */
private static Optional<Long> getStandardSeconds(Period period)
{
  if (period.getYears() != 0 || period.getMonths() != 0) {
    // years and months vary in length, so there is no single standard duration
    return Optional.empty();
  }
  final long millis = period.toStandardDuration().getMillis();
  if (millis % 1000 != 0) {
    // sub-second precision cannot be expressed as whole seconds
    return Optional.empty();
  }
  return Optional.of(millis / 1000);
}

private static boolean isCompoundPeriod(Period period)
{
int[] values = period.getValues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.query.expression;

import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
Expand All @@ -29,7 +30,6 @@
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.math.expr.vector.LongUnivariateLongFunctionVectorProcessor;
import org.apache.druid.segment.column.ColumnHolder;
import org.joda.time.Period;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -38,9 +38,13 @@

public class TimestampFloorExprMacro implements ExprMacroTable.ExprMacro
{
public static String forQueryGranularity(Period period)
public static String forQueryGranularity(PeriodGranularity period)
{
return FN_NAME + "(" + ColumnHolder.TIME_COLUMN_NAME + ",'" + period + "')";
return FN_NAME + "(" + ColumnHolder.TIME_COLUMN_NAME
+ "," + StringUtils.format("'%s'", period.getPeriod())
+ "," + (period.getOrigin() == null ? "null" : StringUtils.format("'%s'", period.getOrigin()))
+ "," + StringUtils.format("'%s'", period.getTimeZone())
+ ")";
}

private static final String FN_NAME = "timestamp_floor";
Expand Down
Loading