Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -107,6 +110,44 @@ public static byte[] toUtf8(final String string)
}
}

/**
* Encodes "string" into the buffer "byteBuffer", using no more than the number of bytes remaining in the buffer.
* Will only encode whole characters. The byteBuffer's position and limit may be changed during operation, but will
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ... limit can be changed ... or ... may be ...?

* be reset before this method call ends.
*
* @return the number of bytes written, which may be shorter than the full encoded string length if there
* is not enough room in the output buffer.
*/
public static int toUtf8WithLimit(final String string, final ByteBuffer byteBuffer)
{
  // Snapshot the caller-visible buffer state up front so it can be put back however encoding ends.
  final int startPosition = byteBuffer.position();
  final int startLimit = byteBuffer.limit();
  final int availableBytes = byteBuffer.remaining();

  // Malformed or unmappable input is substituted with the encoder's replacement bytes rather than reported.
  final CharsetEncoder utf8Encoder = StandardCharsets.UTF_8.newEncoder();
  utf8Encoder.onMalformedInput(CodingErrorAction.REPLACE);
  utf8Encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);

  try {
    final CharBuffer source = CharBuffer.wrap(string.toCharArray());

    // The CoderResult is intentionally ignored: how far the output buffer's position advanced already tells us
    // how many bytes were produced, whether or not the whole string fit.
    utf8Encoder.encode(source, byteBuffer, true);

    final int written = byteBuffer.position() - startPosition;
    assert written <= availableBytes;
    return written;
  }
  finally {
    // Hand the buffer back exactly as it was received; callers read the encoded bytes from the original position.
    byteBuffer.position(startPosition);
    byteBuffer.limit(startLimit);
  }
}

@Nullable
public static byte[] toUtf8Nullable(@Nullable final String string)
{
Expand Down Expand Up @@ -163,6 +204,7 @@ public static String toUpperCase(String s)
* application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well.
*
* @param s String to be encoded
*
* @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20".
*/
@Nullable
Expand Down Expand Up @@ -311,6 +353,7 @@ public static String emptyToNullNonDruidDataString(@Nullable String string)
* Convert an input to base 64 and return the utf8 string of that byte array
*
* @param input The string to convert to base64
*
* @return the base64 of the input in string form
*/
public static String utf8Base64(String input)
Expand All @@ -322,6 +365,7 @@ public static String utf8Base64(String input)
* Convert an input byte array into a newly-allocated byte array using the {@link Base64} encoding scheme
*
* @param input The byte array to convert to base64
*
* @return the base64 of the input in byte array form
*/
public static byte[] encodeBase64(byte[] input)
Expand All @@ -333,6 +377,7 @@ public static byte[] encodeBase64(byte[] input)
* Convert an input byte array into a string using the {@link Base64} encoding scheme
*
* @param input The byte array to convert to base64
*
* @return the base64 of the input in string form
*/
public static String encodeBase64String(byte[] input)
Expand All @@ -344,6 +389,7 @@ public static String encodeBase64String(byte[] input)
* Decode an input byte array using the {@link Base64} encoding scheme and return a newly-allocated byte array
*
* @param input The byte array to decode from base64
*
* @return a newly-allocated byte array
*/
public static byte[] decodeBase64(byte[] input)
Expand All @@ -355,6 +401,7 @@ public static byte[] decodeBase64(byte[] input)
* Decode an input string using the {@link Base64} encoding scheme and return a newly-allocated byte array
*
* @param input The string to decode from base64
*
* @return a newly-allocated byte array
*/
public static byte[] decodeBase64String(String input)
Expand Down Expand Up @@ -411,16 +458,17 @@ public static String repeat(String s, int count)
System.arraycopy(multiple, 0, multiple, copied, limit - copied);
return new String(multiple, StandardCharsets.UTF_8);
}

/**
* Returns the string left-padded with the string pad to a length of len characters.
* If base is longer than len, the return value is shortened to len characters.
* Lpad and rpad functions are migrated from flink's scala function with minor refactor
* https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
*
* @param base The base string to be padded
* @param len The length of padded string
* @param pad The pad string
* @param len The length of padded string
* @param pad The pad string
*
* @return the string left-padded with pad to a length of len
*/
public static String lpad(String base, Integer len, String pad)
Expand Down Expand Up @@ -453,11 +501,12 @@ public static String lpad(String base, Integer len, String pad)

