From 69e075a98ead3b23795e64fee76ff83680aef3eb Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Thu, 11 Apr 2019 14:41:29 -0700 Subject: [PATCH 01/14] Update scan query runner factory to accept SpecificSegmentSpec --- .../query/scan/ScanQueryRunnerFactory.java | 36 +- .../spec/MultipleSpecificSegmentSpec.java | 9 +- .../scan/ScanQueryRunnerFactoryTest.java | 396 ++++++++++-------- 3 files changed, 257 insertions(+), 184 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 5f49a6669448..d5c69a9a85ad 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -40,11 +40,14 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.Segment; import org.joda.time.Interval; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Deque; import java.util.LinkedHashMap; @@ -111,17 +114,7 @@ public QueryRunner mergeRunners( return returnedRows; } } else { - // Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need - // to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense - // the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals - // and query runners. - if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) { - throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs" - + "of type MultipleSpecificSegmentSpec"); - } - // Ascending time order for both descriptors and query runners by default - List descriptorsOrdered = - ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); + List descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec()); List> queryRunnersOrdered = Lists.newArrayList(queryRunners); if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { @@ -286,6 +279,27 @@ public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue i return Sequences.simple(sortedElements); } + @VisibleForTesting + List getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec) + { + // Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need + // to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense + // the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals + // and query runners. 
+ List descriptorsOrdered; + + if (spec instanceof MultipleSpecificSegmentSpec) { + // Ascending time order for both descriptors and query runners by default + descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors(); + } else if (spec instanceof SpecificSegmentSpec) { + descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor()); + } else { + throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs" + + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec"); + } + return descriptorsOrdered; + } + @VisibleForTesting Sequence nWayMergeAndLimit( List>> groupedRunners, diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java index 626233580dd1..07bf9ca7b838 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java +++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java @@ -64,14 +64,7 @@ public List getIntervals() intervals = JodaUtils.condenseIntervals( Iterables.transform( descriptors, - new Function() - { - @Override - public Interval apply(SegmentDescriptor input) - { - return input.getInterval(); - } - } + input -> input.getInterval() ) ); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index a7cf3c60e3fd..5de2aea986b8 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -31,9 +31,15 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.spec.LegacySegmentSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.spec.SpecificSegmentSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,12 +49,9 @@ import java.util.List; -@RunWith(Parameterized.class) +@RunWith(Enclosed.class) public class ScanQueryRunnerFactoryTest { - private int numElements; - private ScanQuery query; - private ScanQuery.ResultFormat resultFormat; private static final ScanQueryRunnerFactory factory = new ScanQueryRunnerFactory( new ScanQueryQueryToolChest( @@ -59,200 +62,263 @@ public class ScanQueryRunnerFactoryTest new ScanQueryConfig() ); - public ScanQueryRunnerFactoryTest( - final int numElements, - final int batchSize, - final long limit, - final ScanQuery.ResultFormat resultFormat, - final ScanQuery.Order order - ) + @RunWith(Parameterized.class) + public static class ScanQueryRunnerFactoryParameterizedTest { - this.numElements = numElements; - this.query = Druids.newScanQueryBuilder() - .batchSize(batchSize) - .limit(limit) - .order(order) - .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) - .dataSource("some datasource") - .resultFormat(resultFormat) - .build(); - this.resultFormat = resultFormat; - } + private int numElements; + private ScanQuery query; + private ScanQuery.ResultFormat resultFormat; - 
@Parameterized.Parameters(name = "{0} {1} {2} {3} {4}") - public static Iterable constructorFeeder() - { - List numsElements = ImmutableList.of(0, 10, 100); - List batchSizes = ImmutableList.of(1, 100); - List limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE); - List resultFormats = ImmutableList.of( - ScanQuery.ResultFormat.RESULT_FORMAT_LIST, - ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST - ); - List order = ImmutableList.of( - ScanQuery.Order.ASCENDING, - ScanQuery.Order.DESCENDING - ); - - return QueryRunnerTestHelper.cartesian( - numsElements, - batchSizes, - limits, - resultFormats, - order - ); - } + public ScanQueryRunnerFactoryParameterizedTest( + final int numElements, + final int batchSize, + final long limit, + final ScanQuery.ResultFormat resultFormat, + final ScanQuery.Order order + ) + { + this.numElements = numElements; + this.query = Druids.newScanQueryBuilder() + .batchSize(batchSize) + .limit(limit) + .order(order) + .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) + .dataSource("some datasource") + .resultFormat(resultFormat) + .build(); + this.resultFormat = resultFormat; + } - @Test - public void testSortAndLimitScanResultValues() - { - List srvs = new ArrayList<>(numElements); - List expectedEventTimestamps = new ArrayList<>(); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis(); - expectedEventTimestamps.add(timestamp); - srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + @Parameterized.Parameters(name = "{0} {1} {2} {3} {4}") + public static Iterable constructorFeeder() + { + List numsElements = ImmutableList.of(0, 10, 100); + List batchSizes = ImmutableList.of(1, 100); + List limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE); + List resultFormats = ImmutableList.of( + ScanQuery.ResultFormat.RESULT_FORMAT_LIST, + ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST + ); + List order = ImmutableList.of( + ScanQuery.Order.ASCENDING, + ScanQuery.Order.DESCENDING + ); + + return QueryRunnerTestHelper.cartesian( + numsElements, + batchSizes, + limits, + resultFormats, + order + ); } - expectedEventTimestamps.sort((o1, o2) -> { - int retVal = 0; - if (o1 > o2) { - retVal = 1; - } else if (o1 < o2) { - retVal = -1; + + @Test + public void testSortAndLimitScanResultValues() + { + List srvs = new ArrayList<>(numElements); + List expectedEventTimestamps = new ArrayList<>(); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis(); + expectedEventTimestamps.add(timestamp); + srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); } - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - return retVal * -1; + expectedEventTimestamps.sort((o1, o2) -> { + int retVal = 0; + if (o1 > o2) { + retVal = 1; + } else if (o1 < o2) { + retVal = -1; + } + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + return retVal * -1; + } + return retVal; + }); + Sequence inputSequence = Sequences.simple(srvs); + try { + List output = factory.priorityQueueSortAndLimit( + inputSequence, + query, + ImmutableList.of(new SegmentDescriptor(new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ), "1", 0)) + ).toList(); + if (query.getLimit() > Integer.MAX_VALUE) { + Assert.fail("Unsupported exception should have been thrown due to high limit"); + } + validateSortedOutput(output, expectedEventTimestamps); } - return retVal; - }); - Sequence inputSequence = 
Sequences.simple(srvs); - try { - List output = factory.priorityQueueSortAndLimit( - inputSequence, - query, - ImmutableList.of(new SegmentDescriptor(new Interval( - DateTimes.of("2010-01-01"), - DateTimes.of("2019-01-01").plusHours(1) - ), "1", 0)) - ).toList(); - if (query.getLimit() > Integer.MAX_VALUE) { - Assert.fail("Unsupported exception should have been thrown due to high limit"); + catch (UOE e) { + if (query.getLimit() <= Integer.MAX_VALUE) { + Assert.fail("Unsupported operation exception should not have been thrown here"); + } } - validateSortedOutput(output, expectedEventTimestamps); } - catch (UOE e) { - if (query.getLimit() <= Integer.MAX_VALUE) { - Assert.fail("Unsupported operation exception should not have been thrown here"); + + @Test + public void testNWayMerge() + { + List expectedEventTimestamps = new ArrayList<>(numElements * 3); + + List scanResultValues1 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); } - } - } - @Test - public void testNWayMerge() - { - List expectedEventTimestamps = new ArrayList<>(numElements * 3); + List scanResultValues2 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } - List scanResultValues1 = new ArrayList<>(numElements); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis(); - expectedEventTimestamps.add(timestamp); - scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); - } + List scanResultValues3 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } - List scanResultValues2 = new ArrayList<>(numElements); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis(); - expectedEventTimestamps.add(timestamp); - scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); - } + if (query.getOrder() == ScanQuery.Order.DESCENDING) { + Collections.reverse(scanResultValues1); + Collections.reverse(scanResultValues2); + Collections.reverse(scanResultValues3); + } - List scanResultValues3 = new ArrayList<>(numElements); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis(); - expectedEventTimestamps.add(timestamp); - scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); - } + QueryRunner runnerSegment1Partition1 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues1); + + QueryRunner runnerSegment1Partition2 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues2); - if (query.getOrder() == ScanQuery.Order.DESCENDING) { - Collections.reverse(scanResultValues1); - Collections.reverse(scanResultValues2); - Collections.reverse(scanResultValues3); - } - QueryRunner runnerSegment1Partition1 = - 
(queryPlus, responseContext) -> Sequences.simple(scanResultValues1); + QueryRunner runnerSegment2Partition1 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues3); - QueryRunner runnerSegment1Partition2 = - (queryPlus, responseContext) -> Sequences.simple(scanResultValues2); + QueryRunner runnerSegment2Partition2 = + (queryPlus, responseContext) -> Sequences.empty(); + List>> groupedRunners = new ArrayList<>(2); - QueryRunner runnerSegment2Partition1 = - (queryPlus, responseContext) -> Sequences.simple(scanResultValues3); + if (query.getOrder() == ScanQuery.Order.DESCENDING) { + groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); + } else { + groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); + groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + } - QueryRunner runnerSegment2Partition2 = - (queryPlus, responseContext) -> Sequences.empty(); + expectedEventTimestamps.sort((o1, o2) -> { + int retVal = 0; + if (o1 > o2) { + retVal = 1; + } else if (o1 < o2) { + retVal = -1; + } + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + return retVal * -1; + } + return retVal; + }); - List>> groupedRunners = new ArrayList<>(2); + List output = + factory.nWayMergeAndLimit( + groupedRunners, + QueryPlus.wrap(query), + ImmutableMap.of() + ).toList(); - if (query.getOrder() == ScanQuery.Order.DESCENDING) { - groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); - groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); - } else { - groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); - groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + validateSortedOutput(output, expectedEventTimestamps); } - expectedEventTimestamps.sort((o1, o2) -> { - int retVal = 0; - if (o1 > o2) { - retVal = 1; - } else if (o1 < o2) { - retVal = -1; + private void validateSortedOutput(List output, List expectedEventTimestamps) + { + // check each scan result value has one event + for (ScanResultValue srv : output) { + if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { + Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1); + } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) { + Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1); + } } - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - return retVal * -1; + + // check total # of rows <= limit + Assert.assertTrue(output.size() <= query.getLimit()); + + // check ordering is correct + for (int i = 1; i < output.size(); i++) { + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) < + output.get(i - 1).getFirstEventTimestamp(resultFormat)); + } else { + Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) > + output.get(i - 1).getFirstEventTimestamp(resultFormat)); + } } - return retVal; - }); - - List output = - factory.nWayMergeAndLimit( - groupedRunners, - QueryPlus.wrap(query), - ImmutableMap.of() - ).toList(); - validateSortedOutput(output, expectedEventTimestamps); + // check the values are correct + for (int i = 0; i < query.getLimit() && i < output.size(); i++) { + Assert.assertEquals((long) 
expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); + } + } } - private void validateSortedOutput(List output, List expectedEventTimestamps) + public static class ScanQueryRunnerFactoryNonParameterizedTest { - // check each scan result value has one event - for (ScanResultValue srv : output) { - if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { - Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1); - } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) { - Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1); - } - } + private SegmentDescriptor descriptor = new SegmentDescriptor(new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ), "1", 0); - // check total # of rows <= limit - Assert.assertTrue(output.size() <= query.getLimit()); + @Test + public void testGetValidSegmentDescriptorsFromSpec() + { + /* valid */ + QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec( + Collections.singletonList( + descriptor + ) + ); + QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor); - // check ordering is correct - for (int i = 1; i < output.size(); i++) { - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) < - output.get(i - 1).getFirstEventTimestamp(resultFormat)); - } else { - Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) > - output.get(i - 1).getFirstEventTimestamp(resultFormat)); - } + List descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec); + Assert.assertEquals(1, descriptors.size()); + Assert.assertEquals(descriptor, descriptors.get(0)); + + descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec); + Assert.assertEquals(1, descriptors.size()); + Assert.assertEquals(descriptor, descriptors.get(0)); + } + + @Test(expected = UOE.class) + public void testGetSegmentDescriptorsFromInvalidIntervalSpec() + { + /* invalid */ + QuerySegmentSpec multiIntervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList( + new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ) + ) + ); + factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec); } - // check the values are correct - for (int i = 0; i < query.getLimit() && i < output.size(); i++) { - Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); + @Test(expected = UOE.class) + public void testGetSegmentDescriptorsFromInvalidLegacySpec() + { + QuerySegmentSpec legacySpec = new LegacySegmentSpec( + new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ) + ); + factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec); } } } From 8539588e3542b848ffd61e38828c36d383d95f31 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Thu, 11 Apr 2019 14:53:23 -0700 Subject: [PATCH 02/14] nit --- .../org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index 5de2aea986b8..6559c65b70fb 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ 
b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -277,7 +277,6 @@ public static class ScanQueryRunnerFactoryNonParameterizedTest @Test public void testGetValidSegmentDescriptorsFromSpec() { - /* valid */ QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec( Collections.singletonList( descriptor @@ -297,7 +296,6 @@ public void testGetValidSegmentDescriptorsFromSpec() @Test(expected = UOE.class) public void testGetSegmentDescriptorsFromInvalidIntervalSpec() { - /* invalid */ QuerySegmentSpec multiIntervalSpec = new MultipleIntervalSegmentSpec( Collections.singletonList( new Interval( From 7bf259b8c2ed49cabf368975450e6600ebc73abc Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Thu, 11 Apr 2019 16:45:23 -0700 Subject: [PATCH 03/14] Sorry travis --- .../org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java index 07bf9ca7b838..34a458d2aaf3 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java +++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Function; import com.google.common.collect.Iterables; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.query.Query; From 19c1485a6683de31c691ae1eb9d1cdecb87d5ab7 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Thu, 11 Apr 2019 16:58:33 -0700 Subject: [PATCH 04/14] Improve logging and fix doc --- .../druid/query/scan/ScanQueryRunnerFactory.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index d5c69a9a85ad..0a9f3b9b52d4 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -282,10 +282,10 @@ public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue i @VisibleForTesting List getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec) { - // Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need - // to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense - // the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals - // and query runners. + // Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because + // segment descriptors need to be present for a 1:1 matching of intervals with query runners. + // The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating + // the 1:1 relationship between intervals and query runners. 
List descriptorsOrdered; if (spec instanceof MultipleSpecificSegmentSpec) { @@ -295,7 +295,8 @@ List getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentS descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor()); } else { throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs" - + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec"); + + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.", + spec.getClass().getSimpleName()); } return descriptorsOrdered; } From 795c2c487be92d099b1b9a510d4cba1b90cd7d16 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Thu, 11 Apr 2019 19:30:40 -0700 Subject: [PATCH 05/14] Bug fix --- .../main/java/org/apache/druid/query/scan/ScanQuery.java | 6 +++++- .../query/scan/ScanResultValueTimestampComparator.java | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 2b6fc82b71d3..796f9d9e5dbb 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -256,7 +256,11 @@ public Boolean isLegacy() @Override public Ordering getResultOrdering() { - return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse(); + if (order == Order.NONE) { + return Ordering.natural(); + } else { + return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse(); + } } public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java index dcf3bade136f..69f780fca706 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java @@ -42,8 +42,7 @@ public ScanResultValueTimestampComparator(ScanQuery scanQuery) @Override public int compare(ScanResultValue o1, ScanResultValue o2) { - int comparison; - comparison = Longs.compare( + int comparison = Longs.compare( o1.getFirstEventTimestamp(scanQuery.getResultFormat()), o2.getFirstEventTimestamp(scanQuery.getResultFormat())); if (scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING)) { From 49f06da1e25355b80c0a0a1850899f5c60a0cfe9 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 12:49:25 -0700 Subject: [PATCH 06/14] Friendlier error msgs and tests to cover bug --- .../apache/druid/query/scan/ScanQuery.java | 3 +- .../druid/query/scan/ScanResultValue.java | 10 +- .../druid/query/scan/ScanQueryTest.java | 179 ++++++++++++++++++ ...canResultValueTimestampComparatorTest.java | 8 +- 4 files changed, 193 insertions(+), 7 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 796f9d9e5dbb..009d1428d07b 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -258,9 +258,8 @@ public Ordering getResultOrdering() { if (order == Order.NONE) { return Ordering.natural(); - } else { - 
return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse(); } + return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse(); } public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java index 7bfcf025e1bc..a05e71507666 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.UOE; import org.apache.druid.segment.column.ColumnHolder; @@ -78,9 +79,16 @@ public Object getEvents() public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat) { if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) { - return (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME); + Long timestamp = (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME); + if (timestamp == null) { + throw new ISE("Unable to compare timestamp for rows without a time column"); + } + return timestamp; } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME); + if (timeColumnIndex == -1) { + throw new ISE("Unable to compare timestamp for rows without a time column"); + } List firstEvent = (List) ((List) this.getEvents()).get(0); return (Long) firstEvent.get(timeColumnIndex); } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java new file mode 100644 index 000000000000..94ec9cecfdc8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.Druids; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.column.ColumnHolder; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +public class ScanQueryTest +{ + private static QuerySegmentSpec intervalSpec; + private static ScanResultValue s1; + private static ScanResultValue s2; + private static ScanResultValue s3; + + @BeforeClass + public static void setup() + { + intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList( + new Interval(DateTimes.of("2012-01-01"), DateTimes.of("2012-01-01").plusHours(1)) + ) + ); + + ArrayList> events1 = new ArrayList<>(); + HashMap event1 = new HashMap<>(); + event1.put(ColumnHolder.TIME_COLUMN_NAME, new Long(42)); + events1.add(event1); + + s1 = new ScanResultValue( + "segmentId", + Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), + events1 + ); + + ArrayList> events2 = new ArrayList<>(); + HashMap event2 = new HashMap<>(); + event2.put(ColumnHolder.TIME_COLUMN_NAME, new Long(43)); + events2.add(event2); + + s2 = new ScanResultValue( + "segmentId", + Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), + events2 + ); + + // ScanResultValue s3 has no time column + ArrayList> events3 = new ArrayList<>(); + HashMap event3 = new HashMap<>(); + event3.put("yah", "yeet"); + events3.add(event3); + + s3 = new ScanResultValue( + "segmentId", + Collections.singletonList("yah"), + events3 + ); + } + + // Validates that getResultOrdering will work for the broker n-way merge + @Test + public void testMergeSequenceForResults() + { + // Should be able to handle merging s1, s2, s3 + ScanQuery noOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.NONE) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + + // Should only handle s1 and s2 + ScanQuery descendingOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + + // Should only handle s1 and s2 + ScanQuery ascendingOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.ASCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + // No Order + Sequence noOrderSeq = + Sequences.simple( + ImmutableList.of( + Sequences.simple(ImmutableList.of(s1, s3)), + Sequences.simple(ImmutableList.of(s2)) + ) + ).flatMerge(seq -> seq, noOrderScan.getResultOrdering()); + + List noOrderList = noOrderSeq.toList(); + Assert.assertEquals(3, noOrderList.size()); + + + // Ascending + Sequence ascendingOrderSeq = Sequences.simple( + ImmutableList.of( + Sequences.simple(ImmutableList.of(s1)), + Sequences.simple(ImmutableList.of(s2)) + ) + ).flatMerge(seq -> seq, ascendingOrderScan.getResultOrdering()); + + List ascendingList = ascendingOrderSeq.toList(); + Assert.assertEquals(2, 
ascendingList.size()); + Assert.assertEquals(s1, ascendingList.get(0)); + Assert.assertEquals(s2, ascendingList.get(1)); + + // Descending + Sequence descendingOrderSeq = Sequences.simple( + ImmutableList.of( + Sequences.simple(ImmutableList.of(s1)), + Sequences.simple(ImmutableList.of(s2)) + ) + ).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering()); + + List descendingList = descendingOrderSeq.toList(); + Assert.assertEquals(2, descendingList.size()); + Assert.assertEquals(s2, descendingList.get(0)); + Assert.assertEquals(s1, descendingList.get(1)); + } + + @Test(expected = ISE.class) + public void testTimeOrderingWithoutTimeColumn() + { + ScanQuery descendingOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + // This should fail because s3 doesn't have a timestamp + Sequence borkedSequence = Sequences.simple( + ImmutableList.of( + Sequences.simple(ImmutableList.of(s1)), + Sequences.simple(ImmutableList.of(s2, s3)) + ) + ).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering()); + + // This should throw an ISE + List res = borkedSequence.toList(); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java index 70f2e080b9f2..465794a28316 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java @@ -49,7 +49,7 @@ public static void setup() } @Test - public void comparisonDescendingListTest() + public void testComparisonDescendingList() { ScanQuery query = Druids.newScanQueryBuilder() .order(ScanQuery.Order.DESCENDING) @@ -86,7 +86,7 @@ public void comparisonDescendingListTest() } @Test - public void comparisonAscendingListTest() + public void testComparisonAscendingList() { ScanQuery query = Druids.newScanQueryBuilder() .order(ScanQuery.Order.ASCENDING) @@ -123,7 +123,7 @@ public void comparisonAscendingListTest() } @Test - public void comparisonDescendingCompactedListTest() + public void testComparisonDescendingCompactedList() { ScanQuery query = Druids.newScanQueryBuilder() .order(ScanQuery.Order.DESCENDING) @@ -158,7 +158,7 @@ public void comparisonDescendingCompactedListTest() } @Test - public void comparisonAscendingCompactedListTest() + public void testAscendingCompactedList() { ScanQuery query = Druids.newScanQueryBuilder() .order(ScanQuery.Order.ASCENDING) From b648b85bc0ee29c3156161db2168553b18ba993c Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 14:02:58 -0700 Subject: [PATCH 07/14] Address Gian's comments --- docs/content/querying/scan-query.md | 2 +- .../main/java/org/apache/druid/query/scan/ScanQuery.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md index 7ba5c6097d9f..b1f5898b688e 100644 --- a/docs/content/querying/scan-query.md +++ b/docs/content/querying/scan-query.md @@ -61,7 +61,7 @@ The following are the main parameters for Scan queries: |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |batchSize|How many rows buffered before return to client. Default is `20480`|no| |limit|How many rows to return. 
If not specified, all rows will be returned.|no| -|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to a order of "none".|none| +|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering` and the timestamp column is included in the `columns` field. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to a order of "none".|none| |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no| |context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no| diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 009d1428d07b..73f315f550b5 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -34,6 +34,7 @@ import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; import javax.annotation.Nullable; import java.util.List; @@ -149,6 +150,12 @@ public ScanQuery( this.columns = columns; this.legacy = legacy; this.order = (order == null) ? Order.NONE : order; + if (order != Order.NONE) { + Preconditions.checkArgument( + (columns.contains(ColumnHolder.TIME_COLUMN_NAME)), + "The timestamp column must be selected if the results are time-ordered." + ); + } this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering(); this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory(); } From 2a42e317ad5643e263d616863882c117dedf02ab Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 14:12:30 -0700 Subject: [PATCH 08/14] Fix doc --- docs/content/querying/scan-query.md | 2 +- .../src/main/java/org/apache/druid/query/scan/ScanQuery.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md index b1f5898b688e..64210475a922 100644 --- a/docs/content/querying/scan-query.md +++ b/docs/content/querying/scan-query.md @@ -61,7 +61,7 @@ The following are the main parameters for Scan queries: |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |batchSize|How many rows buffered before return to client. Default is `20480`|no| |limit|How many rows to return. If not specified, all rows will be returned.|no| -|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. 
Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering` and the timestamp column is included in the `columns` field. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to a order of "none".|none| +|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none| |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no| |context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no| diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 73f315f550b5..ceb4cd576acd 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -153,7 +153,7 @@ public ScanQuery( if (order != Order.NONE) { Preconditions.checkArgument( (columns.contains(ColumnHolder.TIME_COLUMN_NAME)), - "The timestamp column must be selected if the results are time-ordered." + "The __time column must be selected if the results are time-ordered." ); } this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering(); From 5f497ce6527d582f8c279e24f28769aabf64bd77 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 14:45:29 -0700 Subject: [PATCH 09/14] Added tests for empty and null column list --- .../apache/druid/query/scan/ScanQuery.java | 2 +- .../druid/query/scan/ScanQueryTest.java | 89 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index ceb4cd576acd..0701a67b6acd 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -152,7 +152,7 @@ public ScanQuery( this.order = (order == null) ? Order.NONE : order; if (order != Order.NONE) { Preconditions.checkArgument( - (columns.contains(ColumnHolder.TIME_COLUMN_NAME)), + columns == null || columns.size() == 0 || columns.contains(ColumnHolder.TIME_COLUMN_NAME), "The __time column must be selected if the results are time-ordered." 
); } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java index 94ec9cecfdc8..78f7de8d95d0 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java @@ -20,6 +20,8 @@ package org.apache.druid.query.scan; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.sun.javaws.exceptions.InvalidArgumentException; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Sequence; @@ -37,6 +39,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; public class ScanQueryTest { @@ -89,6 +92,92 @@ public static void setup() ); } + @Test(expected = IllegalArgumentException.class) + public void testAscendingScanQueryWithInvalidColumns() + { + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.ASCENDING) + .columns(ImmutableList.of("not time", "also not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDescendingScanQueryWithInvalidColumns() + { + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.DESCENDING) + .columns(ImmutableList.of("not time", "also not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + + // No assertions because we're checking that no IllegalArgumentExceptions are thrown + @Test + public void testValidScanQueryInitialization() + { + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.NONE) + .columns(ImmutableList.of("not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.NONE) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.NONE) + .columns(ImmutableList.of()) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.NONE) + .columns(ImmutableList.of("__time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Set orders = ImmutableSet.of(ScanQuery.Order.ASCENDING, ScanQuery.Order.DESCENDING); + + for (ScanQuery.Order order : orders) { + Druids.newScanQueryBuilder() + .order(order) + .columns((List) null) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of()) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of("__time", "col2")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + } + // Validates that getResultOrdering will work for the broker n-way merge @Test public void testMergeSequenceForResults() From 3a5048facb1d3f7d38b8ab00262d9e9a6e809dbd Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 14:46:36 -0700 Subject: [PATCH 10/14] Style --- .../src/test/java/org/apache/druid/query/scan/ScanQueryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java index 
78f7de8d95d0..47042fcf7c0d 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.sun.javaws.exceptions.InvalidArgumentException; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Sequence; From cd36e18a068b1b832cd9d20591c1164091422ca2 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 14:54:03 -0700 Subject: [PATCH 11/14] Fix checking wrong order (looking at query param when it should be looking at the null-handled order) --- .../src/main/java/org/apache/druid/query/scan/ScanQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 0701a67b6acd..3f6d4074ea71 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -150,7 +150,7 @@ public ScanQuery( this.columns = columns; this.legacy = legacy; this.order = (order == null) ? Order.NONE : order; - if (order != Order.NONE) { + if (this.order != Order.NONE) { Preconditions.checkArgument( columns == null || columns.size() == 0 || columns.contains(ColumnHolder.TIME_COLUMN_NAME), "The __time column must be selected if the results are time-ordered." From c34115c5684a6d24a2d5567b4372cccbfb68c9ee Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 14:58:18 -0700 Subject: [PATCH 12/14] Add test case for null order --- .../druid/query/scan/ScanQueryTest.java | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java index 47042fcf7c0d..1854883ca5b9 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -117,37 +118,41 @@ public void testDescendingScanQueryWithInvalidColumns() @Test public void testValidScanQueryInitialization() { - Druids.newScanQueryBuilder() - .order(ScanQuery.Order.NONE) - .columns(ImmutableList.of("not time")) - .dataSource("source") - .intervals(intervalSpec) - .build(); + List nonOrderedOrders = Arrays.asList(null, ScanQuery.Order.NONE); - Druids.newScanQueryBuilder() - .order(ScanQuery.Order.NONE) - .dataSource("source") - .intervals(intervalSpec) - .build(); + for (ScanQuery.Order order : nonOrderedOrders) { + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of("not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + Druids.newScanQueryBuilder() + .order(order) + .dataSource("source") + .intervals(intervalSpec) + .build(); - Druids.newScanQueryBuilder() - .order(ScanQuery.Order.NONE) - .columns(ImmutableList.of()) - .dataSource("source") - .intervals(intervalSpec) - .build(); - Druids.newScanQueryBuilder() - .order(ScanQuery.Order.NONE) - .columns(ImmutableList.of("__time")) - .dataSource("source") - 
.intervals(intervalSpec) - .build(); + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of()) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of("__time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } - Set orders = ImmutableSet.of(ScanQuery.Order.ASCENDING, ScanQuery.Order.DESCENDING); + Set orderedOrders = ImmutableSet.of(ScanQuery.Order.ASCENDING, ScanQuery.Order.DESCENDING); - for (ScanQuery.Order order : orders) { + for (ScanQuery.Order order : orderedOrders) { Druids.newScanQueryBuilder() .order(order) .columns((List) null) From 596585de462f69863ff4ae5d10a3c5101d1048a2 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 16:50:40 -0700 Subject: [PATCH 13/14] Fix ScanQueryRunnerTest --- .../druid/query/scan/ScanQueryRunnerTest.java | 269 +++++++++++++----- 1 file changed, 190 insertions(+), 79 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index b3a0d0069ac1..c3cba7913682 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -67,6 +67,7 @@ import java.util.Set; /** + * */ @RunWith(Parameterized.class) public class ScanQueryRunnerTest @@ -143,11 +144,11 @@ public ScanQueryRunnerTest(final QueryRunner runner, final boolean legacy) private Druids.ScanQueryBuilder newTestQuery() { return Druids.newScanQueryBuilder() - .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) - .columns(Collections.emptyList()) - .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) - .limit(3) - .legacy(legacy); + .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) + .columns(Collections.emptyList()) + .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) + .limit(3) + .legacy(legacy); } @Test @@ -524,7 +525,11 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingListFormat() ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .limit(limit) .order(ScanQuery.Order.ASCENDING) .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false)) @@ -556,7 +561,7 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingListFormat() }; final List>> ascendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -565,9 +570,35 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingListFormat() }, (String[]) ArrayUtils.addAll(seg1Results, seg2Results) ); + + if (legacy) { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + } + } + } + List ascendingExpectedResults = toExpected( ascendingEvents, - legacy ? 
Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -583,7 +614,11 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingListFormat() ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .limit(limit) .order(ScanQuery.Order.DESCENDING) .build(); @@ -616,7 +651,7 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingListFormat() ArrayUtils.reverse(expectedRet); final List>> descendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -625,9 +660,34 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingListFormat() }, expectedRet ); + if (legacy) { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + } + } + } List descendingExpectedResults = toExpected( descendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + // getTimestampName() always returns the legacy timestamp when legacy is true + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -666,7 +726,11 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingCompactedList ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .order(ScanQuery.Order.ASCENDING) .limit(limit) @@ -676,7 +740,7 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingCompactedList Iterable results = runner.run(QueryPlus.wrap(query), context).toList(); final List>> ascendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? 
getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -685,9 +749,34 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingCompactedList }, (String[]) ArrayUtils.addAll(seg1Results, seg2Results) ); + if (legacy) { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + } + } + } List ascendingExpectedResults = toExpected( ascendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + // getTimestampName() always returns the legacy timestamp when legacy is true + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -727,7 +816,11 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedLis ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .order(ScanQuery.Order.DESCENDING) .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false)) @@ -740,7 +833,7 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedLis ArrayUtils.reverse(expectedRet); final List>> descendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -749,9 +842,34 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedLis }, expectedRet //segments in reverse order from above ); + if (legacy) { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + } + } + } List descendingExpectedResults = toExpected( descendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + // getTimestampName() always returns the legacy timestamp when legacy is true + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -760,7 +878,6 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedLis } } - private List>> toFullEvents(final String[]... 
valueSet) { return toEvents( @@ -799,71 +916,66 @@ private List>> toEvents(final String[] dimSpecs, final Lists.newArrayList( Iterables.transform( values, - new Function>() - { - @Override - public Map apply(String input) - { - Map event = new HashMap<>(); - String[] values = input.split("\\t"); - for (int i = 0; i < dimSpecs.length; i++) { - if (dimSpecs[i] == null || i >= dimSpecs.length) { - continue; - } - - // For testing metrics and virtual columns we have some special handling here, since - // they don't appear in the source data. - if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) { - event.put( - EXPR_COLUMN.getOutputName(), - (double) event.get(QueryRunnerTestHelper.indexMetric) * 2 - ); - continue; - } else if (dimSpecs[i].equals("indexMin")) { - event.put("indexMin", (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("indexFloat")) { - event.put("indexFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("indexMaxPlusTen")) { - event.put("indexMaxPlusTen", (double) event.get(QueryRunnerTestHelper.indexMetric) + 10); - continue; - } else if (dimSpecs[i].equals("indexMinFloat")) { - event.put("indexMinFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("indexMaxFloat")) { - event.put("indexMaxFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("quality_uniques")) { - final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); - collector.add( - Hashing.murmur3_128() - .hashBytes(StringUtils.toUtf8((String) event.get("quality"))) - .asBytes() - ); - event.put("quality_uniques", collector); - } - - if (i >= values.length) { - continue; - } - - String[] specs = dimSpecs[i].split(":"); + input -> { + Map event = new HashMap<>(); + String[] values1 = input.split("\\t"); + for (int i = 0; i < dimSpecs.length; i++) { + if (dimSpecs[i] == null || i >= dimSpecs.length) { + continue; + } + // For testing metrics and virtual columns we have some special handling here, since + // they don't appear in the source data. + if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) { event.put( - specs[0], - specs.length == 1 || specs[1].equals("STRING") ? values[i] : - specs[1].equals("TIME") ? toTimestamp(values[i]) : - specs[1].equals("FLOAT") ? Float.valueOf(values[i]) : - specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) : - specs[1].equals("LONG") ? Long.valueOf(values[i]) : - specs[1].equals("NULL") ? null : - specs[1].equals("STRINGS") ? 
Arrays.asList(values[i].split("\u0001")) : - values[i] + EXPR_COLUMN.getOutputName(), + (double) event.get(QueryRunnerTestHelper.indexMetric) * 2 ); + continue; + } else if (dimSpecs[i].equals("indexMin")) { + event.put("indexMin", (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexFloat")) { + event.put("indexFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexMaxPlusTen")) { + event.put("indexMaxPlusTen", (double) event.get(QueryRunnerTestHelper.indexMetric) + 10); + continue; + } else if (dimSpecs[i].equals("indexMinFloat")) { + event.put("indexMinFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexMaxFloat")) { + event.put("indexMaxFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("quality_uniques")) { + final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + collector.add( + Hashing.murmur3_128() + .hashBytes(StringUtils.toUtf8((String) event.get("quality"))) + .asBytes() + ); + event.put("quality_uniques", collector); + } + + if (i >= values1.length) { + continue; } - return event; + + String[] specs = dimSpecs[i].split(":"); + + event.put( + specs[0], + specs.length == 1 || specs[1].equals("STRING") ? values1[i] : + specs[1].equals("TIME") ? toTimestamp(values1[i]) : + specs[1].equals("FLOAT") ? Float.valueOf(values1[i]) : + specs[1].equals("DOUBLE") ? Double.valueOf(values1[i]) : + specs[1].equals("LONG") ? Long.valueOf(values1[i]) : + specs[1].equals("NULL") ? null : + specs[1].equals("STRINGS") ? Arrays.asList(values1[i].split("\u0001")) : + values1[i] + ); } + return event; } ) ) @@ -969,7 +1081,6 @@ public static void verify( } else { Assert.assertEquals("invalid value for " + ac.getKey(), exVal, actVal); } - } } From 67f34768dbca3df896f005204f05fc035432475c Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 17:59:39 -0700 Subject: [PATCH 14/14] Forbidden APIs fixed --- .../org/apache/druid/query/scan/ScanQueryRunnerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index c3cba7913682..2928ca9460ed 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -580,7 +580,7 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingListFormat() } else { for (List> batch : ascendingEvents) { for (Map event : batch) { - event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis()); } } } @@ -669,7 +669,7 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingListFormat() } else { for (List> batch : descendingEvents) { for (Map event : batch) { - event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis()); } } } @@ -758,7 +758,7 @@ public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingCompactedList } else { for (List> batch : ascendingEvents) { for (Map event : batch) { - event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + event.put("__time", 
((DateTimes.of((String) event.get("__time"))).getMillis())); } } } @@ -851,7 +851,7 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedLis } else { for (List> batch : descendingEvents) { for (Map event : batch) { - event.put("__time", (new DateTime(event.get("__time"))).getMillis()); + event.put("__time", ((DateTimes.of((String) event.get("__time"))).getMillis())); } } }
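
The four hunks in this final patch all make the same substitution: the test's expected-event normalization stops calling the Joda `new DateTime(...)` constructor and calls Druid's `DateTimes.of(...)` instead, since the bare constructor is rejected by the project's forbidden-apis check. As a reading aid only, and not part of the patch itself, here is a minimal self-contained Java sketch of that normalization; the class name ExpectedTimeNormalizer is invented for illustration, and the UTC rationale in the comment is an assumption rather than something stated in the commits.

import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;

import java.util.Map;

public class ExpectedTimeNormalizer
{
  /**
   * Mirrors the per-event fix-up the test applies before comparing results: legacy
   * scan results carry a Joda DateTime under "timestamp", while non-legacy results
   * carry an ISO-8601 string under "__time"; both are normalized to epoch millis.
   */
  public static void normalize(Map<String, Object> event, boolean legacy)
  {
    if (legacy) {
      event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
    } else {
      // DateTimes.of(...) rather than new DateTime(...): the constructor is on the
      // forbidden-apis list, likely to keep timestamp parsing pinned to UTC.
      event.put("__time", DateTimes.of((String) event.get("__time")).getMillis());
    }
  }
}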