Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public GroupByPostShuffleFrameProcessor(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
() -> outputRow,
RowSignature.Finalization.YES
GroupByQueryKit.isFinalize(query) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"expectedResults": [
{
"__time": 1672058096000,
"double_col": 0.0
"double_col": null
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.write.RowBasedFrameWriterFactory;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.frame.write.cast.TypeCastSelectors;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
Expand Down Expand Up @@ -101,7 +102,8 @@ private static FieldWriter makeLongWriter(
final String columnName
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.LONG);
return LongFieldWriter.forPrimitive(selector);
}

Expand All @@ -110,7 +112,8 @@ private static FieldWriter makeFloatWriter(
final String columnName
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.FLOAT);
return FloatFieldWriter.forPrimitive(selector);
}

Expand All @@ -119,7 +122,8 @@ private static FieldWriter makeDoubleWriter(
final String columnName
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.DOUBLE);
return DoubleFieldWriter.forPrimitive(selector);
}

Expand All @@ -139,7 +143,8 @@ private static FieldWriter makeStringArrayWriter(
final boolean removeNullBytes
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.STRING_ARRAY);
Comment thread
gianm marked this conversation as resolved.
return new StringArrayFieldWriter(selector, removeNullBytes);
}

Expand All @@ -148,7 +153,8 @@ private static FieldWriter makeLongArrayWriter(
final String columnName
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.LONG_ARRAY);
return NumericArrayFieldWriter.getLongArrayFieldWriter(selector);
}

Expand All @@ -157,7 +163,8 @@ private static FieldWriter makeFloatArrayWriter(
final String columnName
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.FLOAT_ARRAY);
return NumericArrayFieldWriter.getFloatArrayFieldWriter(selector);
}

Expand All @@ -166,7 +173,8 @@ private static FieldWriter makeDoubleArrayWriter(
final String columnName
)
{
final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.DOUBLE_ARRAY);
return NumericArrayFieldWriter.getDoubleArrayFieldWriter(selector);
}

Expand All @@ -185,7 +193,8 @@ private static FieldWriter makeComplexWriter(
throw new ISE("No serde for complexTypeName[%s], cannot write column [%s]", columnTypeName, columnName);
}

