From d62807e9fbfd854b6a3cc3880fc1f42f47388087 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 13:50:18 +0300 Subject: [PATCH 01/15] Refactored ResponseContext and aggregated its keys into Enum --- .../MovingAverageQueryRunner.java | 18 ++- ...portTimelineMissingSegmentQueryRunner.java | 11 +- .../apache/druid/query/RetryQueryRunner.java | 4 +- .../druid/query/context/ResponseContext.java | 141 +++++++++++++----- .../druid/query/scan/ScanQueryEngine.java | 19 +-- .../query/scan/ScanQueryRunnerFactory.java | 8 +- .../spec/SpecificSegmentQueryRunner.java | 15 +- .../druid/query/RetryQueryRunnerTest.java | 80 +++++----- .../druid/query/UnionQueryRunnerTest.java | 14 +- .../DataSourceMetadataQueryTest.java | 2 +- .../spec/SpecificSegmentQueryRunnerTest.java | 2 +- .../TimeBoundaryQueryRunnerTest.java | 4 +- .../druid/query/topn/TopNQueryRunnerTest.java | 4 +- .../druid/client/CachingClusteredClient.java | 6 +- .../druid/client/DirectDruidClient.java | 8 +- .../query/ResultLevelCachingQueryRunner.java | 2 +- .../apache/druid/server/QueryResource.java | 8 +- ...chingClusteredClientFunctionalityTest.java | 6 +- .../client/CachingClusteredClientTest.java | 6 +- 19 files changed, 204 insertions(+), 154 deletions(-) diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java index 53c4422575a2..289d2ed6e3d4 100644 --- a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java +++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java @@ -67,10 +67,6 @@ */ public class MovingAverageQueryRunner implements QueryRunner { - - public static final String QUERY_FAIL_TIME = "queryFailTime"; - public static final String 
QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered"; - private final QuerySegmentWalker walker; private final RequestLogger requestLogger; @@ -126,8 +122,11 @@ public Sequence run(QueryPlus query, ResponseContext responseContext) GroupByQuery gbq = builder.build(); ResponseContext gbqResponseContext = ResponseContext.createEmpty(); - gbqResponseContext.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(gbq)); - gbqResponseContext.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + gbqResponseContext.put( + ResponseContext.Key.QUERY_FAIL_TIME, + System.currentTimeMillis() + QueryContexts.getTimeout(gbq) + ); + gbqResponseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); Sequence results = gbq.getRunner(walker).run(QueryPlus.wrap(gbq), gbqResponseContext); try { @@ -164,8 +163,11 @@ public Sequence run(QueryPlus query, ResponseContext responseContext) maq.getContext() ); ResponseContext tsqResponseContext = ResponseContext.createEmpty(); - tsqResponseContext.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(tsq)); - tsqResponseContext.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + tsqResponseContext.put( + ResponseContext.Key.QUERY_FAIL_TIME, + System.currentTimeMillis() + QueryContexts.getTimeout(tsq) + ); + tsqResponseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); Sequence> results = tsq.getRunner(walker).run(QueryPlus.wrap(tsq), tsqResponseContext); try { diff --git a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java index 97b6aa27fd08..7d30c560938c 100644 --- a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -23,8 +23,7 @@ import 
org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.context.ResponseContext; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; /** */ @@ -40,13 +39,7 @@ public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor) @Override public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { - List missingSegments = - (List) responseContext.get(ResponseContext.CTX_MISSING_SEGMENTS); - if (missingSegments == null) { - missingSegments = new ArrayList<>(); - responseContext.put(ResponseContext.CTX_MISSING_SEGMENTS, missingSegments); - } - missingSegments.add(descriptor); + responseContext.merge(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(descriptor)); return Sequences.empty(); } } diff --git a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java index 28bcf0b69933..6b991b870575 100644 --- a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java @@ -72,7 +72,7 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat for (int i = 0; i < config.getNumTries(); i++) { log.info("[%,d] missing segments found. 
Retry attempt [%,d]", missingSegments.size(), i); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); final QueryPlus retryQueryPlus = queryPlus.withQuerySegmentSpec( new MultipleSpecificSegmentSpec( missingSegments @@ -102,7 +102,7 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat private List getMissingSegments(final ResponseContext context) { - final Object maybeMissingSegments = context.get(ResponseContext.CTX_MISSING_SEGMENTS); + final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS); if (maybeMissingSegments == null) { return new ArrayList<>(); } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 93841f482fd8..5c95fe28c81e 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -23,49 +23,98 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.query.SegmentDescriptor; +import org.joda.time.Interval; import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.function.BiFunction; /** * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s. * The context is also transferred between Druid nodes with all the data it contains. - * All the keys associated with data inside the context should be stored here. - * CTX_* keys might be aggregated into an enum. Consider refactoring that. */ @PublicApi public abstract class ResponseContext { /** - * Lists intervals for which NO segment is present. + * Keys associated with objects in the context. 
*/ - public static final String CTX_UNCOVERED_INTERVALS = "uncoveredIntervals"; - /** - * Indicates if the number of uncovered intervals exceeded the limit (true/false). - */ - public static final String CTX_UNCOVERED_INTERVALS_OVERFLOWED = "uncoveredIntervalsOverflowed"; - /** - * Lists missing segments. - */ - public static final String CTX_MISSING_SEGMENTS = "missingSegments"; - /** - * Entity tag. A part of HTTP cache validation mechanism. - * Is being removed from the context before sending and used as a separate HTTP header. - */ - public static final String CTX_ETAG = "ETag"; - /** - * Query total bytes gathered. - */ - public static final String CTX_QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered"; - /** - * This variable indicates when a running query should be expired, - * and is effective only when 'timeout' of queryContext has a positive value. - */ - public static final String CTX_TIMEOUT_AT = "timeoutAt"; - /** - * The number of scanned rows. - */ - public static final String CTX_COUNT = "count"; + public enum Key + { + /** + * Lists intervals for which NO segment is present. + */ + UNCOVERED_INTERVALS( + "uncoveredIntervals", + (oldValue, newValue) -> { + ((List) oldValue).addAll((List) newValue); + return oldValue; + } + ), + /** + * Indicates if the number of uncovered intervals exceeded the limit (true/false). + */ + UNCOVERED_INTERVALS_OVERFLOWED( + "uncoveredIntervalsOverflowed", + (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue + ), + /** + * Lists missing segments. + */ + MISSING_SEGMENTS( + "missingSegments", + (oldValue, newValue) -> { + ((List) oldValue).addAll((List) newValue); + return oldValue; + } + ), + /** + * Entity tag. A part of HTTP cache validation mechanism. + * Is being removed from the context before sending and used as a separate HTTP header. + */ + ETAG("ETag"), + /** + * Query fail time (current time + timeout). + */ + QUERY_FAIL_TIME("queryFailTime"), + /** + * Query total bytes gathered. 
+ */ + QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"), + /** + * This variable indicates when a running query should be expired, + * and is effective only when 'timeout' of queryContext has a positive value. + */ + TIMEOUT_AT("timeoutAt"), + /** + * The number of scanned rows. + */ + COUNT( + "count", + (oldValue, newValue) -> (long) oldValue + (long) newValue + ); + + private final String name; + /** + * Merge function associated with a key: Object (Object oldValue, Object newValue) + */ + private final BiFunction mergeFunction; + + Key(String name) + { + this.name = name; + this.mergeFunction = (oldValue, newValue) -> newValue; + } + + Key(String name, BiFunction mergeFunction) + { + this.name = name; + this.mergeFunction = mergeFunction; + } + + } /** * Create an empty DefaultResponseContext instance @@ -78,29 +127,41 @@ public static ResponseContext createEmpty() protected abstract Map getDelegate(); - public Object put(String key, Object value) + public Object put(Key key, Object value) { - return getDelegate().put(key, value); + return getDelegate().put(key.name, value); } - public Object get(String key) + public Object get(Key key) { - return getDelegate().get(key); + return getDelegate().get(key.name); } - public Object remove(String key) + public Object remove(Key key) { - return getDelegate().remove(key); + return getDelegate().remove(key.name); } - public void putAll(Map m) + /** + * Merges a new value associated with a key with an old value. + */ + public Object merge(Key key, Object value) { - getDelegate().putAll(m); + return getDelegate().merge(key.name, value, key.mergeFunction); } - public void putAll(ResponseContext responseContext) + /** + * Merges a response context into current. + * This method merges only keys from the enum {@link Key}. 
+ */ + public void merge(ResponseContext responseContext) { - getDelegate().putAll(responseContext.getDelegate()); + for (Key key : Key.values()) { + final Object newValue = responseContext.get(key); + if (newValue != null) { + merge(key, newValue); + } + } } public int size() diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index 8d0bf512961a..fbd452df6bb4 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -67,14 +67,14 @@ public Sequence process( // "legacy" should be non-null due to toolChest.mergeResults final boolean legacy = Preconditions.checkNotNull(query.isLegacy(), "WTF?! Expected non-null legacy"); - if (responseContext.get(ResponseContext.CTX_COUNT) != null) { - long count = (long) responseContext.get(ResponseContext.CTX_COUNT); + if (responseContext.get(ResponseContext.Key.COUNT) != null) { + long count = (long) responseContext.get(ResponseContext.Key.COUNT); if (count >= query.getLimit() && query.getOrder().equals(ScanQuery.Order.NONE)) { return Sequences.empty(); } } final boolean hasTimeout = QueryContexts.hasTimeout(query); - final long timeoutAt = (long) responseContext.get(ResponseContext.CTX_TIMEOUT_AT); + final long timeoutAt = (long) responseContext.get(ResponseContext.Key.TIMEOUT_AT); final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); @@ -121,9 +121,7 @@ public Sequence process( final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); - if (responseContext.get(ResponseContext.CTX_COUNT) == null) { - responseContext.put(ResponseContext.CTX_COUNT, 0L); - } + responseContext.merge(ResponseContext.Key.COUNT, 0L); final long limit = calculateLimit(query, responseContext); return Sequences.concat( adapter @@ -187,13 
+185,10 @@ public ScanResultValue next() } else { throw new UOE("resultFormat[%s] is not supported", resultFormat.toString()); } - responseContext.put( - ResponseContext.CTX_COUNT, - (long) responseContext.get(ResponseContext.CTX_COUNT) + (offset - lastOffset) - ); + responseContext.merge(ResponseContext.Key.COUNT, offset - lastOffset); if (hasTimeout) { responseContext.put( - ResponseContext.CTX_TIMEOUT_AT, + ResponseContext.Key.TIMEOUT_AT, timeoutAt - (System.currentTimeMillis() - start) ); } @@ -266,7 +261,7 @@ public void cleanup(Iterator iterFromMake) private long calculateLimit(ScanQuery query, ResponseContext responseContext) { if (query.getOrder().equals(ScanQuery.Order.NONE)) { - return query.getLimit() - (long) responseContext.get(ResponseContext.CTX_COUNT); + return query.getLimit() - (long) responseContext.get(ResponseContext.Key.COUNT); } return query.getLimit(); } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 570819a3bc52..7bae4bfc75b3 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -92,9 +92,9 @@ public QueryRunner mergeRunners( ScanQuery query = (ScanQuery) queryPlus.getQuery(); // Note: this variable is effective only when queryContext has a timeout. - // See the comment of CTX_TIMEOUT_AT. + // See the comment of ResponseContext.Key.TIMEOUT_AT. 
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery()); - responseContext.put(ResponseContext.CTX_TIMEOUT_AT, timeoutAt); + responseContext.put(ResponseContext.Key.TIMEOUT_AT, timeoutAt); if (query.getOrder().equals(ScanQuery.Order.NONE)) { // Use normal strategy @@ -370,9 +370,9 @@ public Sequence run(QueryPlus queryPlus, Respo } // it happens in unit tests - final Number timeoutAt = (Number) responseContext.get(ResponseContext.CTX_TIMEOUT_AT); + final Number timeoutAt = (Number) responseContext.get(ResponseContext.Key.TIMEOUT_AT); if (timeoutAt == null || timeoutAt.longValue() == 0L) { - responseContext.put(ResponseContext.CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT); + responseContext.put(ResponseContext.Key.TIMEOUT_AT, JodaUtils.MAX_INSTANT); } return engine.process((ScanQuery) query, segment, responseContext); } diff --git a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java index 94c5f8fc8e67..b19e0514c4a6 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java @@ -31,13 +31,11 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; -import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.segment.SegmentMissingException; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; /** */ @@ -152,13 +150,10 @@ public RetType wrap(Supplier sequenceProcessing) private void appendMissingSegment(ResponseContext responseContext) { - List missingSegments = - (List) responseContext.get(ResponseContext.CTX_MISSING_SEGMENTS); - if (missingSegments == null) { - missingSegments = new 
ArrayList<>(); - responseContext.put(ResponseContext.CTX_MISSING_SEGMENTS, missingSegments); - } - missingSegments.add(specificSpec.getDescriptor()); + responseContext.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Collections.singletonList(specificSpec.getDescriptor()) + ); } private RetType doNamed(Thread currThread, String currName, String newName, Supplier toRun) diff --git a/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java index 0b143e3bd3a0..7576670ab0db 100644 --- a/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java @@ -90,15 +90,16 @@ public boolean isReturnPartialResults() public void testRunWithMissingSegments() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @Override public Sequence> run(QueryPlus queryPlus, ResponseContext context) { - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1) + context.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); return Sequences.empty(); } @@ -124,7 +125,7 @@ public boolean isReturnPartialResults() Assert.assertTrue( "Should have one entry in the list of missing segments", - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 1 + ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 1 ); Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); } @@ -134,8 +135,8 @@ public boolean isReturnPartialResults() public void testRetry() { 
ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put("count", 0); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.COUNT, 0); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -145,11 +146,12 @@ public Sequence> run( ResponseContext context ) { - if ((int) context.get("count") == 0) { - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1) + if ((int) context.get(ResponseContext.Key.COUNT) == 0) { + context.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); - context.put("count", 1); + context.put(ResponseContext.Key.COUNT, 1); return Sequences.empty(); } else { return Sequences.simple( @@ -174,7 +176,7 @@ public Sequence> run( Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 0 + ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0 ); } @@ -182,8 +184,8 @@ public Sequence> run( public void testRetryMultiple() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put("count", 0); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.COUNT, 0); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -193,11 +195,12 @@ public Sequence> run( ResponseContext context ) { - if ((int) context.get("count") < 3) { - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1) + if ((int) 
context.get(ResponseContext.Key.COUNT) < 3) { + context.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); - context.put("count", (int) context.get("count") + 1); + context.put(ResponseContext.Key.COUNT, (int) context.get(ResponseContext.Key.COUNT) + 1); return Sequences.empty(); } else { return Sequences.simple( @@ -222,7 +225,7 @@ public Sequence> run( Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 0 + ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0 ); } @@ -230,7 +233,7 @@ public Sequence> run( public void testException() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -240,8 +243,9 @@ public Sequence> run( ResponseContext context ) { - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1) + context.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); return Sequences.empty(); } @@ -254,7 +258,7 @@ public Sequence> run( Assert.assertTrue( "Should have one entry in the list of missing segments", - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 1 + ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 1 ); } @@ -262,8 +266,8 @@ public Sequence> run( public void testNoDuplicateRetry() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put("count", 0); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new 
ArrayList<>()); + context.put(ResponseContext.Key.COUNT, 0); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @@ -274,15 +278,16 @@ public Sequence> run( ) { final Query> query = queryPlus.getQuery(); - if ((int) context.get("count") == 0) { + if ((int) context.get(ResponseContext.Key.COUNT) == 0) { // assume 2 missing segments at first run - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1) - ); - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2) + context.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Arrays.asList( + new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1), + new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2) + ) ); - context.put("count", 1); + context.put(ResponseContext.Key.COUNT, 1); return Sequences.simple( Collections.singletonList( new Result<>( @@ -293,14 +298,15 @@ public Sequence> run( ) ) ); - } else if ((int) context.get("count") == 1) { + } else if ((int) context.get(ResponseContext.Key.COUNT) == 1) { // this is first retry Assert.assertTrue("Should retry with 2 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 2); // assume only left 1 missing at first retry - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).add( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2) + context.merge( + ResponseContext.Key.MISSING_SEGMENTS, + Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2)) ); - context.put("count", 2); + context.put(ResponseContext.Key.COUNT, 2); return Sequences.simple( Collections.singletonList( new Result<>( @@ -315,7 +321,7 @@ public Sequence> run( // this is second retry Assert.assertTrue("Should retry with 1 missing 
segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 1); // assume no more missing at second retry - context.put("count", 3); + context.put(ResponseContext.Key.COUNT, 3); return Sequences.simple( Collections.singletonList( new Result<>( @@ -338,7 +344,7 @@ public Sequence> run( Assert.assertTrue("Should return a list with 3 elements", ((List) actualResults).size() == 3); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(ResponseContext.CTX_MISSING_SEGMENTS)).size() == 0 + ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0 ); } } diff --git a/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java index 78b7712ec4cc..a64c31301f83 100644 --- a/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/UnionQueryRunnerTest.java @@ -28,12 +28,15 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class UnionQueryRunnerTest { @Test public void testUnionQueryRunner() { + AtomicBoolean ds1 = new AtomicBoolean(false); + AtomicBoolean ds2 = new AtomicBoolean(false); QueryRunner baseRunner = new QueryRunner() { @Override @@ -43,10 +46,10 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource); String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames()); if ("ds1".equals(dsName)) { - responseContext.put("ds1", "ds1"); + ds1.compareAndSet(false, true); return Sequences.simple(Arrays.asList(1, 2, 3)); } else if ("ds2".equals(dsName)) { - responseContext.put("ds2", "ds2"); + ds2.compareAndSet(false, true); return Sequences.simple(Arrays.asList(4, 5, 6)); } else { throw new AssertionError("Unexpected DataSource"); @@ 
-71,11 +74,8 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) Sequence result = runner.run(QueryPlus.wrap(q), responseContext); List res = result.toList(); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res); - - // verify response context - Assert.assertEquals(2, responseContext.size()); - Assert.assertEquals("ds1", responseContext.get("ds1")); - Assert.assertEquals("ds2", responseContext.get("ds2")); + Assert.assertEquals(true, ds1.get()); + Assert.assertEquals(true, ds2.get()); } } diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index dd5bedf7d275..bb71f8e023e7 100644 --- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -139,7 +139,7 @@ public void testMaxIngestedEventTime() throws Exception .dataSource("testing") .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); Iterable> results = runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList(); DataSourceMetadataResultValue val = results.iterator().next().getValue(); diff --git a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java index f1b185aa77d4..f4e1c31a2186 100644 --- a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -197,7 +197,7 @@ public void run() private void validate(ObjectMapper mapper, SegmentDescriptor 
descriptor, ResponseContext responseContext) throws IOException { - Object missingSegments = responseContext.get(ResponseContext.CTX_MISSING_SEGMENTS); + Object missingSegments = responseContext.get(ResponseContext.Key.MISSING_SEGMENTS); Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments instanceof List); diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 5e3a63495079..325a477e8e41 100644 --- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -216,7 +216,7 @@ public void testTimeBoundaryMax() .bound(TimeBoundaryQuery.MAX_TIME) .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); Iterable> results = runner.run(QueryPlus.wrap(timeBoundaryQuery), context).toList(); TimeBoundaryResultValue val = results.iterator().next().getValue(); DateTime minTime = val.getMinTime(); @@ -235,7 +235,7 @@ public void testTimeBoundaryMin() .bound(TimeBoundaryQuery.MIN_TIME) .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>()); + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); Iterable> results = runner.run(QueryPlus.wrap(timeBoundaryQuery), context).toList(); TimeBoundaryResultValue val = results.iterator().next().getValue(); DateTime minTime = val.getMinTime(); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 6a06d7354d68..6aa98412a8f0 100644 --- 
a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -1301,9 +1301,7 @@ public void testTopNBySegment() ) ); - final ResponseContext responseContext = ResponseContext.createEmpty(); - responseContext.putAll(specialContext); - Sequence> results = runWithMerge(query, responseContext); + Sequence> results = runWithMerge(query); List> resultList = results .map((Result input) -> { // Stupid type erasure diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 06c45309c6ad..09871b8dce58 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -358,8 +358,8 @@ private void computeUncoveredIntervals(TimelineLookup ti // Which is not necessarily an indication that the data doesn't exist or is // incomplete. The data could exist and just not be loaded yet. In either // case, though, this query will not include any data from the identified intervals. - responseContext.put(ResponseContext.CTX_UNCOVERED_INTERVALS, uncoveredIntervals); - responseContext.put(ResponseContext.CTX_UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed); + responseContext.merge(ResponseContext.Key.UNCOVERED_INTERVALS, uncoveredIntervals); + responseContext.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed); } } @@ -396,7 +396,7 @@ private String computeCurrentEtag(final Set segments, @Nullable hasher.putBytes(queryCacheKey == null ? 
strategy.computeCacheKey(query) : queryCacheKey); String currEtag = StringUtils.encodeBase64String(hasher.hash().asBytes()); - responseContext.put(ResponseContext.CTX_ETAG, currEtag); + responseContext.put(ResponseContext.Key.ETAG, currEtag); return currEtag; } else { return null; diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 98802402ec8a..717b4ae5aab0 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -101,13 +101,13 @@ public class DirectDruidClient implements QueryRunner */ public static void removeMagicResponseContextFields(ResponseContext responseContext) { - responseContext.remove(ResponseContext.CTX_QUERY_TOTAL_BYTES_GATHERED); + responseContext.remove(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED); } public static ResponseContext makeResponseContextForQuery() { final ResponseContext responseContext = ConcurrentResponseContext.createEmpty(); - responseContext.put(ResponseContext.CTX_QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + responseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); return responseContext; } @@ -156,7 +156,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext conte final long requestStartTimeNs = System.nanoTime(); final long timeoutAt = query.getContextValue(QUERY_FAIL_TIME); final long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); - final AtomicLong totalBytesGathered = (AtomicLong) context.get(ResponseContext.CTX_QUERY_TOTAL_BYTES_GATHERED); + final AtomicLong totalBytesGathered = (AtomicLong) context.get(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED); final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, 0); final boolean usingBackpressure = maxQueuedBytes > 0; @@ -230,7 +230,7 @@ public ClientResponse handleResponse(HttpResponse 
response, Traffic final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT); // context may be null in case of error or query timeout if (responseContext != null) { - context.putAll(ResponseContext.deserialize(responseContext, objectMapper)); + context.merge(ResponseContext.deserialize(responseContext, objectMapper)); } continueReading = enqueue(response.getContent(), 0L); } diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index addad8defcb4..ac1636c1e644 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -92,7 +92,7 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) QueryPlus.wrap(query), responseContext ); - String newResultSetId = (String) responseContext.get(ResponseContext.CTX_ETAG); + String newResultSetId = (String) responseContext.get(ResponseContext.Key.ETAG); if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) { log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId()); diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 15124864a36a..c76a81561af3 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -199,7 +199,7 @@ public Response doPost( final ResponseContext responseContext = queryResponse.getResponseContext(); final String prevEtag = getPreviousEtag(req); - if (prevEtag != null && prevEtag.equals(responseContext.get(ResponseContext.CTX_ETAG))) { + if (prevEtag != null && prevEtag.equals(responseContext.get(ResponseContext.Key.ETAG))) { 
queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1); successfulQueryCount.incrementAndGet(); return Response.notModified().build(); @@ -252,9 +252,9 @@ public void write(OutputStream outputStream) throws WebApplicationException ) .header("X-Druid-Query-Id", queryId); - if (responseContext.get(ResponseContext.CTX_ETAG) != null) { - builder.header(HEADER_ETAG, responseContext.get(ResponseContext.CTX_ETAG)); - responseContext.remove(ResponseContext.CTX_ETAG); + if (responseContext.get(ResponseContext.Key.ETAG) != null) { + builder.header(HEADER_ETAG, responseContext.get(ResponseContext.Key.ETAG)); + responseContext.remove(ResponseContext.Key.ETAG); } DirectDruidClient.removeMagicResponseContextFields(responseContext); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index b261442647dd..9a7bb900a5fa 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -125,7 +125,7 @@ public void testUncoveredInterval() ResponseContext responseContext = ResponseContext.createEmpty(); runQuery(client, builder.build(), responseContext); - Assert.assertNull(responseContext.get("uncoveredIntervals")); + Assert.assertNull(responseContext.get(ResponseContext.Key.UNCOVERED_INTERVALS)); builder.intervals("2015-01-01/2015-01-03"); responseContext = ResponseContext.createEmpty(); @@ -174,8 +174,8 @@ private void assertUncovered(ResponseContext context, boolean uncoveredIntervals for (String interval : intervals) { expectedList.add(Intervals.of(interval)); } - Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals")); - Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed")); + Assert.assertEquals((Object) expectedList, 
context.get(ResponseContext.Key.UNCOVERED_INTERVALS)); + Assert.assertEquals(uncoveredIntervalsOverflowed, context.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); } private void addToTimeline(Interval interval, String version) diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 6d59d7f14d36..24bb8f531d30 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -3157,7 +3157,7 @@ public void testIfNoneMatch() ResponseContext responseContext = ResponseContext.createEmpty(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.CTX_ETAG)); + Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.Key.ETAG)); } @Test @@ -3203,9 +3203,9 @@ public void testEtagforDifferentQueryInterval() final ResponseContext responseContext = ResponseContext.createEmpty(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - final Object etag1 = responseContext.get("ETag"); + final Object etag1 = responseContext.get(ResponseContext.Key.ETAG); getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext); - final Object etag2 = responseContext.get("ETag"); + final Object etag2 = responseContext.get(ResponseContext.Key.ETAG); Assert.assertNotEquals(etag1, etag2); } From 7c0d71d52354cd9196c312890827d01a2166cbfa Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 17:33:05 +0300 Subject: [PATCH 02/15] Added unit tests for ResponseContext and refactored the serialization --- .../druid/query/CPUTimeMetricQueryRunner.java | 1 + .../druid/query/context/ResponseContext.java | 117 +++++++++++++++--- .../query/context/ResponseContextTest.java | 117 ++++++++++++++++++ 
.../apache/druid/server/QueryResource.java | 16 ++- 4 files changed, 229 insertions(+), 22 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java diff --git a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java index 594a3273e8ed..5e2785b6c8c1 100644 --- a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java @@ -84,6 +84,7 @@ public void after(boolean isDone, Throwable thrown) if (report) { final long cpuTimeNs = cpuTimeAccumulator.get(); if (cpuTimeNs > 0) { + responseContext.merge(ResponseContext.Key.CPU_CONSUMED, cpuTimeNs); queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter); } } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 5c95fe28c81e..c4fb9f658ebd 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -21,14 +21,19 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.function.BiFunction; /** @@ -49,8 +54,9 @@ public enum Key UNCOVERED_INTERVALS( "uncoveredIntervals", (oldValue, newValue) -> { - ((List) 
oldValue).addAll((List) newValue); - return oldValue; + final ArrayList result = new ArrayList((List) oldValue); + result.addAll((List) newValue); + return result; } ), /** @@ -66,8 +72,9 @@ public enum Key MISSING_SEGMENTS( "missingSegments", (oldValue, newValue) -> { - ((List) oldValue).addAll((List) newValue); - return oldValue; + final ArrayList result = new ArrayList((List) oldValue); + result.addAll((List) newValue); + return result; } ), /** @@ -94,6 +101,13 @@ public enum Key COUNT( "count", (oldValue, newValue) -> (long) oldValue + (long) newValue + ), + /** + * CPU consumed while processing a request. + */ + CPU_CONSUMED( + "cpuConsumed", + (oldValue, newValue) -> (long) oldValue + (long) newValue ); private final String name; @@ -113,7 +127,6 @@ public enum Key this.name = name; this.mergeFunction = mergeFunction; } - } /** @@ -125,6 +138,22 @@ public static ResponseContext createEmpty() return DefaultResponseContext.createEmpty(); } + public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException + { + final Map delegate = objectMapper.readValue( + responseContext, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + return new ResponseContext() + { + @Override + protected Map getDelegate() + { + return delegate; + } + }; + } + protected abstract Map getDelegate(); public Object put(Key key, Object value) @@ -174,19 +203,73 @@ public String serializeWith(ObjectMapper objectMapper) throws JsonProcessingExce return objectMapper.writeValueAsString(getDelegate()); } - public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException + /** + * Serializes the context given that the resulting string length is less than the provided limit. + * The method removes max-length fields one by one if the resulting string length is greater than the limit. + * The resulting string might be correctly deserialized as a {@link ResponseContext}. 
+ */ + public SerializationResult serializeWith(ObjectMapper objectMapper, int maxLength) throws JsonProcessingException { - final Map delegate = objectMapper.readValue( - responseContext, - JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - return new ResponseContext() - { - @Override - protected Map getDelegate() - { - return delegate; + final String fullSerializedString = objectMapper.writeValueAsString(getDelegate()); + if (fullSerializedString.length() <= maxLength) { + return new SerializationResult(fullSerializedString, fullSerializedString, false); + } else { + final HashMap copiedMap = new HashMap<>(getDelegate()); + final PriorityQueue> serializedPairs = new PriorityQueue<>((o1, o2) -> { + Preconditions.checkNotNull(o1.rhs); + Preconditions.checkNotNull(o2.rhs); + return o2.rhs.length() - o1.rhs.length(); + }); + for (Map.Entry e : copiedMap.entrySet()) { + serializedPairs.add(new Pair<>(e.getKey(), objectMapper.writeValueAsString(e.getValue()))); } - }; + while (!copiedMap.isEmpty()) { + final Pair maxLengthPair = serializedPairs.poll(); + Preconditions.checkNotNull(maxLengthPair); + copiedMap.remove(maxLengthPair.lhs); + final String reducedSerializedString = objectMapper.writeValueAsString(copiedMap); + if (reducedSerializedString.length() <= maxLength) { + return new SerializationResult(reducedSerializedString, fullSerializedString, true); + } + } + final String serializedEmptyMap = objectMapper.writeValueAsString(copiedMap); + return new SerializationResult(serializedEmptyMap, fullSerializedString, true); + } + } + + /** + * Serialization result of {@link ResponseContext}. + * Response context might be serialized using max length limit, in this case the context might be reduced + * by removing max-length fields one by one until serialization result length is less than the limit. + * This structure has a reduced serialization result along with full result and boolean property + * indicating if some fields were removed from the context. 
+ */ + public class SerializationResult + { + private final String result; + private final String fullResult; + private final Boolean isReduced; + + SerializationResult(String result, String fullResult, Boolean isReduced) + { + this.result = result; + this.fullResult = fullResult; + this.isReduced = isReduced; + } + + public String getResult() + { + return result; + } + + public String getFullResult() + { + return fullResult; + } + + public Boolean isReduced() + { + return isReduced; + } } } diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java new file mode 100644 index 000000000000..274a13906267 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -0,0 +1,117 @@ +package org.apache.druid.query.context; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.query.SegmentDescriptor; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class ResponseContextTest +{ + + @Test + public void mergeValueTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.merge(ResponseContext.Key.ETAG, "dummy-etag"); + Assert.assertEquals("dummy-etag", ctx.get(ResponseContext.Key.ETAG)); + ctx.merge(ResponseContext.Key.ETAG, "new-dummy-etag"); + Assert.assertEquals("new-dummy-etag", ctx.get(ResponseContext.Key.ETAG)); + + final Interval interval01 = new Interval(0L, 1L); + ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); + Assert.assertArrayEquals( + Collections.singletonList(interval01).toArray(), + ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() + ); + final Interval interval12 = new 
Interval(1L, 2L); + final Interval interval23 = new Interval(2L, 3L); + ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(interval12, interval23)); + Assert.assertArrayEquals( + Arrays.asList(interval01, interval12, interval23).toArray(), + ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() + ); + + final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); + ctx.merge(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); + Assert.assertArrayEquals( + Collections.singletonList(sd01).toArray(), + ((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() + ); + final SegmentDescriptor sd12 = new SegmentDescriptor(interval12, "12", 1); + final SegmentDescriptor sd23 = new SegmentDescriptor(interval23, "23", 2); + ctx.merge(ResponseContext.Key.MISSING_SEGMENTS, Arrays.asList(sd12, sd23)); + Assert.assertArrayEquals( + Arrays.asList(sd01, sd12, sd23).toArray(), + ((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() + ); + + ctx.merge(ResponseContext.Key.COUNT, 0L); + Assert.assertEquals(0L, ctx.get(ResponseContext.Key.COUNT)); + ctx.merge(ResponseContext.Key.COUNT, 1L); + Assert.assertEquals(1L, ctx.get(ResponseContext.Key.COUNT)); + ctx.merge(ResponseContext.Key.COUNT, 3L); + Assert.assertEquals(4L, ctx.get(ResponseContext.Key.COUNT)); + + ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); + Assert.assertEquals(false, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); + ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, true); + Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); + ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); + Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); + } + + @Test + public void mergeResponseContextTest() + { + final ResponseContext ctx1 = ResponseContext.createEmpty(); + ctx1.put(ResponseContext.Key.ETAG, "dummy-etag-1"); 
+ final Interval interval01 = new Interval(0L, 1L); + ctx1.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); + ctx1.put(ResponseContext.Key.COUNT, 1L); + + final ResponseContext ctx2 = ResponseContext.createEmpty(); + ctx2.put(ResponseContext.Key.ETAG, "dummy-etag-2"); + final Interval interval12 = new Interval(1L, 2L); + ctx2.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval12)); + final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); + ctx2.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); + ctx2.put(ResponseContext.Key.COUNT, 2L); + + ctx1.merge(ctx2); + Assert.assertEquals("dummy-etag-2", ctx1.get(ResponseContext.Key.ETAG)); + Assert.assertEquals(3L, ctx1.get(ResponseContext.Key.COUNT)); + Assert.assertArrayEquals( + Arrays.asList(interval01, interval12).toArray(), + ((List) ctx1.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() + ); + Assert.assertArrayEquals( + Collections.singletonList(sd01).toArray(), + ((List) ctx1.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() + ); + } + + @Test + public void serializeWith() throws JsonProcessingException + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.put(ResponseContext.Key.COUNT, 100L); + ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result"); + final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); + final String fullString = ctx.serializeWith(objectMapper); + final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, 1000); + Assert.assertEquals(fullString, res1.getResult()); + final ResponseContext reducedCtx = ResponseContext.createEmpty(); + reducedCtx.merge(ctx); + final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 20); + reducedCtx.remove(ResponseContext.Key.ETAG); + Assert.assertEquals(reducedCtx.serializeWith(objectMapper), res2.getResult()); + } +} \ No newline at 
end of file diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index c76a81561af3..b8214cbabd3a 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -262,14 +262,20 @@ public void write(OutputStream outputStream) throws WebApplicationException //Limit the response-context header, see https://github.com/apache/incubator-druid/issues/2331 //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() //and encodes the string using ASCII, so 1 char is = 1 byte - String responseCtxString = responseContext.serializeWith(jsonMapper); - if (responseCtxString.length() > RESPONSE_CTX_HEADER_LEN_LIMIT) { - log.warn("Response Context truncated for id [%s] . Full context is [%s].", queryId, responseCtxString); - responseCtxString = responseCtxString.substring(0, RESPONSE_CTX_HEADER_LEN_LIMIT); + final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith( + jsonMapper, + RESPONSE_CTX_HEADER_LEN_LIMIT + ); + if (serializationResult.isReduced()) { + log.warn( + "Response Context truncated for id [%s] . 
Full context is [%s].", + queryId, + serializationResult.getFullResult() + ); } return builder - .header(HEADER_RESPONSE_CONTEXT, responseCtxString) + .header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult()) .build(); } catch (Exception e) { From 051b5e31551e4a3431c0b969c8f38ae4fc084130 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 17:37:14 +0300 Subject: [PATCH 03/15] Removed unused methods --- .../apache/druid/query/context/ResponseContext.java | 10 ---------- .../druid/query/context/ResponseContextTest.java | 6 ++---- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index c4fb9f658ebd..36624fa2bda0 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -193,16 +193,6 @@ public void merge(ResponseContext responseContext) } } - public int size() - { - return getDelegate().size(); - } - - public String serializeWith(ObjectMapper objectMapper) throws JsonProcessingException - { - return objectMapper.writeValueAsString(getDelegate()); - } - /** * Serializes the context given that the resulting string length is less than the provided limit. * The method removes max-length fields one by one if the resulting string length is greater than the limit. 
diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 274a13906267..40b7ade6f285 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -11,8 +11,6 @@ import java.util.Collections; import java.util.List; -import static org.junit.Assert.*; - public class ResponseContextTest { @@ -105,13 +103,13 @@ public void serializeWith() throws JsonProcessingException ctx.put(ResponseContext.Key.COUNT, 100L); ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result"); final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); - final String fullString = ctx.serializeWith(objectMapper); + final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, 1000); Assert.assertEquals(fullString, res1.getResult()); final ResponseContext reducedCtx = ResponseContext.createEmpty(); reducedCtx.merge(ctx); final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 20); reducedCtx.remove(ResponseContext.Key.ETAG); - Assert.assertEquals(reducedCtx.serializeWith(objectMapper), res2.getResult()); + Assert.assertEquals(objectMapper.writeValueAsString(reducedCtx.getDelegate()), res2.getResult()); } } \ No newline at end of file From 1c4fd67cf7323f7911c29cef3334e02bf8591d8a Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 17:47:25 +0300 Subject: [PATCH 04/15] Fixed code style --- .../druid/query/context/ResponseContext.java | 2 +- .../query/context/ResponseContextTest.java | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java 
b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 36624fa2bda0..2a9e80261c66 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -107,7 +107,7 @@ public enum Key */ CPU_CONSUMED( "cpuConsumed", - (oldValue, newValue) -> (long) oldValue + (long) newValue + (oldValue, newValue) -> (long) oldValue + (long) newValue ); private final String name; diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 40b7ade6f285..55575b0e2b00 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.druid.query.context; import com.fasterxml.jackson.core.JsonProcessingException; @@ -112,4 +130,4 @@ public void serializeWith() throws JsonProcessingException reducedCtx.remove(ResponseContext.Key.ETAG); Assert.assertEquals(objectMapper.writeValueAsString(reducedCtx.getDelegate()), res2.getResult()); } -} \ No newline at end of file +} From 8b30da2ac845ca1e94de5e2a62786398c2690e39 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 18:05:38 +0300 Subject: [PATCH 05/15] Fixed code style --- .../java/org/apache/druid/query/context/ResponseContextTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 55575b0e2b00..3580d9067ecc 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.druid.query.context; import com.fasterxml.jackson.core.JsonProcessingException; From 9ad6f2618866f08d729d20dd7c0fed7e3819e594 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 18:34:37 +0300 Subject: [PATCH 06/15] Fixed code style --- .../druid/query/context/ResponseContextTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 3580d9067ecc..f339ee90454e 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import org.junit.Assert; @@ -42,14 +43,14 @@ public void mergeValueTest() ctx.merge(ResponseContext.Key.ETAG, "new-dummy-etag"); Assert.assertEquals("new-dummy-etag", ctx.get(ResponseContext.Key.ETAG)); - final Interval interval01 = new Interval(0L, 1L); + final Interval interval01 = Intervals.of("2019-01-01/P1D"); ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); Assert.assertArrayEquals( Collections.singletonList(interval01).toArray(), ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() ); - final Interval interval12 = new Interval(1L, 2L); - final Interval interval23 = new Interval(2L, 3L); + final Interval interval12 = Intervals.of("2019-01-02/P1D"); + final Interval interval23 = Intervals.of("2019-01-03/P1D"); ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(interval12, interval23)); Assert.assertArrayEquals( Arrays.asList(interval01, interval12, 
interval23).toArray(), @@ -90,13 +91,13 @@ public void mergeResponseContextTest() { final ResponseContext ctx1 = ResponseContext.createEmpty(); ctx1.put(ResponseContext.Key.ETAG, "dummy-etag-1"); - final Interval interval01 = new Interval(0L, 1L); + final Interval interval01 = Intervals.of("2019-01-01/P1D"); ctx1.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); ctx1.put(ResponseContext.Key.COUNT, 1L); final ResponseContext ctx2 = ResponseContext.createEmpty(); ctx2.put(ResponseContext.Key.ETAG, "dummy-etag-2"); - final Interval interval12 = new Interval(1L, 2L); + final Interval interval12 = Intervals.of("2019-01-02/P1D"); ctx2.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval12)); final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); ctx2.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); From 1d4df6f8a74c88edafb333f7b33332eaae484435 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 25 Jul 2019 19:22:35 +0300 Subject: [PATCH 07/15] Made SerializationResult static --- .../java/org/apache/druid/query/context/ResponseContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 2a9e80261c66..bd530ebb986d 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -234,7 +234,7 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxLengt * This structure has a reduced serialization result along with full result and boolean property * indicating if some fields were removed from the context. 
*/ - public class SerializationResult + public static class SerializationResult { private final String result; private final String fullResult; From e9257b9b453be3303aaa9d373e7a75200ebc6880 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 26 Jul 2019 20:58:23 +0300 Subject: [PATCH 08/15] Updated according to the PR discussion: Renamed an argument Updated comparator Replaced Pair usage with Map.Entry Added a comment about quadratic complexity Removed boolean field with an expression Renamed SerializationResult field Renamed the method merge to add and renamed several context keys Renamed field and method related to scanRowsLimit Updated a comment Simplified a block of code Renamed a variable --- .../MovingAverageQueryRunner.java | 4 +- .../druid/query/CPUTimeMetricQueryRunner.java | 2 +- .../java/org/apache/druid/query/Druids.java | 2 +- ...portTimelineMissingSegmentQueryRunner.java | 2 +- .../druid/query/context/ResponseContext.java | 72 ++++++++++--------- .../apache/druid/query/scan/ScanQuery.java | 19 ++--- .../druid/query/scan/ScanQueryEngine.java | 19 ++--- .../query/scan/ScanQueryLimitRowIterator.java | 2 +- .../query/scan/ScanQueryQueryToolChest.java | 2 +- .../query/scan/ScanQueryRunnerFactory.java | 16 ++--- .../spec/SpecificSegmentQueryRunner.java | 2 +- .../druid/query/RetryQueryRunnerTest.java | 36 +++++----- .../query/context/ResponseContextTest.java | 42 +++++------ .../scan/ScanQueryRunnerFactoryTest.java | 8 +-- .../druid/client/CachingClusteredClient.java | 6 +- .../apache/druid/server/QueryResource.java | 12 ++-- 16 files changed, 128 insertions(+), 118 deletions(-) diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java index 289d2ed6e3d4..3f88b77fa412 100644 --- 
a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java +++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java @@ -123,7 +123,7 @@ public Sequence run(QueryPlus query, ResponseContext responseContext) ResponseContext gbqResponseContext = ResponseContext.createEmpty(); gbqResponseContext.put( - ResponseContext.Key.QUERY_FAIL_TIME, + ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS, System.currentTimeMillis() + QueryContexts.getTimeout(gbq) ); gbqResponseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); @@ -164,7 +164,7 @@ public Sequence run(QueryPlus query, ResponseContext responseContext) ); ResponseContext tsqResponseContext = ResponseContext.createEmpty(); tsqResponseContext.put( - ResponseContext.Key.QUERY_FAIL_TIME, + ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS, System.currentTimeMillis() + QueryContexts.getTimeout(tsq) ); tsqResponseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); diff --git a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java index 5e2785b6c8c1..7953d563f58f 100644 --- a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java @@ -84,7 +84,7 @@ public void after(boolean isDone, Throwable thrown) if (report) { final long cpuTimeNs = cpuTimeAccumulator.get(); if (cpuTimeNs > 0) { - responseContext.merge(ResponseContext.Key.CPU_CONSUMED, cpuTimeNs); + responseContext.add(ResponseContext.Key.CPU_CONSUMED_NANOS, cpuTimeNs); queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter); } } diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index 
47e3ede9a339..2e35891fafc4 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -966,7 +966,7 @@ public static ScanQueryBuilder copy(ScanQuery query) .virtualColumns(query.getVirtualColumns()) .resultFormat(query.getResultFormat()) .batchSize(query.getBatchSize()) - .limit(query.getLimit()) + .limit(query.getScanRowsLimit()) .filters(query.getFilter()) .columns(query.getColumns()) .legacy(query.isLegacy()) diff --git a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java index 7d30c560938c..b360d228f1a7 100644 --- a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -39,7 +39,7 @@ public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor) @Override public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { - responseContext.merge(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(descriptor)); + responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(descriptor)); return Sequences.empty(); } } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index bd530ebb986d..47dee0aecdef 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -23,13 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.druid.guice.annotations.PublicApi; -import org.apache.druid.java.util.common.Pair; import 
org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,8 +85,9 @@ public enum Key ETAG("ETag"), /** * Query fail time (current time + timeout). + * The final value in comparison to continuously updated TIMEOUT_AT. */ - QUERY_FAIL_TIME("queryFailTime"), + QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"), /** * Query total bytes gathered. */ @@ -93,19 +95,24 @@ public enum Key /** * This variable indicates when a running query should be expired, * and is effective only when 'timeout' of queryContext has a positive value. + * Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine} + * by reducing its value on the time of every scan iteration. */ TIMEOUT_AT("timeoutAt"), /** * The number of scanned rows. + * For backward compatibility the context key name still equals to "count". */ - COUNT( + NUM_SCANNED_ROWS( "count", (oldValue, newValue) -> (long) oldValue + (long) newValue ), /** - * CPU consumed while processing a request. + * The total CPU time for threads related to Sequence processing of the query. + * Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes. + * For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner} */ - CPU_CONSUMED( + CPU_CONSUMED_NANOS( "cpuConsumed", (oldValue, newValue) -> (long) oldValue + (long) newValue ); @@ -172,9 +179,10 @@ public Object remove(Key key) } /** - * Merges a new value associated with a key with an old value. + * Adds (merges) a new value associated with a key to an old value. + * See merge function of a context key for a specific implementation. 
*/ - public Object merge(Key key, Object value) + public Object add(Key key, Object value) { return getDelegate().merge(key.name, value, key.mergeFunction); } @@ -188,7 +196,7 @@ public void merge(ResponseContext responseContext) for (Key key : Key.values()) { final Object newValue = responseContext.get(key); if (newValue != null) { - merge(key, newValue); + add(key, newValue); } } } @@ -198,32 +206,34 @@ public void merge(ResponseContext responseContext) * The method removes max-length fields one by one if the resulting string length is greater than the limit. * The resulting string might be correctly deserialized as a {@link ResponseContext}. */ - public SerializationResult serializeWith(ObjectMapper objectMapper, int maxLength) throws JsonProcessingException + public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException { final String fullSerializedString = objectMapper.writeValueAsString(getDelegate()); - if (fullSerializedString.length() <= maxLength) { - return new SerializationResult(fullSerializedString, fullSerializedString, false); + if (fullSerializedString.length() <= maxCharsNumber) { + return new SerializationResult(fullSerializedString, fullSerializedString); } else { final HashMap copiedMap = new HashMap<>(getDelegate()); - final PriorityQueue> serializedPairs = new PriorityQueue<>((o1, o2) -> { - Preconditions.checkNotNull(o1.rhs); - Preconditions.checkNotNull(o2.rhs); - return o2.rhs.length() - o1.rhs.length(); - }); + final PriorityQueue> serializedValueEntries = new PriorityQueue<>( + Comparator.comparing((Map.Entry e) -> e.getValue().length()).reversed() + ); for (Map.Entry e : copiedMap.entrySet()) { - serializedPairs.add(new Pair<>(e.getKey(), objectMapper.writeValueAsString(e.getValue()))); + serializedValueEntries.add(new AbstractMap.SimpleImmutableEntry<>( + e.getKey(), + objectMapper.writeValueAsString(e.getValue()) + )); } - while (!copiedMap.isEmpty()) { - final Pair 
maxLengthPair = serializedPairs.poll(); - Preconditions.checkNotNull(maxLengthPair); - copiedMap.remove(maxLengthPair.lhs); + // quadratic complexity: while loop with map serialization on each iteration + while (!copiedMap.isEmpty() && !serializedValueEntries.isEmpty()) { + final Map.Entry maxLengthEntry = serializedValueEntries.poll(); + Preconditions.checkNotNull(maxLengthEntry); + copiedMap.remove(maxLengthEntry.getKey()); final String reducedSerializedString = objectMapper.writeValueAsString(copiedMap); - if (reducedSerializedString.length() <= maxLength) { - return new SerializationResult(reducedSerializedString, fullSerializedString, true); + if (reducedSerializedString.length() <= maxCharsNumber) { + return new SerializationResult(reducedSerializedString, fullSerializedString); } } final String serializedEmptyMap = objectMapper.writeValueAsString(copiedMap); - return new SerializationResult(serializedEmptyMap, fullSerializedString, true); + return new SerializationResult(serializedEmptyMap, fullSerializedString); } } @@ -236,20 +246,18 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxLengt */ public static class SerializationResult { - private final String result; + private final String truncatedResult; private final String fullResult; - private final Boolean isReduced; - SerializationResult(String result, String fullResult, Boolean isReduced) + SerializationResult(String truncatedResult, String fullResult) { - this.result = result; + this.truncatedResult = truncatedResult; this.fullResult = fullResult; - this.isReduced = isReduced; } - public String getResult() + public String getTruncatedResult() { - return result; + return truncatedResult; } public String getFullResult() @@ -259,7 +267,7 @@ public String getFullResult() public Boolean isReduced() { - return isReduced; + return !truncatedResult.equals(fullResult); } } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java 
b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 7b314ce45702..cbdd4f933bd6 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -110,7 +110,7 @@ public static Order fromString(String name) private final VirtualColumns virtualColumns; private final ResultFormat resultFormat; private final int batchSize; - private final long limit; + private final long scanRowsLimit; private final DimFilter dimFilter; private final List columns; private final Boolean legacy; @@ -125,7 +125,7 @@ public ScanQuery( @JsonProperty("virtualColumns") VirtualColumns virtualColumns, @JsonProperty("resultFormat") ResultFormat resultFormat, @JsonProperty("batchSize") int batchSize, - @JsonProperty("limit") long limit, + @JsonProperty("limit") long scanRowsLimit, @JsonProperty("order") Order order, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("columns") List columns, @@ -141,9 +141,9 @@ public ScanQuery( this.batchSize > 0, "batchSize must be greater than 0" ); - this.limit = (limit == 0) ? Long.MAX_VALUE : limit; + this.scanRowsLimit = (scanRowsLimit == 0) ? 
Long.MAX_VALUE : scanRowsLimit; Preconditions.checkArgument( - this.limit > 0, + this.scanRowsLimit > 0, "limit must be greater than 0" ); this.dimFilter = dimFilter; @@ -201,9 +201,9 @@ public int getBatchSize() } @JsonProperty - public long getLimit() + public long getScanRowsLimit() { - return limit; + return scanRowsLimit; } @JsonProperty @@ -311,7 +311,7 @@ public boolean equals(final Object o) } final ScanQuery scanQuery = (ScanQuery) o; return batchSize == scanQuery.batchSize && - limit == scanQuery.limit && + scanRowsLimit == scanQuery.scanRowsLimit && Objects.equals(legacy, scanQuery.legacy) && Objects.equals(virtualColumns, scanQuery.virtualColumns) && Objects.equals(resultFormat, scanQuery.resultFormat) && @@ -322,7 +322,8 @@ public boolean equals(final Object o) @Override public int hashCode() { - return Objects.hash(super.hashCode(), virtualColumns, resultFormat, batchSize, limit, dimFilter, columns, legacy); + return Objects.hash(super.hashCode(), virtualColumns, resultFormat, batchSize, + scanRowsLimit, dimFilter, columns, legacy); } @Override @@ -334,7 +335,7 @@ public String toString() ", virtualColumns=" + getVirtualColumns() + ", resultFormat='" + resultFormat + '\'' + ", batchSize=" + batchSize + - ", limit=" + limit + + ", limit=" + scanRowsLimit + ", dimFilter=" + dimFilter + ", columns=" + columns + ", legacy=" + legacy + diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index fbd452df6bb4..d4155fac26a3 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -67,9 +67,10 @@ public Sequence process( // "legacy" should be non-null due to toolChest.mergeResults final boolean legacy = Preconditions.checkNotNull(query.isLegacy(), "WTF?! 
Expected non-null legacy"); - if (responseContext.get(ResponseContext.Key.COUNT) != null) { - long count = (long) responseContext.get(ResponseContext.Key.COUNT); - if (count >= query.getLimit() && query.getOrder().equals(ScanQuery.Order.NONE)) { + final Object numScannedRows = responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS); + if (numScannedRows != null) { + long count = (long) numScannedRows; + if (count >= query.getScanRowsLimit() && query.getOrder().equals(ScanQuery.Order.NONE)) { return Sequences.empty(); } } @@ -121,8 +122,8 @@ public Sequence process( final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); - responseContext.merge(ResponseContext.Key.COUNT, 0L); - final long limit = calculateLimit(query, responseContext); + responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, 0L); + final long limit = calculateRemainingScanRowsLimit(query, responseContext); return Sequences.concat( adapter .makeCursors( @@ -185,7 +186,7 @@ public ScanResultValue next() } else { throw new UOE("resultFormat[%s] is not supported", resultFormat.toString()); } - responseContext.merge(ResponseContext.Key.COUNT, offset - lastOffset); + responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, offset - lastOffset); if (hasTimeout) { responseContext.put( ResponseContext.Key.TIMEOUT_AT, @@ -258,11 +259,11 @@ public void cleanup(Iterator iterFromMake) * If we're performing time-ordering, we want to scan through the first `limit` rows in each segment ignoring the number * of rows already counted on other segments. 
*/ - private long calculateLimit(ScanQuery query, ResponseContext responseContext) + private long calculateRemainingScanRowsLimit(ScanQuery query, ResponseContext responseContext) { if (query.getOrder().equals(ScanQuery.Order.NONE)) { - return query.getLimit() - (long) responseContext.get(ResponseContext.Key.COUNT); + return query.getScanRowsLimit() - (long) responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS); } - return query.getLimit(); + return query.getScanRowsLimit(); } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java index 4e30e869aa18..b603dd54d7d9 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java @@ -65,7 +65,7 @@ public ScanQueryLimitRowIterator( { this.query = (ScanQuery) queryPlus.getQuery(); this.resultFormat = query.getResultFormat(); - this.limit = query.getLimit(); + this.limit = query.getScanRowsLimit(); Query historicalQuery = queryPlus.getQuery().withOverriddenContext(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false)); Sequence baseSequence = baseRunner.run(QueryPlus.wrap(historicalQuery), responseContext); diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 6d6758b19260..95006cee5766 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -61,7 +61,7 @@ public QueryRunner mergeResults(final QueryRunner queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); - if (scanQuery.getLimit() == Long.MAX_VALUE) { + if (scanQuery.getScanRowsLimit() == Long.MAX_VALUE) { return runner.run(queryPlusWithNonNullLegacy, 
responseContext); } return new BaseSequence<>( diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 7bae4bfc75b3..645f545ef280 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -104,8 +104,8 @@ public QueryRunner mergeRunners( input -> input.run(queryPlus, responseContext) ) ); - if (query.getLimit() <= Integer.MAX_VALUE) { - return returnedRows.limit(Math.toIntExact(query.getLimit())); + if (query.getScanRowsLimit() <= Integer.MAX_VALUE) { + return returnedRows.limit(Math.toIntExact(query.getScanRowsLimit())); } else { return returnedRows; } @@ -120,7 +120,7 @@ public QueryRunner mergeRunners( int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null ? scanQueryConfig.getMaxRowsQueuedForOrdering() : query.getMaxRowsQueuedForOrdering()); - if (query.getLimit() <= maxRowsQueuedForOrdering) { + if (query.getScanRowsLimit() <= maxRowsQueuedForOrdering) { // Use priority queue strategy return priorityQueueSortAndLimit( Sequences.concat(Sequences.map( @@ -189,7 +189,7 @@ public QueryRunner mergeRunners( + " Try reducing the scope of the query to scan fewer partitions than the configurable limit of" + " %,d partitions or lower the row limit below %,d.", maxNumPartitionsInSegment, - query.getLimit(), + query.getScanRowsLimit(), scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(), scanQueryConfig.getMaxRowsQueuedForOrdering() ); @@ -207,16 +207,16 @@ Sequence priorityQueueSortAndLimit( { Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); - if (scanQuery.getLimit() > Integer.MAX_VALUE) { + if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) { throw new UOE( "Limit of %,d rows not supported for priority queue strategy of time-ordering scan results", - 
scanQuery.getLimit() + scanQuery.getScanRowsLimit() ); } // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) - int limit = Math.toIntExact(scanQuery.getLimit()); + int limit = Math.toIntExact(scanQuery.getScanRowsLimit()); PriorityQueue q = new PriorityQueue<>(limit, priorityQComparator); @@ -337,7 +337,7 @@ Sequence nWayMergeAndLimit( ) ) ); - long limit = ((ScanQuery) (queryPlus.getQuery())).getLimit(); + long limit = ((ScanQuery) (queryPlus.getQuery())).getScanRowsLimit(); if (limit == Long.MAX_VALUE) { return resultSequence; } diff --git a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java index b19e0514c4a6..625f0325229e 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java @@ -150,7 +150,7 @@ public RetType wrap(Supplier sequenceProcessing) private void appendMissingSegment(ResponseContext responseContext) { - responseContext.merge( + responseContext.add( ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(specificSpec.getDescriptor()) ); diff --git a/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java index 7576670ab0db..9826c75afef6 100644 --- a/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java @@ -97,7 +97,7 @@ public void testRunWithMissingSegments() @Override public Sequence> run(QueryPlus queryPlus, ResponseContext context) { - context.merge( + context.add( ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(new 
SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); @@ -135,7 +135,7 @@ public boolean isReturnPartialResults() public void testRetry() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.COUNT, 0); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0); context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() @@ -146,12 +146,12 @@ public Sequence> run( ResponseContext context ) { - if ((int) context.get(ResponseContext.Key.COUNT) == 0) { - context.merge( + if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 0) { + context.add( ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); - context.put(ResponseContext.Key.COUNT, 1); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1); return Sequences.empty(); } else { return Sequences.simple( @@ -184,7 +184,7 @@ public Sequence> run( public void testRetryMultiple() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.COUNT, 0); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0); context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() @@ -195,12 +195,12 @@ public Sequence> run( ResponseContext context ) { - if ((int) context.get(ResponseContext.Key.COUNT) < 3) { - context.merge( + if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) < 3) { + context.add( ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); - context.put(ResponseContext.Key.COUNT, (int) context.get(ResponseContext.Key.COUNT) + 1); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, (int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) + 1); return Sequences.empty(); } else { return Sequences.simple( 
@@ -243,7 +243,7 @@ public Sequence> run( ResponseContext context ) { - context.merge( + context.add( ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) ); @@ -266,7 +266,7 @@ public Sequence> run( public void testNoDuplicateRetry() { ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.COUNT, 0); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0); context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() @@ -278,16 +278,16 @@ public Sequence> run( ) { final Query> query = queryPlus.getQuery(); - if ((int) context.get(ResponseContext.Key.COUNT) == 0) { + if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 0) { // assume 2 missing segments at first run - context.merge( + context.add( ResponseContext.Key.MISSING_SEGMENTS, Arrays.asList( new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1), new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2) ) ); - context.put(ResponseContext.Key.COUNT, 1); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1); return Sequences.simple( Collections.singletonList( new Result<>( @@ -298,15 +298,15 @@ public Sequence> run( ) ) ); - } else if ((int) context.get(ResponseContext.Key.COUNT) == 1) { + } else if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 1) { // this is first retry Assert.assertTrue("Should retry with 2 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 2); // assume only left 1 missing at first retry - context.merge( + context.add( ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2)) ); - context.put(ResponseContext.Key.COUNT, 2); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 2); return Sequences.simple( 
Collections.singletonList( new Result<>( @@ -321,7 +321,7 @@ public Sequence> run( // this is second retry Assert.assertTrue("Should retry with 1 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 1); // assume no more missing at second retry - context.put(ResponseContext.Key.COUNT, 3); + context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 3); return Sequences.simple( Collections.singletonList( new Result<>( diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index f339ee90454e..94968fa71a41 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -38,51 +38,51 @@ public class ResponseContextTest public void mergeValueTest() { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.merge(ResponseContext.Key.ETAG, "dummy-etag"); + ctx.add(ResponseContext.Key.ETAG, "dummy-etag"); Assert.assertEquals("dummy-etag", ctx.get(ResponseContext.Key.ETAG)); - ctx.merge(ResponseContext.Key.ETAG, "new-dummy-etag"); + ctx.add(ResponseContext.Key.ETAG, "new-dummy-etag"); Assert.assertEquals("new-dummy-etag", ctx.get(ResponseContext.Key.ETAG)); final Interval interval01 = Intervals.of("2019-01-01/P1D"); - ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); + ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); Assert.assertArrayEquals( Collections.singletonList(interval01).toArray(), ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() ); final Interval interval12 = Intervals.of("2019-01-02/P1D"); final Interval interval23 = Intervals.of("2019-01-03/P1D"); - ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(interval12, interval23)); + 
ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(interval12, interval23)); Assert.assertArrayEquals( Arrays.asList(interval01, interval12, interval23).toArray(), ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() ); final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); - ctx.merge(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); + ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); Assert.assertArrayEquals( Collections.singletonList(sd01).toArray(), ((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() ); final SegmentDescriptor sd12 = new SegmentDescriptor(interval12, "12", 1); final SegmentDescriptor sd23 = new SegmentDescriptor(interval23, "23", 2); - ctx.merge(ResponseContext.Key.MISSING_SEGMENTS, Arrays.asList(sd12, sd23)); + ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Arrays.asList(sd12, sd23)); Assert.assertArrayEquals( Arrays.asList(sd01, sd12, sd23).toArray(), ((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() ); - ctx.merge(ResponseContext.Key.COUNT, 0L); - Assert.assertEquals(0L, ctx.get(ResponseContext.Key.COUNT)); - ctx.merge(ResponseContext.Key.COUNT, 1L); - Assert.assertEquals(1L, ctx.get(ResponseContext.Key.COUNT)); - ctx.merge(ResponseContext.Key.COUNT, 3L); - Assert.assertEquals(4L, ctx.get(ResponseContext.Key.COUNT)); + ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 0L); + Assert.assertEquals(0L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); + ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 1L); + Assert.assertEquals(1L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); + ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 3L); + Assert.assertEquals(4L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); - ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); + ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); Assert.assertEquals(false, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); 
- ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, true); + ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, true); Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); - ctx.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); + ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); } @@ -93,7 +93,7 @@ public void mergeResponseContextTest() ctx1.put(ResponseContext.Key.ETAG, "dummy-etag-1"); final Interval interval01 = Intervals.of("2019-01-01/P1D"); ctx1.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); - ctx1.put(ResponseContext.Key.COUNT, 1L); + ctx1.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1L); final ResponseContext ctx2 = ResponseContext.createEmpty(); ctx2.put(ResponseContext.Key.ETAG, "dummy-etag-2"); @@ -101,11 +101,11 @@ public void mergeResponseContextTest() ctx2.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval12)); final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); ctx2.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); - ctx2.put(ResponseContext.Key.COUNT, 2L); + ctx2.put(ResponseContext.Key.NUM_SCANNED_ROWS, 2L); ctx1.merge(ctx2); Assert.assertEquals("dummy-etag-2", ctx1.get(ResponseContext.Key.ETAG)); - Assert.assertEquals(3L, ctx1.get(ResponseContext.Key.COUNT)); + Assert.assertEquals(3L, ctx1.get(ResponseContext.Key.NUM_SCANNED_ROWS)); Assert.assertArrayEquals( Arrays.asList(interval01, interval12).toArray(), ((List) ctx1.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() @@ -120,16 +120,16 @@ public void mergeResponseContextTest() public void serializeWith() throws JsonProcessingException { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.put(ResponseContext.Key.COUNT, 100L); + ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100L); 
ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result"); final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, 1000); - Assert.assertEquals(fullString, res1.getResult()); + Assert.assertEquals(fullString, res1.getTruncatedResult()); final ResponseContext reducedCtx = ResponseContext.createEmpty(); reducedCtx.merge(ctx); final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 20); reducedCtx.remove(ResponseContext.Key.ETAG); - Assert.assertEquals(objectMapper.writeValueAsString(reducedCtx.getDelegate()), res2.getResult()); + Assert.assertEquals(objectMapper.writeValueAsString(reducedCtx.getDelegate()), res2.getTruncatedResult()); } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java index cf76f3750482..287733d441b4 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java @@ -145,13 +145,13 @@ public void testSortAndLimitScanResultValues() DateTimes.of("2019-01-01").plusHours(1) )) ).toList(); - if (query.getLimit() > Integer.MAX_VALUE) { + if (query.getScanRowsLimit() > Integer.MAX_VALUE) { Assert.fail("Unsupported exception should have been thrown due to high limit"); } validateSortedOutput(output, expectedEventTimestamps); } catch (UOE e) { - if (query.getLimit() <= Integer.MAX_VALUE) { + if (query.getScanRowsLimit() <= Integer.MAX_VALUE) { Assert.fail("Unsupported operation exception should not have been thrown here"); } } @@ -247,7 +247,7 @@ private void validateSortedOutput(List output, List expec } // check total # of rows <= limit - Assert.assertTrue(output.size() <= 
query.getLimit()); + Assert.assertTrue(output.size() <= query.getScanRowsLimit()); // check ordering is correct for (int i = 1; i < output.size(); i++) { @@ -261,7 +261,7 @@ private void validateSortedOutput(List output, List expec } // check the values are correct - for (int i = 0; i < query.getLimit() && i < output.size(); i++) { + for (int i = 0; i < query.getScanRowsLimit() && i < output.size(); i++) { Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 09871b8dce58..18a4a028b367 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -354,12 +354,12 @@ private void computeUncoveredIntervals(TimelineLookup ti } if (!uncoveredIntervals.isEmpty()) { - // This returns intervals for which NO segment is present. + // Record in the response context the interval for which NO segment is present. // Which is not necessarily an indication that the data doesn't exist or is // incomplete. The data could exist and just not be loaded yet. In either // case, though, this query will not include any data from the identified intervals. 
- responseContext.merge(ResponseContext.Key.UNCOVERED_INTERVALS, uncoveredIntervals); - responseContext.merge(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed); + responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS, uncoveredIntervals); + responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed); } } diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index b8214cbabd3a..9c4b8d248f95 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -213,7 +213,7 @@ public Response doPost( QueryContexts.isSerializeDateTimeAsLong(query, false) || (!shouldFinalize && QueryContexts.isSerializeDateTimeAsLongInner(query, false)); final ObjectWriter jsonWriter = ioReaderWriter.newOutputWriter(serializeDateTimeAsLong); - Response.ResponseBuilder builder = Response + Response.ResponseBuilder responseBuilder = Response .ok( new StreamingOutput() { @@ -252,9 +252,9 @@ public void write(OutputStream outputStream) throws WebApplicationException ) .header("X-Druid-Query-Id", queryId); - if (responseContext.get(ResponseContext.Key.ETAG) != null) { - builder.header(HEADER_ETAG, responseContext.get(ResponseContext.Key.ETAG)); - responseContext.remove(ResponseContext.Key.ETAG); + Object entityTag = responseContext.remove(ResponseContext.Key.ETAG); + if (entityTag != null) { + responseBuilder.header(HEADER_ETAG, entityTag); } DirectDruidClient.removeMagicResponseContextFields(responseContext); @@ -274,8 +274,8 @@ public void write(OutputStream outputStream) throws WebApplicationException ); } - return builder - .header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult()) + return responseBuilder + .header(HEADER_RESPONSE_CONTEXT, serializationResult.getTruncatedResult()) .build(); } catch (Exception e) { From 
69c747fa25c59d3dd234d2725cae77e785713c7a Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Sat, 27 Jul 2019 07:25:56 +0300 Subject: [PATCH 09/15] Added JsonProperty annotation to renamed ScanQuery field --- .../src/main/java/org/apache/druid/query/scan/ScanQuery.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index cbdd4f933bd6..651f98fbd351 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -110,6 +110,7 @@ public static Order fromString(String name) private final VirtualColumns virtualColumns; private final ResultFormat resultFormat; private final int batchSize; + @JsonProperty("limit") private final long scanRowsLimit; private final DimFilter dimFilter; private final List columns; From 7ed508927b32f882ffbc73185d93c64ab3684ead Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Mon, 29 Jul 2019 19:24:56 +0300 Subject: [PATCH 10/15] Extension-friendly context key implementation --- .../druid/query/context/ResponseContext.java | 117 ++++++++++++++++-- .../query/context/ResponseContextTest.java | 73 +++++++++++ 2 files changed, 177 insertions(+), 13 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 47dee0aecdef..882fdb0bfb63 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -30,11 +30,13 @@ import java.io.IOException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import 
java.util.TreeMap; import java.util.function.BiFunction; /** @@ -45,9 +47,47 @@ public abstract class ResponseContext { /** - * Keys associated with objects in the context. + * The base interface of a response context key. + * Should be implemented by every context key. */ - public enum Key + public interface BaseKey + { + String getName(); + /** + * Merge function associated with a key: Object (Object oldValue, Object newValue) + */ + BiFunction getMergeFunction(); + } + + /** + * Keys associated with objects in the context. The enum is extension-friendly. + *

If it's necessary to have some new keys in the context then they could be described in a separate enum: + *

{@code
+   * public enum ExtensionResponseContextKey implements BaseKey
+   * {
+   *   EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
+   *
+   *   static {
+   *     for (ResponseContextKey key : values()) ResponseContext.Key.addKey(key);
+   *   }
+   *
+   *   private final String name;
+   *   private final BiFunction mergeFunction;
+   *
+   *   ExtensionResponseContextKey(String name)
+   *   {
+   *     this.name = name;
+   *     this.mergeFunction = (oldValue, newValue) -> newValue;
+   *   }
+   *
+   *   @Override public String getName() { return name; }
+   *
+   *   @Override public BiFunction getMergeFunction() { return mergeFunction; }
+   * }
+   * }
+ * Make sure all extension enum values added with Key.addKey method. + */ + public enum Key implements BaseKey { /** * Lists intervals for which NO segment is present. @@ -117,10 +157,49 @@ public enum Key (oldValue, newValue) -> (long) oldValue + (long) newValue ); - private final String name; /** - * Merge function associated with a key: Object (Object oldValue, Object newValue) + * TreeMap is used to have the natural ordering of its keys + */ + private static Map map = new TreeMap<>(); + + static { + for (BaseKey key : values()) { + addKey(key); + } + } + + /** + * The primary way of registering context keys. + * Only the keys registered this way are considered during the context merge. + */ + public static void addKey(BaseKey key) + { + Preconditions.checkState( + !map.containsKey(key.getName()), + "ResponseContext keys already has the key with [%s] name", + key.getName() + ); + map.put(key.getName(), key); + } + + /** + * Returns a key associated with the name if the key was added via addKey method + */ + public static BaseKey keyOf(String name) + { + return map.get(name); + } + + /** + * Returns all keys the enum contains and the added via addKey method */ + public static Collection getKeys() + { + return map.values(); + } + + private final String name; + private final BiFunction mergeFunction; Key(String name) @@ -134,6 +213,18 @@ public enum Key this.name = name; this.mergeFunction = mergeFunction; } + + @Override + public String getName() + { + return name; + } + + @Override + public BiFunction getMergeFunction() + { + return mergeFunction; + } } /** @@ -163,28 +254,28 @@ protected Map getDelegate() protected abstract Map getDelegate(); - public Object put(Key key, Object value) + public Object put(BaseKey key, Object value) { - return getDelegate().put(key.name, value); + return getDelegate().put(key.getName(), value); } - public Object get(Key key) + public Object get(BaseKey key) { - return getDelegate().get(key.name); + return 
getDelegate().get(key.getName()); } - public Object remove(Key key) + public Object remove(BaseKey key) { - return getDelegate().remove(key.name); + return getDelegate().remove(key.getName()); } /** * Adds (merges) a new value associated with a key to an old value. * See merge function of a context key for a specific implementation. */ - public Object add(Key key, Object value) + public Object add(BaseKey key, Object value) { - return getDelegate().merge(key.name, value, key.mergeFunction); + return getDelegate().merge(key.getName(), value, key.getMergeFunction()); } /** @@ -193,7 +284,7 @@ public Object add(Key key, Object value) */ public void merge(ResponseContext responseContext) { - for (Key key : Key.values()) { + for (BaseKey key : Key.getKeys()) { final Object newValue = responseContext.get(key); if (newValue != null) { add(key, newValue); diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 94968fa71a41..df9e7f88037c 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -30,10 +30,50 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.BiFunction; public class ResponseContextTest { + public enum ExtensionResponseContextKey implements ResponseContext.BaseKey + { + EXTENSION_KEY_1("extension_key_1"), + EXTENSION_KEY_2("extension_key_2", (oldValue, newValue) -> (long) oldValue + (long) newValue); + + static { + for (ResponseContext.BaseKey key : values()) { + ResponseContext.Key.addKey(key); + } + } + + private final String name; + private final BiFunction mergeFunction; + + ExtensionResponseContextKey(String name) + { + this.name = name; + this.mergeFunction = (oldValue, newValue) -> newValue; + } + + ExtensionResponseContextKey(String name, 
BiFunction mergeFunction) + { + this.name = name; + this.mergeFunction = mergeFunction; + } + + @Override + public String getName() + { + return name; + } + + @Override + public BiFunction getMergeFunction() + { + return mergeFunction; + } + } + @Test public void mergeValueTest() { @@ -132,4 +172,37 @@ public void serializeWith() throws JsonProcessingException reducedCtx.remove(ResponseContext.Key.ETAG); Assert.assertEquals(objectMapper.writeValueAsString(reducedCtx.getDelegate()), res2.getTruncatedResult()); } + + @Test + public void extensionEnumIntegrityTest() + { + Assert.assertEquals( + ExtensionResponseContextKey.EXTENSION_KEY_1, + ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_1.getName()) + ); + Assert.assertEquals( + ExtensionResponseContextKey.EXTENSION_KEY_2, + ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_2.getName()) + ); + for (ResponseContext.BaseKey key : ExtensionResponseContextKey.values()) { + Assert.assertTrue(ResponseContext.Key.getKeys().contains(key)); + } + } + + @Test + public void extensionEnumMergeTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.add(ResponseContext.Key.ETAG, "etag"); + ctx.add(ExtensionResponseContextKey.EXTENSION_KEY_1, "string-value"); + ctx.add(ExtensionResponseContextKey.EXTENSION_KEY_2, 2L); + final ResponseContext ctxFinal = ResponseContext.createEmpty(); + ctxFinal.add(ResponseContext.Key.ETAG, "old-etag"); + ctxFinal.add(ExtensionResponseContextKey.EXTENSION_KEY_1, "old-string-value"); + ctxFinal.add(ExtensionResponseContextKey.EXTENSION_KEY_2, 1L); + ctxFinal.merge(ctx); + Assert.assertEquals("etag", ctxFinal.get(ResponseContext.Key.ETAG)); + Assert.assertEquals("string-value", ctxFinal.get(ExtensionResponseContextKey.EXTENSION_KEY_1)); + Assert.assertEquals(1L + 2L, ctxFinal.get(ExtensionResponseContextKey.EXTENSION_KEY_2)); + } } From 9b001f318e72b79f6db5c3d343b7fdadcc560f17 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Wed, 31 
Jul 2019 20:38:52 +0300 Subject: [PATCH 11/15] Refactored ResponseContext: updated delegate type, comments and exceptions Reducing serialized context length by removing some of its' collection elements --- .../context/ConcurrentResponseContext.java | 4 +- .../query/context/DefaultResponseContext.java | 4 +- .../druid/query/context/ResponseContext.java | 172 +++++++++++------- .../apache/druid/query/scan/ScanQuery.java | 2 +- .../query/context/ResponseContextTest.java | 138 +++++++++++++- .../apache/druid/server/QueryResource.java | 2 +- 6 files changed, 243 insertions(+), 79 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java index 48838f171917..b1e648467a78 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java @@ -35,10 +35,10 @@ public static ConcurrentResponseContext createEmpty() return new ConcurrentResponseContext(); } - private final ConcurrentHashMap delegate = new ConcurrentHashMap<>(); + private final ConcurrentHashMap delegate = new ConcurrentHashMap<>(); @Override - protected Map getDelegate() + protected Map getDelegate() { return delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java index adff1ff6b3fe..33724c1bf044 100644 --- a/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java @@ -35,10 +35,10 @@ public static DefaultResponseContext createEmpty() return new DefaultResponseContext(); } - private final HashMap delegate = new HashMap<>(); + private final HashMap delegate = new HashMap<>(); @Override - protected 
Map getDelegate() + protected Map getDelegate() { return delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 882fdb0bfb63..f8122ead674d 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -19,23 +19,25 @@ package org.apache.druid.query.context; +import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import java.io.IOException; -import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.TreeMap; import java.util.function.BiFunction; @@ -52,6 +54,7 @@ public abstract class ResponseContext */ public interface BaseKey { + @JsonValue String getName(); /** * Merge function associated with a key: Object (Object oldValue, Object newValue) @@ -60,15 +63,16 @@ public interface BaseKey } /** - * Keys associated with objects in the context. The enum is extension-friendly. - *

If it's necessary to have some new keys in the context then they could be described in a separate enum: + * Keys associated with objects in the context. + *

+ * If it's necessary to have some new keys in the context then they might be listed in a separate enum: *

{@code
    * public enum ExtensionResponseContextKey implements BaseKey
    * {
    *   EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
    *
    *   static {
-   *     for (ResponseContextKey key : values()) ResponseContext.Key.addKey(key);
+   *     for (BaseKey key : values()) ResponseContext.Key.registerKey(key);
    *   }
    *
    *   private final String name;
@@ -85,7 +89,7 @@ public interface BaseKey
    *   @Override public BiFunction getMergeFunction() { return mergeFunction; }
    * }
    * }
- * Make sure all extension enum values added with Key.addKey method. + * Make sure all extension enum values added with {@link Key#registerKey} method. */ public enum Key implements BaseKey { @@ -125,7 +129,7 @@ public enum Key implements BaseKey ETAG("ETag"), /** * Query fail time (current time + timeout). - * The final value in comparison to continuously updated TIMEOUT_AT. + * It is not updated continuously as TIMEOUT_AT. */ QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"), /** @@ -155,47 +159,60 @@ public enum Key implements BaseKey CPU_CONSUMED_NANOS( "cpuConsumed", (oldValue, newValue) -> (long) oldValue + (long) newValue + ), + /** + * Indicates if a {@link ResponseContext} was truncated during serialization. + */ + TRUNCATED( + "trancated", + (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue ); /** * TreeMap is used to have the natural ordering of its keys */ - private static Map map = new TreeMap<>(); + private static final Map registeredKeys = new TreeMap<>(); static { for (BaseKey key : values()) { - addKey(key); + registerKey(key); } } /** - * The primary way of registering context keys. - * Only the keys registered this way are considered during the context merge. + * Primary way of registering context keys. + * @throws IllegalArgumentException if the key has already been registered. */ - public static void addKey(BaseKey key) + public static void registerKey(BaseKey key) { - Preconditions.checkState( - !map.containsKey(key.getName()), - "ResponseContext keys already has the key with [%s] name", + Preconditions.checkArgument( + !registeredKeys.containsKey(key.getName()), + "Key [%s] has already been registered as a context key", key.getName() ); - map.put(key.getName(), key); + registeredKeys.put(key.getName(), key); } /** - * Returns a key associated with the name if the key was added via addKey method + * Returns a registered key associated with the name {@param name}. 
+ * @throws IllegalStateException if a corresponding key has not been registered. */ public static BaseKey keyOf(String name) { - return map.get(name); + Preconditions.checkState( + registeredKeys.containsKey(name), + "Key [%s] has not yet been registered as a context key", + name + ); + return registeredKeys.get(name); } /** - * Returns all keys the enum contains and the added via addKey method + * Returns all keys registered via {@link Key#registerKey}. */ - public static Collection getKeys() + public static Collection getAllRegisteredKeys() { - return map.values(); + return registeredKeys.values(); } private final String name; @@ -236,66 +253,74 @@ public static ResponseContext createEmpty() return DefaultResponseContext.createEmpty(); } + /** + * Deserializes a string into {@link ResponseContext} using given {@link ObjectMapper}. + * @throws IllegalStateException if one of the deserialized map keys has not been registered. + */ public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException { - final Map delegate = objectMapper.readValue( + final Map keyNameToObjects = objectMapper.readValue( responseContext, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); - return new ResponseContext() - { - @Override - protected Map getDelegate() - { - return delegate; - } - }; + final ResponseContext context = ResponseContext.createEmpty(); + keyNameToObjects.forEach((keyName, value) -> { + final BaseKey key = Key.keyOf(keyName); + context.add(key, value); + }); + return context; } - protected abstract Map getDelegate(); + protected abstract Map getDelegate(); + /** + * Associates the specified object with the specified key. + * @throws IllegalStateException if the key has not been registered. 
+ */ public Object put(BaseKey key, Object value) { - return getDelegate().put(key.getName(), value); + final BaseKey registeredKey = Key.keyOf(key.getName()); + return getDelegate().put(registeredKey, value); } public Object get(BaseKey key) { - return getDelegate().get(key.getName()); + return getDelegate().get(key); } public Object remove(BaseKey key) { - return getDelegate().remove(key.getName()); + return getDelegate().remove(key); } /** * Adds (merges) a new value associated with a key to an old value. * See merge function of a context key for a specific implementation. + * @throws IllegalStateException if the key has not been registered. */ public Object add(BaseKey key, Object value) { - return getDelegate().merge(key.getName(), value, key.getMergeFunction()); + final BaseKey registeredKey = Key.keyOf(key.getName()); + return getDelegate().merge(registeredKey, value, key.getMergeFunction()); } /** - * Merges a response context into current. - * This method merges only keys from the enum {@link Key}. + * Merges a response context into the current. + * @throws IllegalStateException If a key of the {@code responseContext} has not been registered. */ public void merge(ResponseContext responseContext) { - for (BaseKey key : Key.getKeys()) { - final Object newValue = responseContext.get(key); + responseContext.getDelegate().forEach((key, newValue) -> { if (newValue != null) { add(key, newValue); } - } + }); } /** * Serializes the context given that the resulting string length is less than the provided limit. - * The method removes max-length fields one by one if the resulting string length is greater than the limit. - * The resulting string might be correctly deserialized as a {@link ResponseContext}. + * This method tries to remove some elements from context collections if it's needed to satisfy the limit. + * The resulting string might be correctly deserialized to {@link ResponseContext}. 
*/ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException { @@ -303,28 +328,49 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars if (fullSerializedString.length() <= maxCharsNumber) { return new SerializationResult(fullSerializedString, fullSerializedString); } else { - final HashMap copiedMap = new HashMap<>(getDelegate()); - final PriorityQueue> serializedValueEntries = new PriorityQueue<>( - Comparator.comparing((Map.Entry e) -> e.getValue().length()).reversed() - ); - for (Map.Entry e : copiedMap.entrySet()) { - serializedValueEntries.add(new AbstractMap.SimpleImmutableEntry<>( - e.getKey(), - objectMapper.writeValueAsString(e.getValue()) - )); - } - // quadratic complexity: while loop with map serialization on each iteration - while (!copiedMap.isEmpty() && !serializedValueEntries.isEmpty()) { - final Map.Entry maxLengthEntry = serializedValueEntries.poll(); - Preconditions.checkNotNull(maxLengthEntry); - copiedMap.remove(maxLengthEntry.getKey()); - final String reducedSerializedString = objectMapper.writeValueAsString(copiedMap); - if (reducedSerializedString.length() <= maxCharsNumber) { - return new SerializationResult(reducedSerializedString, fullSerializedString); + // Indicates that the context is truncated during serialization. 
+ add(Key.TRUNCATED, true); + final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate()); + final ArrayList> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields()); + final Comparator> valueLengthReversedComparator = + Comparator.comparing((Map.Entry e) -> e.getValue().toString().length()).reversed(); + sortedNodesByLength.sort(valueLengthReversedComparator); + int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber; + // In general, the complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size + for (Map.Entry e : sortedNodesByLength) { + final String fieldName = e.getKey(); + final JsonNode node = e.getValue(); + if (!node.isArray() || needToRemoveCharsNumber >= node.toString().length()) { + if (node.isArray()) { + final int lengthBeforeRemove = node.toString().length(); + // Empty array could be correctly deserialized so we remove only its elements. + ((ArrayNode) node).removeAll(); + final int lengthAfterRemove = node.toString().length(); + needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; + } else { + // In general, a context should not contain nulls so we completely remove the field. 
+ contextJsonNode.remove(fieldName); + // Since the field is completely removed (name + value) we need to do a recalculation + needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; + } + } else { + final ArrayNode arrNode = (ArrayNode) node; + while (node.size() > 0 && needToRemoveCharsNumber > 0) { + final int lengthBeforeRemove = arrNode.toString().length(); + // Reducing complexity by removing half of array's elements + final int removeUntil = node.size() / 2; + for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) { + arrNode.remove(removeAt); + } + final int lengthAfterRemove = arrNode.toString().length(); + needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; + } + } + if (needToRemoveCharsNumber <= 0) { + break; } } - final String serializedEmptyMap = objectMapper.writeValueAsString(copiedMap); - return new SerializationResult(serializedEmptyMap, fullSerializedString); + return new SerializationResult(contextJsonNode.toString(), fullSerializedString); } } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 651f98fbd351..719f5f27e6f7 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -336,7 +336,7 @@ public String toString() ", virtualColumns=" + getVirtualColumns() + ", resultFormat='" + resultFormat + '\'' + ", batchSize=" + batchSize + - ", limit=" + scanRowsLimit + + ", scanRowsLimit=" + scanRowsLimit + ", dimFilter=" + dimFilter + ", columns=" + columns + ", legacy=" + legacy + diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index df9e7f88037c..559217bae86f 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ 
b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.context; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.SegmentDescriptor; @@ -27,22 +28,24 @@ import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.BiFunction; public class ResponseContextTest { - public enum ExtensionResponseContextKey implements ResponseContext.BaseKey + enum ExtensionResponseContextKey implements ResponseContext.BaseKey { EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2", (oldValue, newValue) -> (long) oldValue + (long) newValue); static { for (ResponseContext.BaseKey key : values()) { - ResponseContext.Key.addKey(key); + ResponseContext.Key.registerKey(key); } } @@ -74,6 +77,39 @@ public BiFunction getMergeFunction() } } + private final ResponseContext.BaseKey nonregisteredKey = new ResponseContext.BaseKey() + { + @Override + public String getName() + { + return "non-registered-key"; + } + + @Override + public BiFunction getMergeFunction() + { + return (Object a, Object b) -> a; + } + }; + + @Test(expected = IllegalStateException.class) + public void putISETest() + { + ResponseContext.createEmpty().put(nonregisteredKey, new Object()); + } + + @Test(expected = IllegalStateException.class) + public void addISETest() + { + ResponseContext.createEmpty().add(nonregisteredKey, new Object()); + } + + @Test(expected = IllegalArgumentException.class) + public void registerKeyIAETest() + { + ResponseContext.Key.registerKey(ResponseContext.Key.NUM_SCANNED_ROWS); + } + @Test public void mergeValueTest() { @@ -156,21 +192,103 @@ public void mergeResponseContextTest() 
); } + @Test(expected = IllegalStateException.class) + public void mergeISETest() + { + final ResponseContext ctx = new ResponseContext() + { + @Override + protected Map getDelegate() + { + return ImmutableMap.of(nonregisteredKey, "non-registered-key"); + } + }; + ResponseContext.createEmpty().merge(ctx); + } + + @Test + public void serializeWithCorrectnessTest() throws JsonProcessingException + { + final ResponseContext ctx1 = ResponseContext.createEmpty(); + ctx1.add(ResponseContext.Key.ETAG, "string-value"); + final DefaultObjectMapper mapper = new DefaultObjectMapper(); + Assert.assertEquals( + mapper.writeValueAsString(ImmutableMap.of("ETag", "string-value")), + ctx1.serializeWith(mapper, Integer.MAX_VALUE).getTruncatedResult() + ); + + final ResponseContext ctx2 = ResponseContext.createEmpty(); + ctx2.add(ResponseContext.Key.NUM_SCANNED_ROWS, 100); + Assert.assertEquals( + mapper.writeValueAsString(ImmutableMap.of("count", 100)), + ctx2.serializeWith(mapper, Integer.MAX_VALUE).getTruncatedResult() + ); + } + @Test - public void serializeWith() throws JsonProcessingException + public void serializeWithTruncateValueTest() throws JsonProcessingException { final ResponseContext ctx = ResponseContext.createEmpty(); ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100L); ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result"); final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); - final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, 1000); + final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE); + Assert.assertEquals(fullString, res1.getTruncatedResult()); + final ResponseContext ctxCopy = ResponseContext.createEmpty(); + ctxCopy.merge(ctx); + final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 30); + 
ctxCopy.remove(ResponseContext.Key.ETAG); + ctxCopy.put(ResponseContext.Key.TRUNCATED, true); + Assert.assertEquals(objectMapper.writeValueAsString(ctxCopy.getDelegate()), res2.getTruncatedResult()); + } + + @Test + public void serializeWithTruncateArrayTest() throws JsonProcessingException + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100L); + ctx.put( + ResponseContext.Key.UNCOVERED_INTERVALS, + Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + ); + ctx.put( + ResponseContext.Key.MISSING_SEGMENTS, + Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + ); + final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); + final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); + final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE); Assert.assertEquals(fullString, res1.getTruncatedResult()); - final ResponseContext reducedCtx = ResponseContext.createEmpty(); - reducedCtx.merge(ctx); - final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 20); - reducedCtx.remove(ResponseContext.Key.ETAG); - Assert.assertEquals(objectMapper.writeValueAsString(reducedCtx.getDelegate()), res2.getTruncatedResult()); + final ResponseContext ctxCopy = ResponseContext.createEmpty(); + ctxCopy.merge(ctx); + final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 70); + ctxCopy.put(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(0, 1, 2, 3, 4)); + ctxCopy.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.emptyList()); + ctxCopy.put(ResponseContext.Key.TRUNCATED, true); + Assert.assertEquals(objectMapper.writeValueAsString(ctxCopy.getDelegate()), res2.getTruncatedResult()); + } + + @Test + public void deserializeTest() throws IOException + { + final DefaultObjectMapper mapper = new DefaultObjectMapper(); + final 
ResponseContext ctx = ResponseContext.deserialize( + mapper.writeValueAsString(ImmutableMap.of("ETag", "string-value", "count", 100)), + mapper + ); + Assert.assertEquals("string-value", ctx.get(ResponseContext.Key.ETAG)); + Assert.assertEquals(100, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); + } + + @Test(expected = IllegalStateException.class) + public void deserializeISETest() throws IOException + { + final DefaultObjectMapper mapper = new DefaultObjectMapper(); + ResponseContext.deserialize( + mapper.writeValueAsString(ImmutableMap.of("ETag_unexpected", "string-value")), + mapper + ); } @Test @@ -185,7 +303,7 @@ public void extensionEnumIntegrityTest() ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_2.getName()) ); for (ResponseContext.BaseKey key : ExtensionResponseContextKey.values()) { - Assert.assertTrue(ResponseContext.Key.getKeys().contains(key)); + Assert.assertTrue(ResponseContext.Key.getAllRegisteredKeys().contains(key)); } } diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 9c4b8d248f95..300a5bb48e50 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -267,7 +267,7 @@ public void write(OutputStream outputStream) throws WebApplicationException RESPONSE_CTX_HEADER_LEN_LIMIT ); if (serializationResult.isReduced()) { - log.warn( + log.info( "Response Context truncated for id [%s] . 
Full context is [%s].", queryId, serializationResult.getFullResult() From 2106869b09aebec6e320b7803facbf773c47f91a Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 1 Aug 2019 01:28:34 +0300 Subject: [PATCH 12/15] Fixed tests --- .../druid/query/context/ResponseContext.java | 2 +- .../query/context/ResponseContextTest.java | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index f8122ead674d..09b45cba3fe7 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -164,7 +164,7 @@ public enum Key implements BaseKey * Indicates if a {@link ResponseContext} was truncated during serialization. */ TRUNCATED( - "trancated", + "truncated", (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue ); diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 559217bae86f..de8d0f4713a1 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -226,10 +226,10 @@ public void serializeWithCorrectnessTest() throws JsonProcessingException } @Test - public void serializeWithTruncateValueTest() throws JsonProcessingException + public void serializeWithTruncateValueTest() throws IOException { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100L); + ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100); ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result"); final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); 
final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); @@ -240,14 +240,17 @@ public void serializeWithTruncateValueTest() throws JsonProcessingException final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 30); ctxCopy.remove(ResponseContext.Key.ETAG); ctxCopy.put(ResponseContext.Key.TRUNCATED, true); - Assert.assertEquals(objectMapper.writeValueAsString(ctxCopy.getDelegate()), res2.getTruncatedResult()); + Assert.assertEquals( + ctxCopy.getDelegate(), + ResponseContext.deserialize(res2.getTruncatedResult(), objectMapper).getDelegate() + ); } @Test - public void serializeWithTruncateArrayTest() throws JsonProcessingException + public void serializeWithTruncateArrayTest() throws IOException { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100L); + ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100); ctx.put( ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) @@ -266,7 +269,10 @@ public void serializeWithTruncateArrayTest() throws JsonProcessingException ctxCopy.put(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(0, 1, 2, 3, 4)); ctxCopy.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.emptyList()); ctxCopy.put(ResponseContext.Key.TRUNCATED, true); - Assert.assertEquals(objectMapper.writeValueAsString(ctxCopy.getDelegate()), res2.getTruncatedResult()); + Assert.assertEquals( + ctxCopy.getDelegate(), + ResponseContext.deserialize(res2.getTruncatedResult(), objectMapper).getDelegate() + ); } @Test From 91e7c229e247b25440d978bb6a066c3bd9658774 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 1 Aug 2019 14:56:22 +0300 Subject: [PATCH 13/15] Simplified response context truncation during serialization --- .../druid/query/context/ResponseContext.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git 
a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 09b45cba3fe7..27b95e8917d9 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -336,35 +336,35 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars Comparator.comparing((Map.Entry e) -> e.getValue().toString().length()).reversed(); sortedNodesByLength.sort(valueLengthReversedComparator); int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber; - // In general, the complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size + // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size for (Map.Entry e : sortedNodesByLength) { final String fieldName = e.getKey(); final JsonNode node = e.getValue(); - if (!node.isArray() || needToRemoveCharsNumber >= node.toString().length()) { - if (node.isArray()) { + if (node.isArray()) { + if (needToRemoveCharsNumber >= node.toString().length()) { final int lengthBeforeRemove = node.toString().length(); // Empty array could be correctly deserialized so we remove only its elements. ((ArrayNode) node).removeAll(); final int lengthAfterRemove = node.toString().length(); needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; } else { - // In general, a context should not contain nulls so we completely remove the field. 
- contextJsonNode.remove(fieldName); - // Since the field is completely removed (name + value) we need to do a recalculation - needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; - } - } else { - final ArrayNode arrNode = (ArrayNode) node; - while (node.size() > 0 && needToRemoveCharsNumber > 0) { - final int lengthBeforeRemove = arrNode.toString().length(); - // Reducing complexity by removing half of array's elements - final int removeUntil = node.size() / 2; - for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) { - arrNode.remove(removeAt); + final ArrayNode arrNode = (ArrayNode) node; + while (node.size() > 0 && needToRemoveCharsNumber > 0) { + final int lengthBeforeRemove = arrNode.toString().length(); + // Reducing complexity by removing half of array's elements + final int removeUntil = node.size() / 2; + for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) { + arrNode.remove(removeAt); + } + final int lengthAfterRemove = arrNode.toString().length(); + needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; } - final int lengthAfterRemove = arrNode.toString().length(); - needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; - } + } // node is not an array + } else { + // In general, a context should not contain nulls so we completely remove the field. 
+ contextJsonNode.remove(fieldName); + // Since the field is completely removed (name + value) we need to do a recalculation + needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; } if (needToRemoveCharsNumber <= 0) { break; From 180075c8a7d2ac4cae86af40ea3659566ada2b53 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 2 Aug 2019 15:58:08 +0300 Subject: [PATCH 14/15] Extracted a method of removing elements from a response context and added some comments --- .../druid/query/context/ResponseContext.java | 72 +++++++++++++------ .../query/context/ResponseContextTest.java | 2 +- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 27b95e8917d9..b9a7e1c06161 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -183,7 +184,7 @@ public enum Key implements BaseKey * Primary way of registering context keys. * @throws IllegalArgumentException if the key has already been registered. 
*/ - public static void registerKey(BaseKey key) + public static synchronized void registerKey(BaseKey key) { Preconditions.checkArgument( !registeredKeys.containsKey(key.getName()), @@ -212,7 +213,7 @@ public static BaseKey keyOf(String name) */ public static Collection getAllRegisteredKeys() { - return registeredKeys.values(); + return Collections.unmodifiableCollection(registeredKeys.values()); } private final String name; @@ -244,6 +245,11 @@ public BiFunction getMergeFunction() } } + protected abstract Map getDelegate(); + + private final Comparator> valueLengthReversedComparator = + Comparator.comparing((Map.Entry e) -> e.getValue().toString().length()).reversed(); + /** * Create an empty DefaultResponseContext instance * @return empty DefaultResponseContext instance @@ -271,8 +277,6 @@ public static ResponseContext deserialize(String responseContext, ObjectMapper o return context; } - protected abstract Map getDelegate(); - /** * Associates the specified object with the specified key. * @throws IllegalStateException if the key has not been registered. @@ -319,7 +323,11 @@ public void merge(ResponseContext responseContext) /** * Serializes the context given that the resulting string length is less than the provided limit. - * This method tries to remove some elements from context collections if it's needed to satisfy the limit. + * This method removes some elements from context collections if it's needed to satisfy the limit. + * There is no explicit priorities of keys which values are being truncated because for now there are only + * two potential limit breaking keys (UNCOVERED_INTERVALS and MISSING_SEGMENTS) and their values are arrays. + * Thus current implementation considers these arrays as equal prioritized and starts removing elements from + * the array which serialized value length is the biggest. * The resulting string might be correctly deserialized to {@link ResponseContext}. 
*/ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException @@ -332,8 +340,6 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars add(Key.TRUNCATED, true); final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate()); final ArrayList> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields()); - final Comparator> valueLengthReversedComparator = - Comparator.comparing((Map.Entry e) -> e.getValue().toString().length()).reversed(); sortedNodesByLength.sort(valueLengthReversedComparator); int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber; // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size @@ -342,26 +348,22 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars final JsonNode node = e.getValue(); if (node.isArray()) { if (needToRemoveCharsNumber >= node.toString().length()) { - final int lengthBeforeRemove = node.toString().length(); - // Empty array could be correctly deserialized so we remove only its elements. 
- ((ArrayNode) node).removeAll(); - final int lengthAfterRemove = node.toString().length(); - needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; + // We need to remove more chars than the field's lenght so removing it completely + contextJsonNode.remove(fieldName); + // Since the field is completely removed (name + value) we need to do a recalculation + needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; } else { - final ArrayNode arrNode = (ArrayNode) node; - while (node.size() > 0 && needToRemoveCharsNumber > 0) { - final int lengthBeforeRemove = arrNode.toString().length(); - // Reducing complexity by removing half of array's elements - final int removeUntil = node.size() / 2; - for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) { - arrNode.remove(removeAt); - } - final int lengthAfterRemove = arrNode.toString().length(); - needToRemoveCharsNumber -= lengthBeforeRemove - lengthAfterRemove; + final ArrayNode arrayNode = (ArrayNode) node; + needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber); + if (arrayNode.size() == 0) { + // The field is empty, removing it. + contextJsonNode.remove(fieldName); + // Since the field is completely removed (name + value) we need to do a recalculation + needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; } } // node is not an array } else { - // In general, a context should not contain nulls so we completely remove the field. + // A context should not contain nulls so we completely remove the field. 
contextJsonNode.remove(fieldName); // Since the field is completely removed (name + value) we need to do a recalculation needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; @@ -374,6 +376,30 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars } } + /** + * Removes {@code node}'s elements which total lenght of serialized values is greater or equal to the passed limit. + * If it is impossible to satisfy the limit the method removes all {@code node}'s elements. + * On every iteration it removes exactly half of the remained elements to reduce the overall complexity. + * @param node {@link ArrayNode} which elements are being removed. + * @param needToRemoveCharsNumber the number of chars need to be removed. + * @return the number of removed chars. + */ + private int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber) + { + int removedCharsNumber = 0; + while (node.size() > 0 && needToRemoveCharsNumber > removedCharsNumber) { + final int lengthBeforeRemove = node.toString().length(); + // Reducing complexity by removing half of array's elements + final int removeUntil = node.size() / 2; + for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) { + node.remove(removeAt); + } + final int lengthAfterRemove = node.toString().length(); + removedCharsNumber += lengthBeforeRemove - lengthAfterRemove; + } + return removedCharsNumber; + } + /** * Serialization result of {@link ResponseContext}. 
* Response context might be serialized using max legth limit, in this case the context might be reduced diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index de8d0f4713a1..f1354c3ea6b3 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -267,7 +267,7 @@ public void serializeWithTruncateArrayTest() throws IOException ctxCopy.merge(ctx); final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 70); ctxCopy.put(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(0, 1, 2, 3, 4)); - ctxCopy.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.emptyList()); + ctxCopy.remove(ResponseContext.Key.MISSING_SEGMENTS); ctxCopy.put(ResponseContext.Key.TRUNCATED, true); Assert.assertEquals( ctxCopy.getDelegate(), From f1d41b462c8d219351efc84d4461c1ae766bc11b Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 2 Aug 2019 20:16:36 +0300 Subject: [PATCH 15/15] Fixed typos and updated comments --- .../druid/query/context/ResponseContext.java | 16 +++++++++------- .../druid/segment/StringDimensionHandler.java | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index b9a7e1c06161..269a1e564776 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -130,7 +130,7 @@ public enum Key implements BaseKey ETAG("ETag"), /** * Query fail time (current time + timeout). - * It is not updated continuously as TIMEOUT_AT. + * It is not updated continuously as {@link Key#TIMEOUT_AT}. 
*/ QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"), /** @@ -247,7 +247,7 @@ public BiFunction getMergeFunction() protected abstract Map getDelegate(); - private final Comparator> valueLengthReversedComparator = + private static final Comparator> valueLengthReversedComparator = Comparator.comparing((Map.Entry e) -> e.getValue().toString().length()).reversed(); /** @@ -325,7 +325,8 @@ public void merge(ResponseContext responseContext) * Serializes the context given that the resulting string length is less than the provided limit. * This method removes some elements from context collections if it's needed to satisfy the limit. * There is no explicit priorities of keys which values are being truncated because for now there are only - * two potential limit breaking keys (UNCOVERED_INTERVALS and MISSING_SEGMENTS) and their values are arrays. + * two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS} + * and {@link Key#MISSING_SEGMENTS}) and their values are arrays. * Thus current implementation considers these arrays as equal prioritized and starts removing elements from * the array which serialized value length is the biggest. * The resulting string might be correctly deserialized to {@link ResponseContext}. 
@@ -348,7 +349,7 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars final JsonNode node = e.getValue(); if (node.isArray()) { if (needToRemoveCharsNumber >= node.toString().length()) { - // We need to remove more chars than the field's lenght so removing it completely + // We need to remove more chars than the field's length so removing it completely contextJsonNode.remove(fieldName); // Since the field is completely removed (name + value) we need to do a recalculation needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; @@ -356,7 +357,8 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars final ArrayNode arrayNode = (ArrayNode) node; needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber); if (arrayNode.size() == 0) { - // The field is empty, removing it. + // The field is empty, removing it because an empty array field may be misleading + // for the recipients of the truncated response context. contextJsonNode.remove(fieldName); // Since the field is completely removed (name + value) we need to do a recalculation needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; @@ -377,14 +379,14 @@ public SerializationResult serializeWith(ObjectMapper objectMapper, int maxChars } /** - * Removes {@code node}'s elements which total lenght of serialized values is greater or equal to the passed limit. + * Removes {@code node}'s elements which total length of serialized values is greater or equal to the passed limit. * If it is impossible to satisfy the limit the method removes all {@code node}'s elements. * On every iteration it removes exactly half of the remained elements to reduce the overall complexity. * @param node {@link ArrayNode} which elements are being removed. * @param needToRemoveCharsNumber the number of chars need to be removed. * @return the number of removed chars. 
*/ - private int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber) + private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber) { int removedCharsNumber = 0; while (node.size() > 0 && needToRemoveCharsNumber > removedCharsNumber) { diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java index ff7809351dbe..c14bd319bb13 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java @@ -58,7 +58,7 @@ public class StringDimensionHandler implements DimensionHandler