From 70356b351ff7060d381c9a4a81dc7fbce13d51ca Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala Date: Tue, 17 Sep 2024 10:14:40 +0530 Subject: [PATCH 1/2] Add serde for ColumnBasedRowsAndColumns to fix window queries without group by (#16658) Register a Ser-De for RowsAndColumns so that the window operator query running on leaf operators would be transferred properly on the wire. Would fix the empty response given by window queries without group by on the native engine. --- .../DruidDefaultSerializersModule.java | 18 +-- ...WindowOperatorQueryQueryRunnerFactory.java | 18 +-- .../rowsandcols/AppendableMapOfColumns.java | 10 -- .../rowsandcols/ConcatRowsAndColumns.java | 7 -- .../CursorFactoryRowsAndColumns.java | 2 +- .../rowsandcols/EmptyRowsAndColumns.java | 8 -- .../LazilyDecoratedRowsAndColumns.java | 14 +-- .../rowsandcols/LimitedRowsAndColumns.java | 9 -- .../MapOfColumnsRowsAndColumns.java | 2 +- .../rowsandcols/RearrangedRowsAndColumns.java | 7 -- .../query/rowsandcols/RowsAndColumns.java | 86 +++++++++++++- .../concrete/AbstractFrameRowsAndColumns.java | 106 ++++++++++++++++++ .../ColumnBasedFrameRowsAndColumns.java | 54 ++------- .../FrameRowsAndColumns.java} | 18 +-- .../concrete/RowBasedFrameRowsAndColumns.java | 44 +------- .../jackson/DefaultObjectMapperTest.java | 26 +++++ .../query/rowsandcols/NoAsRowsAndColumns.java | 9 -- .../ColumnBasedFrameRowsAndColumnsTest.java | 10 +- .../semantic/RowsAndColumnsDecoratorTest.java | 37 ++++++ .../sql/calcite/run/NativeSqlEngine.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 1 + .../sql/calcite/CalciteWindowQueryTest.java | 2 +- .../druid/sql/calcite/NotYetSupported.java | 1 + .../wikipediaFramedAggregations.sqlTest | 2 +- 24 files changed, 296 insertions(+), 197 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java rename processing/src/main/java/org/apache/druid/query/rowsandcols/{semantic/WireTransferable.java => concrete/FrameRowsAndColumns.java} (67%) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 30cc388f1d9d..cad8fdfd8315 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -37,7 +37,6 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws ); addDeserializer(ResponseContext.class, new ResponseContextDeserializer()); - addSerializer(RowsAndColumns.class, new JsonSerializer() - { - @Override - public void serialize( - RowsAndColumns value, - JsonGenerator gen, - SerializerProvider serializers - ) throws IOException - { - // It would be really cool if jackson offered an output stream that would allow us to push bytes - // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute - // back to Jackson at some point. - gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); - } - }); + addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer()); + addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer()); } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java index d18f6c252c1a..f86f91be18be 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import org.apache.druid.error.DruidException; -import org.apache.druid.frame.Frame; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -31,10 +30,8 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.segment.Segment; -import org.apache.druid.segment.column.RowSignature; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -100,19 +97,8 @@ public Sequence apply( @Override public RowsAndColumns apply(@Nullable RowsAndColumns input) { - // This is interim code to force a materialization by synthesizing the wire transfer - // that will need to naturally happen as we flesh out this code more. For now, we - // materialize the bytes on-heap and then read them back in as a frame. if (input instanceof LazilyDecoratedRowsAndColumns) { - final WireTransferable wire = WireTransferable.fromRAC(input); - final byte[] frameBytes = wire.bytesToTransfer(); - - RowSignature.Builder sigBob = RowSignature.builder(); - for (String column : input.getColumnNames()) { - sigBob.add(column, input.findColumn(column).toAccessor().getType()); - } - - return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build()); + return input.as(FrameRowsAndColumns.class); } return input; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java index 61f6855cd01c..d83f56c7ba5f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java @@ -79,14 +79,4 @@ public Column findColumn(String name) } return retVal; } - - @Override - @SuppressWarnings("unchecked") - public T as(Class clazz) - { - if (AppendableRowsAndColumns.class.equals(clazz)) { - return (T) this; - } - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java index c6ced60849d4..3f70f82a2537 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java @@ -141,13 +141,6 @@ public Column findColumn(String name) } } - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - private class ConcatedidColumn implements Column { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java index 6fa74660f7df..46fda857516f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java @@ -61,7 +61,7 @@ public T as(Class clazz) if (CursorFactory.class == clazz) { return (T) cursorFactory; } - return null; + return RowsAndColumns.super.as(clazz); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java index dd0c7dab1cda..56647e0f5687 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; @@ -44,11 +43,4 @@ public Column findColumn(String name) { return null; } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index a05b31dc2cb4..bb35f6837976 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -39,10 +39,10 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator; import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; @@ -150,16 +150,10 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator() @SuppressWarnings("unused") @SemanticCreator - public WireTransferable toWireTransferable() + public FrameRowsAndColumns toFrameRowsAndColumns() { - return () -> { - final Pair materialized = materialize(); - if (materialized == null) { - return new byte[]{}; - } else { - return materialized.lhs; - } - }; + maybeMaterialize(); + return base.as(FrameRowsAndColumns.class); } private void maybeMaterialize() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java index abb3d4649b1a..8cfadecb4dd2 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java @@ -23,7 +23,6 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.LimitedColumn; -import javax.annotation.Nullable; import java.util.Collection; public class LimitedRowsAndColumns implements RowsAndColumns @@ -66,12 +65,4 @@ public Column findColumn(String name) return new LimitedColumn(column, start, end); } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java index 29f092f67440..d6bc1026a98d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java @@ -164,7 +164,7 @@ public T as(Class clazz) if (AppendableRowsAndColumns.class.equals(clazz)) { return (T) new AppendableMapOfColumns(this); } - return null; + return RowsAndColumns.super.as(clazz); } public static class Builder diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index f1793f8fd0e4..e64f086edd7f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum) ); } } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 7b6a1f6215d3..a34d0e463c07 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -19,12 +19,30 @@ package org.apache.druid.query.rowsandcols; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.channel.ByteTracker; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; import java.util.Collection; /** @@ -110,6 +128,72 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) * @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had * through a local implementation of the interface. */ + @SuppressWarnings("unchecked") @Nullable - T as(Class clazz); + default T as(Class clazz) + { + if (clazz.isInstance(this)) { + return (T) this; + } + return null; + } + + /** + * Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns} + */ + class RowsAndColumnsSerializer extends StdSerializer + { + public RowsAndColumnsSerializer() + { + super(RowsAndColumns.class); + } + + @Override + public void serialize( + RowsAndColumns rac, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) throws IOException + { + FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class); + if (frameRAC == null) { + throw DruidException.defensive("Unable to serialize RAC"); + } + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature()); + + Frame frame = frameRAC.getFrame(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker()); + + jsonGenerator.writeBinary(baos.toByteArray()); + } + } + + /** + * Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns} + */ + class RowsAndColumnsDeserializer extends StdDeserializer + { + public RowsAndColumnsDeserializer() + { + super(RowsAndColumns.class); + } + + @Override + public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException + { + RowSignature sig = jsonParser.readValueAs(RowSignature.class); + jsonParser.nextValue(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jsonParser.readBinaryValue(baos); + Frame frame = Frame.wrap(baos.toByteArray()); + if (frame.type() == FrameType.COLUMNAR) { + return new ColumnBasedFrameRowsAndColumns(frame, sig); + } else { + return new RowBasedFrameRowsAndColumns(frame, sig); + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java new file mode 100644 index 000000000000..5295326c8622 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import com.google.common.base.Objects; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public abstract class AbstractFrameRowsAndColumns implements FrameRowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + final Frame frame; + final RowSignature signature; + final LinkedHashMap colCache = new LinkedHashMap<>(); + + public AbstractFrameRowsAndColumns(Frame frame, RowSignature signature) + { + this.frame = frame; + this.signature = signature; + } + + @Override + public Frame getFrame() + { + return frame; + } + + @Override + public RowSignature getSignature() + { + return signature; + } + + @Override + public Collection getColumnNames() + { + return signature.getColumnNames(); + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { + if (CursorFactory.class.equals(clazz)) { + return (T) FrameReader.create(signature).makeCursorFactory(frame); + } + return FrameRowsAndColumns.super.as(clazz); + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public int hashCode() + { + return Objects.hashCode(frame, signature); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof AbstractFrameRowsAndColumns)) { + return false; + } + AbstractFrameRowsAndColumns otherFrame = (AbstractFrameRowsAndColumns) o; + + return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index e99a3f7f3139..c4a4577dc1af 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -19,44 +19,21 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.columnar.FrameColumnReaders; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class ColumnBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.COLUMNAR.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); + super(FrameType.COLUMNAR.ensureType(frame), signature); } @Nullable @@ -71,28 +48,17 @@ public Column findColumn(String name) } else { final ColumnType columnType = signature .getColumnType(columnIndex) - .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + .orElseThrow( + () -> DruidException.defensive( + "just got the id [%s][%s], why is columnType not there?", + columnIndex, + name + ) + ); colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); } } return colCache.get(name); } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } - return null; - } - - @Override - public void close() - { - // nothing to close - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java similarity index 67% rename from processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java rename to processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index a7d55f599293..022a0f91ac16 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -17,21 +17,15 @@ * under the License. */ -package org.apache.druid.query.rowsandcols.semantic; +package org.apache.druid.query.rowsandcols.concrete; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.frame.Frame; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.segment.column.RowSignature; -public interface WireTransferable +public interface FrameRowsAndColumns extends RowsAndColumns { - static WireTransferable fromRAC(RowsAndColumns rac) - { - WireTransferable retVal = rac.as(WireTransferable.class); - if (retVal == null) { - throw new ISE("Rac[%s] cannot be transferred over the wire", rac.getClass()); - } - return retVal; - } + Frame getFrame(); - byte[] bytesToTransfer(); + RowSignature getSignature(); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 865a24e5d6da..c702c210775c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -24,40 +24,17 @@ import org.apache.druid.frame.FrameType; import org.apache.druid.frame.field.FieldReader; import org.apache.druid.frame.field.FieldReaders; -import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class RowBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.ROW_BASED.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); + super(FrameType.ROW_BASED.ensureType(frame), signature); } @Nullable @@ -86,21 +63,4 @@ public Column findColumn(String name) } return colCache.get(name); } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } - return null; - } - - @Override - public void close() - { - // nothing to close - } } diff --git a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java index 989d137770ee..92c8a2cb2989 100644 --- a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java +++ b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java @@ -22,12 +22,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.query.Query; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Assert; @@ -35,6 +41,8 @@ import java.util.Arrays; +import static org.junit.Assert.assertEquals; + /** * */ @@ -102,4 +110,22 @@ public void testUnknownTypeWithUnknownService() throws JsonProcessingException } Assert.fail("We expect InvalidTypeIdException to be thrown"); } + + @Test + public void testColumnBasedFrameRowsAndColumns() throws Exception + { + DefaultObjectMapper om = new DefaultObjectMapper("test"); + + MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ))); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + byte[] bytes = om.writeValueAsBytes(frc); + + ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); + assertEquals(frc, frc2); + } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java index 422c87c8b7c6..16cd44e870ba 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.Collection; public class NoAsRowsAndColumns implements RowsAndColumns @@ -50,12 +49,4 @@ public Column findColumn(String name) { return rac.findColumn(name); } - - @Nullable - @Override - public T as(Class clazz) - { - // Pretend like this doesn't implement any semantic interfaces - return null; - } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java index acfcbe6f83ed..f6a10e011464 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java @@ -37,7 +37,15 @@ public ColumnBasedFrameRowsAndColumnsTest() public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { - LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); + LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns( + input, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); rac.numRows(); // materialize return (ColumnBasedFrameRowsAndColumns) rac.getBase(); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index f90c2ea19172..41295f480176 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.rowsandcols.semantic; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -32,6 +33,9 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.ArrayListSegment; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -214,6 +218,39 @@ public void testDecorationWithListOfResultRows() } } + @Test + public void testDecoratorWithColumnBasedFrameRAC() + { + RowSignature siggy = RowSignature.builder() + .add("colA", ColumnType.LONG) + .add("colB", ColumnType.LONG) + .build(); + + Object[][] vals = new Object[][]{ + {1L, 4L}, + {2L, -4L}, + {3L, 3L}, + {4L, -3L}, + {5L, 4L}, + {6L, 82L}, + {7L, -90L}, + {8L, 4L}, + {9L, 0L}, + {10L, 0L} + }; + + MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ) + ); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + + validateDecorated(frc, siggy, vals, null, null, OffsetLimit.NONE, null); + } + private void validateDecorated( RowsAndColumns base, RowSignature siggy, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index 2477ac38dec1..d02d302437b8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -109,7 +109,6 @@ public boolean featureAvailable(EngineFeature feature) case ALLOW_TOP_LEVEL_UNION_ALL: case TIME_BOUNDARY_QUERY: case GROUPBY_IMPLICITLY_SORTS: - case WINDOW_LEAF_OPERATOR: return true; case CAN_INSERT: case CAN_REPLACE: @@ -117,6 +116,7 @@ public boolean featureAvailable(EngineFeature feature) case WRITE_EXTERNAL_DATA: case SCAN_ORDER_BY_NON_TIME: case SCAN_NEEDS_SIGNATURE: + case WINDOW_LEAF_OPERATOR: return false; default: throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index a8dcc35ea7ad..732681de238a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -16086,6 +16086,7 @@ public void testScanAndSortOnJoin() .run(); } + @NotYetSupported(Modes.UNSUPPORTED_DATASOURCE) @Test public void testWindowingOverJoin() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index ccf459e743e7..b03938b6ee96 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -266,7 +266,7 @@ public void testFailure_partitionByMVD() ); assertEquals( - "Encountered a multi value column [v0]. Window processing does not support MVDs. " + "Encountered a multi value column. Window processing does not support MVDs. " + "Consider using UNNEST or MV_TO_ARRAY.", e.getMessage() ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java index da1431f433d4..f0c48ff44f2f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java @@ -89,6 +89,7 @@ enum Modes RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError: column content mismatch)"), LONG_CASTING(AssertionError.class, "expected: java.lang.Long"), UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"), + UNSUPPORTED_DATASOURCE(DruidException.class, "WindowOperatorQuery must run on top of a query or inline data source"), UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"), UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"), JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a join with '.*' condition"), diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest index 87873d44c485..104cb0d2422d 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest @@ -2,7 +2,7 @@ type: "operatorValidation" sql: | SELECT - countryIsoCode, + countryIsoCode, CAST (FLOOR(__time TO HOUR) AS BIGINT) t, SUM(delta) delta, SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta From bb6d061b544d3c5940cabcef4fe415c4774a0a96 Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala Date: Tue, 10 Sep 2024 14:20:54 +0530 Subject: [PATCH 2/2] Fix String Frame Readers to read String Arrays correctly (#16885) While writing to a frame, String arrays are written by setting the multivalue byte. But while reading, it was hardcoded to false. --- .../druid/frame/field/StringFieldReader.java | 5 +- .../read/columnar/FrameColumnReaders.java | 4 +- .../StringArrayFrameColumnReader.java | 385 ++++++++++++++++ .../columnar/StringFrameColumnReader.java | 430 +++++++----------- .../columnar/StringFrameColumnWriter.java | 5 +- .../operator/window/RowsAndColumnsHelper.java | 12 + ...t.java => EvaluateRowsAndColumnsTest.java} | 36 +- 7 files changed, 603 insertions(+), 274 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java rename processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/{TestVirtualColumnEvaluationRowsAndColumnsTest.java => EvaluateRowsAndColumnsTest.java} (73%) diff --git a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java index 1c51a914e0d5..be2263be138d 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java @@ -480,9 +480,8 @@ public int numRows() public boolean isNull(int rowNum) { final long fieldPosition = coach.computeFieldPosition(rowNum); - byte[] nullBytes = new byte[3]; - dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3); - return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL); + return dataRegion.getByte(fieldPosition) == StringFieldWriter.NULL_ROW + && dataRegion.getByte(fieldPosition + 1) == StringFieldWriter.ROW_TERMINATOR; } @Override diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java index 9b4dc85cb1e0..6ceb6b729176 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReaders.java @@ -51,7 +51,7 @@ public static FrameColumnReader create( return new DoubleFrameColumnReader(columnNumber); case STRING: - return new StringFrameColumnReader(columnNumber, false); + return new StringFrameColumnReader(columnNumber); case COMPLEX: return new ComplexFrameColumnReader(columnNumber); @@ -59,7 +59,7 @@ public static FrameColumnReader create( case ARRAY: switch (columnType.getElementType().getType()) { case STRING: - return new StringFrameColumnReader(columnNumber, true); + return new StringArrayFrameColumnReader(columnNumber); case LONG: return new LongArrayFrameColumnReader(columnNumber); case FLOAT: diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java new file mode 100644 index 000000000000..31c56bf38e7d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringArrayFrameColumnReader.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.read.columnar; + +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.objects.ObjectArrays; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReaderUtils; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.columnar.FrameColumnWriters; +import org.apache.druid.frame.write.columnar.StringFrameColumnWriter; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; +import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.ObjectColumnSelector; +import org.apache.druid.segment.column.BaseColumn; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.ReadableOffset; +import org.apache.druid.segment.vector.ReadableVectorInspector; +import org.apache.druid.segment.vector.ReadableVectorOffset; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * Reader for {@link ColumnType#STRING_ARRAY}. + * This is similar to {@link StringFrameColumnReader} reading mvds in reading bytes from frame + */ +public class StringArrayFrameColumnReader implements FrameColumnReader +{ + private final int columnNumber; + + /** + * Create a new reader. + * + * @param columnNumber column number + */ + StringArrayFrameColumnReader(int columnNumber) + { + this.columnNumber = columnNumber; + } + + @Override + public Column readRACColumn(Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + + final long positionOfLengths = getStartOfStringLengthSection(frame.numRows()); + final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows()); + + StringArrayFrameColumn frameCol = new StringArrayFrameColumn( + frame, + memory, + positionOfLengths, + positionOfPayloads + ); + + return new ColumnAccessorBasedColumn(frameCol); + } + + @Override + public ColumnPlus readColumn(final Frame frame) + { + final Memory memory = frame.region(columnNumber); + validate(memory); + + final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows()); + final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows()); + + final BaseColumn baseColumn = new StringArrayFrameColumn( + frame, + memory, + startOfStringLengthSection, + startOfStringDataSection + ); + + return new ColumnPlus( + baseColumn, + new ColumnCapabilitiesImpl().setType(ColumnType.STRING_ARRAY) + .setHasMultipleValues(false) + .setDictionaryEncoded(false), + frame.numRows() + ); + } + + private void validate(final Memory region) + { + // Check if column is big enough for a header + if (region.getCapacity() < StringFrameColumnWriter.DATA_OFFSET) { + throw DruidException.defensive("Column[%s] is not big enough for a header", columnNumber); + } + + final byte typeCode = region.getByte(0); + if (typeCode != FrameColumnWriters.TYPE_STRING_ARRAY) { + throw DruidException.defensive( + "Column[%s] does not have the correct type code; expected[%s], got[%s]", + columnNumber, + FrameColumnWriters.TYPE_STRING_ARRAY, + typeCode + ); + } + } + + private static long getStartOfCumulativeLengthSection() + { + return StringFrameColumnWriter.DATA_OFFSET; + } + + private static long getStartOfStringLengthSection(final int numRows) + { + return StringFrameColumnWriter.DATA_OFFSET + (long) Integer.BYTES * numRows; + } + + private long getStartOfStringDataSection( + final Memory memory, + final int numRows + ) + { + if (numRows < 0) { + throw DruidException.defensive("Encountered -ve numRows [%s] while reading frame", numRows); + } + final int totalNumValues = FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + numRows - 1 + ); + + return getStartOfStringLengthSection(numRows) + (long) Integer.BYTES * totalNumValues; + } + + private static class StringArrayFrameColumn extends ObjectColumnAccessorBase implements BaseColumn + { + private final Frame frame; + private final Memory memory; + private final long startOfStringLengthSection; + private final long startOfStringDataSection; + + private StringArrayFrameColumn( + Frame frame, + Memory memory, + long startOfStringLengthSection, + long startOfStringDataSection + ) + { + this.frame = frame; + this.memory = memory; + this.startOfStringLengthSection = startOfStringLengthSection; + this.startOfStringDataSection = startOfStringDataSection; + } + + @Override + public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) + { + return new ObjectColumnSelector() + { + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + + @Nullable + @Override + public Object getObject() + { + return getRowAsObject(frame.physicalRow(offset.getOffset()), true); + } + + @Override + public Class classOfObject() + { + return Object[].class; + } + }; + } + + @Override + public VectorObjectSelector makeVectorObjectSelector(final ReadableVectorOffset offset) + { + class StringArrayFrameVectorObjectSelector implements VectorObjectSelector + { + private final Object[] vector = new Object[offset.getMaxVectorSize()]; + private int id = ReadableVectorInspector.NULL_ID; + + @Override + public Object[] getObjectVector() + { + computeVectorIfNeeded(); + return vector; + } + + @Override + public int getMaxVectorSize() + { + return offset.getMaxVectorSize(); + } + + @Override + public int getCurrentVectorSize() + { + return offset.getCurrentVectorSize(); + } + + private void computeVectorIfNeeded() + { + if (id == offset.getId()) { + return; + } + + if (offset.isContiguous()) { + final int start = offset.getStartOffset(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(i + start); + vector[i] = getRowAsObject(physicalRow, true); + } + } else { + final int[] offsets = offset.getOffsets(); + + for (int i = 0; i < offset.getCurrentVectorSize(); i++) { + final int physicalRow = frame.physicalRow(offsets[i]); + vector[i] = getRowAsObject(physicalRow, true); + } + } + + id = offset.getId(); + } + } + + return new StringArrayFrameVectorObjectSelector(); + } + + @Override + public void close() + { + // Do nothing. + } + + @Override + public ColumnType getType() + { + return ColumnType.STRING_ARRAY; + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Override + protected Object getVal(int rowNum) + { + return getRowAsObject(frame.physicalRow(rowNum), true); + } + + @Override + protected Comparator getComparator() + { + return Comparator.nullsFirst(ColumnType.STRING_ARRAY.getStrategy()); + } + + /** + * Returns a ByteBuffer containing UTF-8 encoded string number {@code index}. The ByteBuffer is always newly + * created, so it is OK to change its position, limit, etc. However, it may point to shared memory, so it is + * not OK to write to its contents. + */ + @Nullable + private ByteBuffer getStringUtf8(final int index) + { + if (startOfStringLengthSection > Long.MAX_VALUE - (long) Integer.BYTES * index) { + throw DruidException.defensive("length index would overflow trying to read the frame memory!"); + } + + final int dataEndVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * index); + if (startOfStringDataSection > Long.MAX_VALUE - dataEndVariableIndex) { + throw DruidException.defensive("data end index would overflow trying to read the frame memory!"); + } + + final long dataStart; + final long dataEnd = startOfStringDataSection + dataEndVariableIndex; + + if (index == 0) { + dataStart = startOfStringDataSection; + } else { + final int dataStartVariableIndex = memory.getInt(startOfStringLengthSection + (long) Integer.BYTES * (index + - 1)); + if (startOfStringDataSection > Long.MAX_VALUE - dataStartVariableIndex) { + throw DruidException.defensive("data start index would overflow trying to read the frame memory!"); + } + dataStart = startOfStringDataSection + dataStartVariableIndex; + } + + final int dataLength = Ints.checkedCast(dataEnd - dataStart); + + if ((dataLength == 0 && NullHandling.replaceWithDefault()) || + (dataLength == 1 && memory.getByte(dataStart) == FrameWriterUtils.NULL_STRING_MARKER)) { + return null; + } + + return FrameReaderUtils.readByteBuffer(memory, dataStart, dataLength); + } + + @Nullable + private String getString(final int index) + { + final ByteBuffer stringUtf8 = getStringUtf8(index); + + if (stringUtf8 == null) { + return null; + } else { + return StringUtils.fromUtf8(stringUtf8); + } + } + + /** + * Returns the object at the given physical row number. + * + * @param physicalRow physical row number + * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. + */ + @Nullable + private Object getRowAsObject(final int physicalRow, final boolean decode) + { + final int cumulativeRowLength = FrameColumnReaderUtils.getCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow + ); + final int rowLength; + + if (FrameColumnReaderUtils.isNullRow(cumulativeRowLength)) { + return null; + } else if (physicalRow == 0) { + rowLength = cumulativeRowLength; + } else { + rowLength = cumulativeRowLength - FrameColumnReaderUtils.getAdjustedCumulativeRowLength( + memory, + getStartOfCumulativeLengthSection(), + physicalRow - 1 + ); + } + + if (rowLength == 0) { + return ObjectArrays.EMPTY_ARRAY; + } else { + final Object[] row = new Object[rowLength]; + + for (int i = 0; i < rowLength; i++) { + final int index = cumulativeRowLength - rowLength + i; + row[i] = decode ? getString(index) : getStringUtf8(index); + } + + return row; + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java index d9fb9d83a9f4..2385c431e5b3 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/StringFrameColumnReader.java @@ -19,18 +19,16 @@ package org.apache.druid.frame.read.columnar; -import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; -import it.unimi.dsi.fastutil.objects.ObjectArrays; import org.apache.datasketches.memory.Memory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReaderUtils; import org.apache.druid.frame.write.FrameWriterUtils; import org.apache.druid.frame.write.columnar.FrameColumnWriters; import org.apache.druid.frame.write.columnar.StringFrameColumnWriter; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.filter.DruidPredicateFactory; @@ -40,13 +38,11 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase; import org.apache.druid.segment.BaseSingleValueDimensionSelector; -import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelectorUtils; import org.apache.druid.segment.IdLookup; import org.apache.druid.segment.column.BaseColumn; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.DictionaryEncodedColumn; @@ -67,23 +63,20 @@ import java.util.List; /** - * Reader for {@link StringFrameColumnWriter}, types {@link ColumnType#STRING} and {@link ColumnType#STRING_ARRAY}. + * Reader for {@link StringFrameColumnWriter}, type {@link ColumnType#STRING}. */ public class StringFrameColumnReader implements FrameColumnReader { private final int columnNumber; - private final boolean asArray; /** * Create a new reader. * * @param columnNumber column number - * @param asArray true for {@link ColumnType#STRING_ARRAY}, false for {@link ColumnType#STRING} */ - StringFrameColumnReader(int columnNumber, boolean asArray) + StringFrameColumnReader(int columnNumber) { this.columnNumber = columnNumber; - this.asArray = asArray; } @Override @@ -92,18 +85,20 @@ public Column readRACColumn(Frame frame) final Memory memory = frame.region(columnNumber); validate(memory); + if (isMultiValue(memory)) { + throw InvalidInput.exception("Encountered a multi value column. Window processing does not support MVDs. " + + "Consider using UNNEST or MV_TO_ARRAY."); + } final long positionOfLengths = getStartOfStringLengthSection(frame.numRows(), false); final long positionOfPayloads = getStartOfStringDataSection(memory, frame.numRows(), false); - StringFrameColumn frameCol = - new StringFrameColumn( - frame, - false, - memory, - positionOfLengths, - positionOfPayloads, - asArray || isMultiValue(memory) // Read MVDs as String arrays - ); + StringFrameColumn frameCol = new StringFrameColumn( + frame, + false, + memory, + positionOfLengths, + positionOfPayloads + ); return new ColumnAccessorBasedColumn(frameCol); } @@ -118,35 +113,19 @@ public ColumnPlus readColumn(final Frame frame) final long startOfStringLengthSection = getStartOfStringLengthSection(frame.numRows(), multiValue); final long startOfStringDataSection = getStartOfStringDataSection(memory, frame.numRows(), multiValue); - final BaseColumn baseColumn; - - if (asArray) { - baseColumn = new StringArrayFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection - ); - } else { - baseColumn = new StringFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection, - false - ); - } + final BaseColumn baseColumn = new StringFrameColumn( + frame, + multiValue, + memory, + startOfStringLengthSection, + startOfStringDataSection + ); return new ColumnPlus( baseColumn, - new ColumnCapabilitiesImpl().setType(asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING) - .setHasMultipleValues(!asArray && multiValue) - .setDictionaryEncoded(false) - .setHasBitmapIndexes(false) - .setHasSpatialIndexes(false) - .setHasNulls(ColumnCapabilities.Capable.UNKNOWN), + new ColumnCapabilitiesImpl().setType(ColumnType.STRING) + .setHasMultipleValues(multiValue) + .setDictionaryEncoded(false), frame.numRows() ); } @@ -159,12 +138,11 @@ private void validate(final Memory region) } final byte typeCode = region.getByte(0); - final byte expectedTypeCode = asArray ? FrameColumnWriters.TYPE_STRING_ARRAY : FrameColumnWriters.TYPE_STRING; - if (typeCode != expectedTypeCode) { + if (typeCode != FrameColumnWriters.TYPE_STRING) { throw DruidException.defensive( "Column[%s] does not have the correct type code; expected[%s], got[%s]", columnNumber, - expectedTypeCode, + FrameColumnWriters.TYPE_STRING, typeCode ); } @@ -172,7 +150,7 @@ private void validate(final Memory region) private static boolean isMultiValue(final Memory memory) { - return memory.getByte(1) == 1; + return memory.getByte(StringFrameColumnWriter.MULTI_VALUE_POSITION) == StringFrameColumnWriter.MULTI_VALUE_BYTE; } private static long getStartOfCumulativeLengthSection() @@ -213,8 +191,7 @@ private static long getStartOfStringDataSection( return getStartOfStringLengthSection(numRows, multiValue) + (long) Integer.BYTES * totalNumValues; } - @VisibleForTesting - static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn + private static class StringFrameColumn extends ObjectColumnAccessorBase implements DictionaryEncodedColumn { private final Frame frame; private final Memory memory; @@ -226,18 +203,12 @@ static class StringFrameColumn extends ObjectColumnAccessorBase implements Dicti */ private final boolean multiValue; - /** - * Whether the column is being read as {@link ColumnType#STRING_ARRAY} (true) or {@link ColumnType#STRING} (false). - */ - private final boolean asArray; - private StringFrameColumn( Frame frame, boolean multiValue, Memory memory, long startOfStringLengthSection, - long startOfStringDataSection, - final boolean asArray + long startOfStringDataSection ) { this.frame = frame; @@ -245,7 +216,6 @@ private StringFrameColumn( this.memory = memory; this.startOfStringLengthSection = startOfStringLengthSection; this.startOfStringDataSection = startOfStringDataSection; - this.asArray = asArray; } @Override @@ -293,11 +263,142 @@ public int getCardinality() @Override public DimensionSelector makeDimensionSelector(ReadableOffset offset, @Nullable ExtractionFn extractionFn) { - if (asArray) { - throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY); - } + if (multiValue) { + class MultiValueSelector implements DimensionSelector + { + private int currentRow = -1; + private List currentValues = null; + private final RangeIndexedInts indexedInts = new RangeIndexedInts(); + + @Override + public int getValueCardinality() + { + return CARDINALITY_UNKNOWN; + } + + @Nullable + @Override + public String lookupName(int id) + { + populate(); + final ByteBuffer buf = currentValues.get(id); + final String s = buf == null ? null : StringUtils.fromUtf8(buf.duplicate()); + return extractionFn == null ? s : extractionFn.apply(s); + } + + @Nullable + @Override + public ByteBuffer lookupNameUtf8(int id) + { + assert supportsLookupNameUtf8(); + populate(); + return currentValues.get(id); + } + + @Override + public boolean supportsLookupNameUtf8() + { + return extractionFn == null; + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return false; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + + @Override + public IndexedInts getRow() + { + populate(); + return indexedInts; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, value); + } - return makeDimensionSelectorInternal(offset, extractionFn); + @Override + public ValueMatcher makeValueMatcher(DruidPredicateFactory predicateFactory) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicateFactory); + } + + @Nullable + @Override + public Object getObject() + { + return getRowAsObject(frame.physicalRow(offset.getOffset()), true); + } + + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + + private void populate() + { + final int row = offset.getOffset(); + + if (row != currentRow) { + currentValues = getRowAsListUtf8(frame.physicalRow(row)); + indexedInts.setSize(currentValues.size()); + currentRow = row; + } + } + } + + return new MultiValueSelector(); + } else { + class SingleValueSelector extends BaseSingleValueDimensionSelector + { + @Nullable + @Override + protected String getValue() + { + final String s = getString(frame.physicalRow(offset.getOffset())); + return extractionFn == null ? s : extractionFn.apply(s); + } + + @Nullable + @Override + public ByteBuffer lookupNameUtf8(int id) + { + assert supportsLookupNameUtf8(); + return getStringUtf8(frame.physicalRow(offset.getOffset())); + } + + @Override + public boolean supportsLookupNameUtf8() + { + return extractionFn == null; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Do nothing. + } + } + + return new SingleValueSelector(); + } } @Override @@ -385,7 +486,7 @@ public void close() @Override public ColumnType getType() { - return asArray ? ColumnType.STRING_ARRAY : ColumnType.STRING; + return ColumnType.STRING; } @Override @@ -397,7 +498,7 @@ public int numRows() @Override protected Object getVal(int rowNum) { - return getString(frame.physicalRow(rowNum)); + return getRowAsObject(frame.physicalRow(rowNum), true); } @Override @@ -452,10 +553,6 @@ private String getString(final int index) /** * Returns the object at the given physical row number. * - * When {@link #asArray}, the return value is always of type {@code Object[]}. Otherwise, the return value - * is either an empty list (if the row is empty), a single String (if the row has one value), or a List - * of Strings (if the row has more than one value). - * * @param physicalRow physical row number * @param decode if true, return java.lang.String. If false, return UTF-8 ByteBuffer. */ @@ -483,11 +580,11 @@ private Object getRowAsObject(final int physicalRow, final boolean decode) } if (rowLength == 0) { - return asArray ? ObjectArrays.EMPTY_ARRAY : Collections.emptyList(); + return Collections.emptyList(); } else if (rowLength == 1) { final int index = cumulativeRowLength - 1; final Object o = decode ? getString(index) : getStringUtf8(index); - return asArray ? new Object[]{o} : o; + return o; } else { final Object[] row = new Object[rowLength]; @@ -496,26 +593,21 @@ private Object getRowAsObject(final int physicalRow, final boolean decode) row[i] = decode ? getString(index) : getStringUtf8(index); } - return asArray ? row : Arrays.asList(row); + return Arrays.asList(row); } } else { final Object o = decode ? getString(physicalRow) : getStringUtf8(physicalRow); - return asArray ? new Object[]{o} : o; + return o; } } /** - * Returns the value at the given physical row number as a list of ByteBuffers. Only valid when !asArray, i.e., - * when type is {@link ColumnType#STRING}. + * Returns the value at the given physical row number as a list of ByteBuffers. * * @param physicalRow physical row number */ private List getRowAsListUtf8(final int physicalRow) { - if (asArray) { - throw DruidException.defensive("Unexpected call for array column"); - } - final Object object = getRowAsObject(physicalRow, false); if (object == null) { @@ -527,185 +619,5 @@ private List getRowAsListUtf8(final int physicalRow) return Collections.singletonList((ByteBuffer) object); } } - - /** - * Selector used by this column. It's versatile: it can run as string array (asArray = true) or regular string - * column (asArray = false). - */ - private DimensionSelector makeDimensionSelectorInternal(ReadableOffset offset, @Nullable ExtractionFn extractionFn) - { - if (multiValue) { - class MultiValueSelector implements DimensionSelector - { - private int currentRow = -1; - private List currentValues = null; - private final RangeIndexedInts indexedInts = new RangeIndexedInts(); - - @Override - public int getValueCardinality() - { - return CARDINALITY_UNKNOWN; - } - - @Nullable - @Override - public String lookupName(int id) - { - populate(); - final ByteBuffer buf = currentValues.get(id); - final String s = buf == null ? null : StringUtils.fromUtf8(buf.duplicate()); - return extractionFn == null ? s : extractionFn.apply(s); - } - - @Nullable - @Override - public ByteBuffer lookupNameUtf8(int id) - { - assert supportsLookupNameUtf8(); - populate(); - return currentValues.get(id); - } - - @Override - public boolean supportsLookupNameUtf8() - { - return extractionFn == null; - } - - @Override - public boolean nameLookupPossibleInAdvance() - { - return false; - } - - @Nullable - @Override - public IdLookup idLookup() - { - return null; - } - - @Override - public IndexedInts getRow() - { - populate(); - return indexedInts; - } - - @Override - public ValueMatcher makeValueMatcher(@Nullable String value) - { - return DimensionSelectorUtils.makeValueMatcherGeneric(this, value); - } - - @Override - public ValueMatcher makeValueMatcher(DruidPredicateFactory predicateFactory) - { - return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicateFactory); - } - - @Nullable - @Override - public Object getObject() - { - return getRowAsObject(frame.physicalRow(offset.getOffset()), true); - } - - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - // Do nothing. - } - - private void populate() - { - final int row = offset.getOffset(); - - if (row != currentRow) { - currentValues = getRowAsListUtf8(frame.physicalRow(row)); - indexedInts.setSize(currentValues.size()); - currentRow = row; - } - } - } - - return new MultiValueSelector(); - } else { - class SingleValueSelector extends BaseSingleValueDimensionSelector - { - @Nullable - @Override - protected String getValue() - { - final String s = getString(frame.physicalRow(offset.getOffset())); - return extractionFn == null ? s : extractionFn.apply(s); - } - - @Nullable - @Override - public ByteBuffer lookupNameUtf8(int id) - { - assert supportsLookupNameUtf8(); - return getStringUtf8(frame.physicalRow(offset.getOffset())); - } - - @Override - public boolean supportsLookupNameUtf8() - { - return extractionFn == null; - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - // Do nothing. - } - } - - return new SingleValueSelector(); - } - } - } - - static class StringArrayFrameColumn implements BaseColumn - { - private final StringFrameColumn delegate; - - private StringArrayFrameColumn( - Frame frame, - boolean multiValue, - Memory memory, - long startOfStringLengthSection, - long startOfStringDataSection - ) - { - this.delegate = new StringFrameColumn( - frame, - multiValue, - memory, - startOfStringLengthSection, - startOfStringDataSection, - true - ); - } - - @Override - @SuppressWarnings("rawtypes") - public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) - { - return delegate.makeDimensionSelectorInternal(offset, null); - } - - @Override - public void close() - { - delegate.close(); - } } } diff --git a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java index 8eee0fd0cef9..75f64613388b 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java +++ b/processing/src/main/java/org/apache/druid/frame/write/columnar/StringFrameColumnWriter.java @@ -44,6 +44,9 @@ public abstract class StringFrameColumnWriter imp public static final long DATA_OFFSET = 1 /* type code */ + 1 /* single or multi-value? */; + public static final byte MULTI_VALUE_BYTE = (byte) 0x01; + public static final long MULTI_VALUE_POSITION = 1; + private final T selector; private final byte typeCode; protected final ColumnCapabilities.Capable multiValue; @@ -228,7 +231,7 @@ public long writeTo(final WritableMemory memory, final long startPosition) long currentPosition = startPosition; memory.putByte(currentPosition, typeCode); - memory.putByte(currentPosition + 1, writeMultiValue ? (byte) 1 : (byte) 0); + memory.putByte(currentPosition + 1, writeMultiValue ? MULTI_VALUE_BYTE : (byte) 0); currentPosition += 2; if (writeMultiValue) { diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java index 2636156b53cc..cdc84620ab0f 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; @@ -280,6 +281,17 @@ public void validate(String msgBase, Column col) } else { Assert.assertEquals(msg, ((Long) expectedVal).longValue(), accessor.getLong(i)); } + } else if (expectedVal instanceof Object[]) { + Object actualVal = accessor.getObject(i); + if (expectedNulls[i]) { + Assert.assertNull(msg, accessor.getObject(i)); + } else { + if (actualVal instanceof ArrayList) { + Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], ((ArrayList) actualVal).toArray()); + } else { + Assert.assertArrayEquals(msg, (Object[]) expectedVals[i], (Object[]) actualVal); + } + } } else { if (expectedNulls[i]) { Assert.assertNull(msg, accessor.getObject(i)); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java similarity index 73% rename from processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java rename to processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java index 99d5dfabc89b..e2cee35a8e9a 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/TestVirtualColumnEvaluationRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java @@ -38,32 +38,44 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeNotNull; -public class TestVirtualColumnEvaluationRowsAndColumnsTest extends SemanticTestBase +public class EvaluateRowsAndColumnsTest extends SemanticTestBase { - public TestVirtualColumnEvaluationRowsAndColumnsTest(String name, Function fn) + public EvaluateRowsAndColumnsTest(String name, Function fn) { super(name, fn); } @Test - public void testMaterializeVirtualColumns() + public void testMaterializeColumns() { Object[][] vals = new Object[][] { - {1L, "a", 123L, 0L}, - {2L, "a", 456L, 1L}, - {3L, "b", 789L, 2L}, - {4L, "b", 123L, 3L}, + {1L, "a", 123L, new Object[]{"xyz", "x"}, 0L}, + {2L, "a", 456L, new Object[]{"abc"}, 1L}, + {3L, "b", 789L, new Object[]{null}, 2L}, + {4L, null, 123L, null, 3L}, }; RowSignature siggy = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim", ColumnType.STRING) .add("val", ColumnType.LONG) + .add("array", ColumnType.STRING_ARRAY) .add("arrayIndex", ColumnType.LONG) .build(); final RowsAndColumns base = make(MapOfColumnsRowsAndColumns.fromRowObjects(vals, siggy)); + Object[] expectedArr = new Object[][] { + {"xyz", "x"}, + {"abc"}, + {null}, + null + }; + + new RowsAndColumnsHelper() + .expectColumn("array", expectedArr, ColumnType.STRING_ARRAY) + .validate(base); + assumeNotNull("skipping: CursorFactory not supported", base.as(CursorFactory.class)); LazilyDecoratedRowsAndColumns ras = new LazilyDecoratedRowsAndColumns( @@ -82,12 +94,18 @@ public void testMaterializeVirtualColumns() // do the materialziation ras.numRows(); - assertEquals(Lists.newArrayList("__time", "dim", "val", "arrayIndex", "expr"), ras.getColumnNames()); + assertEquals(Lists.newArrayList("__time", "dim", "val", "array", "arrayIndex", "expr"), ras.getColumnNames()); new RowsAndColumnsHelper() .expectColumn("expr", new long[] {123 * 2, 456L * 2, 789 * 2, 123 * 2}) .validate(ras); - } + new RowsAndColumnsHelper() + .expectColumn("dim", new String[] {"a", "a", "b", null}, ColumnType.STRING) + .validate(ras); + new RowsAndColumnsHelper() + .expectColumn("array", expectedArr, ColumnType.STRING_ARRAY) + .validate(ras); + } }