final ColumnValueSelector<?> selector = selectorFactory.makeColumnValueSelector(columnName);
final ColumnValueSelector<?> selector =
TypeCastSelectors.makeColumnValueSelector(selectorFactory, columnName, ColumnType.ofComplex(columnTypeName));
return new ComplexFieldWriter(serde, selector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*
* @param <ElementType> Type of the individual array elements
*/
public abstract class NumericArrayFieldSelector<ElementType extends Number> implements ColumnValueSelector
public abstract class NumericArrayFieldSelector<ElementType extends Number> implements ColumnValueSelector<Object[]>
{
/**
* Memory containing the serialized values of the array
Expand Down Expand Up @@ -81,15 +81,15 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)

@Nullable
@Override
public Object getObject()
public Object[] getObject()
{
return computeCurrentArray();
}

@Override
public Class classOfObject()
public Class<Object[]> classOfObject()
{
return Object.class;
return Object[].class;
}

@Override
Expand Down Expand Up @@ -131,7 +131,7 @@ public boolean isNull()
public abstract int getIndividualFieldSize();

@Nullable
private Number[] computeCurrentArray()
private Object[] computeCurrentArray()
{
final long fieldPosition = fieldPointer.position();
final long fieldLength = fieldPointer.length();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@

import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;

import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -128,7 +126,7 @@ public NumericArrayFieldWriter(final ColumnValueSelector selector, NumericFieldW
@Override
public long writeTo(WritableMemory memory, long position, long maxSize)
{
Object row = selector.getObject();
final Object[] row = (Object[]) selector.getObject();
if (row == null) {
int requiredSize = Byte.BYTES;
if (requiredSize > maxSize) {
Expand All @@ -137,18 +135,6 @@ public long writeTo(WritableMemory memory, long position, long maxSize)
memory.putByte(position, NULL_ROW);
return requiredSize;
} else {

List<? extends Number> list = FrameWriterUtils.getNumericArrayFromObject(row);

if (list == null) {
int requiredSize = Byte.BYTES;
if (requiredSize > maxSize) {
return -1;
}
memory.putByte(position, NULL_ROW);
return requiredSize;
}

// Create a columnValueSelector to write the individual elements re-using the NumericFieldWriter
AtomicInteger index = new AtomicInteger(0);
ColumnValueSelector<Number> columnValueSelector = new ColumnValueSelector<Number>()
Expand Down Expand Up @@ -199,7 +185,7 @@ public boolean isNull()
@Override
public Number getObject()
{
return list.get(index.get());
return (Number) row[index.get()];
}

@Override
Expand All @@ -215,7 +201,7 @@ public Class<? extends Number> classOfObject()
// Next [(1 + Numeric Size) x Number of elements of array] bytes are reserved for the elements of the array and
// their null markers
// Last byte is reserved for array termination
int requiredSize = Byte.BYTES + (writer.getNumericSizeBytes() + Byte.BYTES) * list.size() + Byte.BYTES;
int requiredSize = Byte.BYTES + (writer.getNumericSizeBytes() + Byte.BYTES) * row.length + Byte.BYTES;

if (requiredSize > maxSize) {
return -1;
Expand All @@ -225,7 +211,7 @@ public Class<? extends Number> classOfObject()
memory.putByte(position + offset, NON_NULL_ROW);
offset += Byte.BYTES;

for (; index.get() < list.size(); index.incrementAndGet()) {
for (; index.get() < row.length; index.incrementAndGet()) {
writer.writeTo(
memory,
position + offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,60 +144,16 @@ public static List<ByteBuffer> getUtf8ByteBuffersFromStringArraySelector(
@SuppressWarnings("rawtypes") final BaseObjectColumnValueSelector selector
)
{
Object row = selector.getObject();
final Object[] row = (Object[]) selector.getObject();
if (row == null) {
return null;
} else if (row instanceof String) {
return Collections.singletonList(getUtf8ByteBufferFromString((String) row));
}

final List<ByteBuffer> retVal = new ArrayList<>();
if (row instanceof List) {
for (int i = 0; i < ((List<?>) row).size(); i++) {
retVal.add(getUtf8ByteBufferFromString(((List<String>) row).get(i)));
}
} else if (row instanceof Object[]) {
for (Object value : (Object[]) row) {
retVal.add(getUtf8ByteBufferFromString((String) value));
}
} else {
throw new ISE("Unexpected type %s found", row.getClass().getName());
}
return retVal;
}

/**
* Retrieves a numeric list from a Java object, given that the object is an instance of something that can be returned
* from {@link ColumnValueSelector#getObject()} of valid numeric array selectors representations
*
* While {@link BaseObjectColumnValueSelector} specifies that only instances of {@code Object[]} can be returned from
* the numeric array selectors, this method also handles a few more cases which can be encountered if the selector is
* directly implemented on top of the group by stuff
*/
@Nullable
public static List<? extends Number> getNumericArrayFromObject(Object row)
{
if (row == null) {
return null;
} else if (row instanceof Number) {
return Collections.singletonList((Number) row);
}

final List<Number> retVal = new ArrayList<>();

if (row instanceof List) {
for (int i = 0; i < ((List<?>) row).size(); i++) {
retVal.add((Number) ((List<?>) row).get(i));
}
} else if (row instanceof Object[]) {
for (Object value : (Object[]) row) {
retVal.add((Number) value);
final List<ByteBuffer> retVal = new ArrayList<>();
for (Object value : row) {
retVal.add(getUtf8ByteBufferFromString((String) value));
}
} else {
throw new ISE("Unexpected type %s found", row.getClass().getName());
return retVal;
}

return retVal;
}

/**
Expand Down Expand Up @@ -275,6 +231,7 @@ public static void copyByteBufferToMemoryDisallowingNullBytes(
* Whenever "allowNullBytes" is true, "removeNullBytes" must be false. Use the methods {@link #copyByteBufferToMemoryAllowingNullBytes}
* and {@link #copyByteBufferToMemoryDisallowingNullBytes} to copy between the memory
* <p>
*
* @throws InvalidNullByteException if "allowNullBytes" and "removeNullBytes" is false and a null byte is encountered
*/
private static void copyByteBufferToMemory(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.frame.write.cast;

import org.apache.druid.error.DruidException;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.RowIdSupplier;

import javax.annotation.Nullable;

/**
* Wraps a {@link ColumnValueSelector}, calls {@link ColumnValueSelector#getObject()}, interprets that value using
* {@link ExprEval#ofType}, and casts it using {@link ExprEval#castTo}.
*/
public class ObjectToArrayColumnValueSelector implements ColumnValueSelector<Object[]>
{
private final ColumnValueSelector<?> selector;
@Nullable
private final ExpressionType desiredType;
@Nullable
private final RowIdSupplier rowIdSupplier;

public ObjectToArrayColumnValueSelector(
final ColumnValueSelector<?> selector,
final ExpressionType desiredType,
@Nullable final RowIdSupplier rowIdSupplier
)
{
this.selector = selector;
this.desiredType = desiredType;
this.rowIdSupplier = rowIdSupplier;

if (!desiredType.isArray() || desiredType.getElementType() == null) {
throw DruidException.defensive("Expected array with nonnull element type, got[%s]", desiredType);
}
}

@Override
public double getDouble()
{
throw DruidException.defensive("Unexpected call to getDouble on array selector");
}

@Override
public float getFloat()
{
throw DruidException.defensive("Unexpected call to getFloat on array selector");
}

@Override
public long getLong()
{
throw DruidException.defensive("Unexpected call to getLong on array selector");
}

@Override
public boolean isNull()
{
throw DruidException.defensive("Unexpected call to isNull on array selector");
}

@Nullable
@Override
public Object[] getObject()
{
return (Object[]) TypeCastSelectors.bestEffortCoerce(selector.getObject(), desiredType);
}

@Override
public Class<Object[]> classOfObject()
{
return Object[].class;
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("rowIdSupplier", rowIdSupplier);
}
}
Loading