/**
* Returns the string right-padded with the string pad to a length of len characters.
* If base is longer than len, the return value is shortened to len characters.
* If base is longer than len, the return value is shortened to len characters.
*
* @param base The base string to be padded
* @param len The length of padded string
* @param pad The pad string
* @param len The length of padded string
* @param pad The pad string
*
* @return the string right-padded with pad to a length of len
*/
public static String rpad(String base, Integer len, String pad)
Expand All @@ -473,12 +522,12 @@ public static String rpad(String base, Integer len, String pad)
int pos = 0;

// Copy the base
for ( ; pos < base.length() && pos < len; pos++) {
for (; pos < base.length() && pos < len; pos++) {
data[pos] = base.charAt(pos);
}

// Copy the padding
for ( ; pos < len; pos += pad.length()) {
for (; pos < len; pos += pad.length()) {
for (int i = 0; i < pad.length() && i < len - pos; i++) {
data[pos + i] = pad.charAt(i);
}
Expand Down
20 changes: 3 additions & 17 deletions core/src/main/java/org/apache/druid/math/expr/Evals.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.logger.Logger;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;

Expand All @@ -46,21 +47,6 @@ public static boolean isAllConstants(List<Expr> exprs)
return true;
}

/**
 * Builds a new binary operator expression of the same concrete class as "binary", using "left" and "right" as its
 * operands. The new instance is created reflectively through the class's declared
 * {@code (String, Expr, Expr)} constructor, passing along the operator string of "binary".
 *
 * Binary operators that do not declare a constructor of that form must be constructed explicitly in this method.
 *
 * @return the rebuilt expression, or "binary" itself if reflective construction fails for any reason
 */
public static Expr binaryOp(BinaryOpExprBase binary, Expr left, Expr right)
{
  try {
    // Everything, including the class lookup, stays inside the try so that any failure takes the fallback path.
    final Class<? extends BinaryOpExprBase> operatorClass = binary.getClass();
    return operatorClass.getDeclaredConstructor(String.class, Expr.class, Expr.class)
                        .newInstance(binary.op, left, right);
  }
  catch (Exception e) {
    // Best effort: log the failure and keep the original expression so evaluation keeps working.
    log.warn(e, "failed to rewrite expression " + binary);
    return binary;
  }
}

public static long asLong(boolean x)
{
return x ? 1L : 0L;
Expand All @@ -81,8 +67,8 @@ public static boolean asBoolean(double x)
return x > 0;
}

/**
 * Interprets a string as a boolean.
 *
 * NOTE(review): this span appears to hold both the pre-change and post-change form of the signature and of the
 * return statement, as rendered by the diff view; presumably only the second line of each pair is current. Confirm.
 *
 * @param x the string to interpret; may be null
 *
 * @return true only when x is not null-or-equivalent according to {@code NullHandling.isNullOrEquivalent} and
 * parses as boolean true; false otherwise
 */
public static boolean asBoolean(String x)
public static boolean asBoolean(@Nullable String x)
{
return !NullHandling.isNullOrEquivalent(x) && Boolean.valueOf(x);
return !NullHandling.isNullOrEquivalent(x) && Boolean.parseBoolean(x);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private static class StringExprEval extends ExprEval<String>

private static final StringExprEval OF_NULL = new StringExprEval(null);

@Nullable
private Number numericVal;

private StringExprEval(@Nullable String value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,32 @@ public void toUtf8ConversionTest()
}
}

/**
 * Encodes a two-emoji string (four UTF-8 bytes per emoji) into buffers of 4, 6 and 8 bytes and checks that only
 * whole characters are written: one emoji for the 4- and 6-byte buffers, both for the 8-byte buffer.
 */
@Test
public void toUtf8WithLimitTest()
{
  final String input = "🚀🌔";
  final int[] capacities = {4, 6, 8};
  final int[] expectedByteCounts = {4, 4, 8};
  final String[] expectedDecoded = {"🚀", "🚀", "🚀🌔"};

  for (int i = 0; i < capacities.length; i++) {
    final ByteBuffer buffer = ByteBuffer.allocate(capacities[i]);

    final int written = StringUtils.toUtf8WithLimit(input, buffer);
    Assert.assertEquals(expectedByteCounts[i], written);

    // Read the encoded bytes back from the buffer and decode them.
    final byte[] encoded = new byte[written];
    buffer.get(encoded);
    Assert.assertEquals(expectedDecoded[i], StringUtils.fromUtf8(encoded));
  }
}

@Test
public void fromUtf8ByteBufferHeap()
{
Expand Down Expand Up @@ -181,7 +207,7 @@ public void testRepeat()
expectedException.expectMessage("count is negative, -1");
Assert.assertEquals("", StringUtils.repeat("foo", -1));
}

@Test
public void testLpad()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand All @@ -44,10 +43,10 @@
* {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and
* {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered}
*
* @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values
* to add to a bloom filter, or other bloom filters to merge into this bloom filter.
* @param <TSelector> type of selector that feeds this aggregator, likely either values to add to a bloom filter,
* or other bloom filters to merge into this bloom filter.
*/
public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector>
public abstract class BaseBloomFilterAggregator<TSelector>
implements BufferAggregator, Aggregator
{
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.BaseNullableColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
Expand Down Expand Up @@ -279,16 +280,21 @@ private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory column
);
}
} else {
// No column capabilities, try to guess based on selector type.
BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());

if (selector instanceof NilColumnValueSelector) {
return new NoopBloomFilterAggregator(maxNumEntries, onHeap);
} else if (selector instanceof DimensionSelector) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return new StringBloomFilterAggregator((DimensionSelector) selector, maxNumEntries, onHeap);
} else {
// Use fallback 'object' aggregator.
return new ObjectBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
}
// no column capabilities, use fallback 'object' aggregator
return new ObjectBloomFilterAggregator(
columnFactory.makeColumnValueSelector(field.getDimension()),
maxNumEntries,
onHeap
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@

import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

import java.nio.ByteBuffer;

public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator<ColumnValueSelector<ByteBuffer>>
public final class BloomFilterMergeAggregator
extends BaseBloomFilterAggregator<BaseObjectColumnValueSelector<ByteBuffer>>
{
BloomFilterMergeAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
BloomFilterMergeAggregator(BaseObjectColumnValueSelector<ByteBuffer> selector, int maxNumEntries, boolean onHeap)
{
super(selector, maxNumEntries, onHeap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@

package org.apache.druid.query.aggregation.bloom;

import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

import java.nio.ByteBuffer;

/**
* Handles "unknown" columns by examining what comes out of the selector
*/
class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<ColumnValueSelector>
class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator<BaseObjectColumnValueSelector<Object>>
{
ObjectBloomFilterAggregator(
ColumnValueSelector selector,
BaseObjectColumnValueSelector<Object> selector,
int maxNumEntries,
boolean onHeap
)
Expand All @@ -48,16 +46,14 @@ void bufferAdd(ByteBuffer buf)
final ByteBuffer other = (ByteBuffer) object;
BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position());
} else {
if (NullHandling.replaceWithDefault() || !selector.isNull()) {
if (object instanceof Long) {
BloomKFilter.addLong(buf, selector.getLong());
} else if (object instanceof Double) {
BloomKFilter.addDouble(buf, selector.getDouble());
} else if (object instanceof Float) {
BloomKFilter.addFloat(buf, selector.getFloat());
} else {
StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector);
}
if (object instanceof Long) {
BloomKFilter.addLong(buf, (long) object);
} else if (object instanceof Double) {
BloomKFilter.addDouble(buf, (double) object);
} else if (object instanceof Float) {
BloomKFilter.addFloat(buf, (float) object);
} else if (object instanceof String) {
BloomKFilter.addString(buf, (String) object);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 This is a good change, I think, moving the DimensionSelector out of here. This doesn't need to handle List since it doesn't work at ingestion time, I think, and I can't think of anywhere else they would come from without column capabilities.

} else {
BloomKFilter.addBytes(buf, null, 0, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@
import org.apache.druid.query.aggregation.post.LongLeastPostAggregator;
import org.apache.druid.segment.serde.ComplexMetrics;

/**
*/
public class AggregatorsModule extends SimpleModule
{
public AggregatorsModule()
Expand Down
Loading