diff --git a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
index 29bf0945adbe..75ad70d5cb42 100644
--- a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
+++ b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
@@ -21,24 +21,32 @@
import com.google.common.base.Preconditions;
import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Comparator;
/**
* Reads values written by {@link ComplexFieldWriter}.
- *
+ *
* Format:
- *
+ *
* - 1 byte: {@link ComplexFieldWriter#NULL_BYTE} or {@link ComplexFieldWriter#NOT_NULL_BYTE}
* - 4 bytes: length of serialized complex value, little-endian int
* - N bytes: serialized complex value
@@ -121,7 +129,7 @@ public static Object readFieldFromByteArray(
* Alternative interface to read the field from the memory without creating a selector and field pointer
*/
@Nullable
- public static Object readFieldFromMemory(
+ public static T readFieldFromMemory(
final ComplexMetricSerde serde,
final Memory memory,
final long position
@@ -136,7 +144,8 @@ public static Object readFieldFromMemory(
final byte[] bytes = new byte[length];
memory.getByteArray(position + ComplexFieldWriter.HEADER_SIZE, bytes, 0, length);
- return serde.fromBytes(bytes, 0, length);
+ //noinspection unchecked
+ return (T) serde.fromBytes(bytes, 0, length);
} else {
throw new ISE("Unexpected null byte [%s]", nullByte);
}
@@ -166,8 +175,8 @@ private Selector(Memory memory, ReadableFieldPointer fieldPointer, ComplexMetric
@Override
public T getObject()
{
- //noinspection unchecked
- return (T) readFieldFromMemory(serde, memory, fieldPointer.position());
+ final long fieldPosition = fieldPointer.position();
+ return readFieldFromMemory(serde, memory, fieldPosition);
}
@Override
@@ -183,4 +192,80 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
// Do nothing.
}
}
+
+ @Override
+ public Column makeRACColumn(Frame frame, RowSignature signature, String columnName)
+ {
+ return new ComplexFieldReaderColumn(frame, signature.indexOf(columnName), signature.size());
+ }
+
+ private class ComplexFieldReaderColumn implements Column
+ {
+ private final Frame frame;
+ private final Memory dataRegion;
+ private final ColumnType type;
+ private final FieldPositionHelper coach;
+
+ public ComplexFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+ {
+ this.frame = frame;
+ dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.type = ColumnType.ofComplex(serde.getTypeName());
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ObjectColumnAccessorBase()
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return type;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+ return dataRegion.getByte(fieldPosition) == ComplexFieldWriter.NULL_BYTE;
+ }
+
+ @Override
+ protected Object getVal(int rowNum)
+ {
+ return readFieldFromMemory(serde, dataRegion, coach.computeFieldPosition(rowNum));
+ }
+
+ @Override
+ protected Comparator