Adding double columns support #4491
|
|
||
| @Override | ||
| public double getDouble(ByteBuffer buf, int position) | ||
| { |
|
|
||
| @Override | ||
| public double getDouble(ByteBuffer buf, int position) | ||
| { |
This should be getLong.
| return true; | ||
| } | ||
|
|
||
| @Override |
Please place this method near other similar methods
| return (float) most; | ||
| } | ||
|
|
||
| @Override |
| { | ||
| return (double) buf.getLong(position ); | ||
| } | ||
|
|
| } else { | ||
| } else if (type.equals("double")) { | ||
| event.put(metric, in.readDouble()); | ||
| } |
|
|
||
| long getLong(); | ||
|
|
||
| default double getDouble() { |
| * | ||
| * @param buf byte buffer storing the byte array representation of the aggregate | ||
| * @param position offset within the byte buffer at which the aggregate value is stored | ||
| * @return the double representation of the aggregate |
It should be */ instead of * */
| } | ||
|
|
||
| @Override | ||
| public double getDouble() |
Please rearrange along with other similar methods
| private DoubleColumnSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory) | ||
| { | ||
| return AggregatorUtil.getFloatColumnSelector(metricFactory, macroTable, fieldName, expression, Float.MIN_VALUE); | ||
| return AggregatorUtil.getDoubleColumnSelector(metricFactory, macroTable, fieldName, expression, Float.MIN_VALUE); |
leventov left a comment
Also I have a general question about this PR - why do we need to add float-precision aggregators?
| private DoubleColumnSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory) | ||
| { | ||
| return AggregatorUtil.getFloatColumnSelector(metricFactory, macroTable, fieldName, expression, Float.MAX_VALUE); | ||
| return AggregatorUtil.getDoubleColumnSelector(metricFactory, macroTable, fieldName, expression, Float.MAX_VALUE); |
| private DoubleColumnSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory) | ||
| { | ||
| return AggregatorUtil.getFloatColumnSelector(metricFactory, macroTable, fieldName, expression, 0f); | ||
| return AggregatorUtil.getDoubleColumnSelector(metricFactory, macroTable, fieldName, expression, 0f); |
| // no resources to cleanup | ||
| } | ||
|
|
||
| @Override |
Please arrange similar methods together
|
|
||
| private FloatColumnSelector getFloatColumnSelector(ColumnSelectorFactory metricFactory) | ||
| { | ||
| return AggregatorUtil.getFloatColumnSelector(metricFactory, macroTable, fieldName, expression, Float.MIN_VALUE); |
| @Override | ||
| public int getMaxIntermediateSize() | ||
| { | ||
| return Floats.BYTES; |
| ) throws SegmentValidationException | ||
| { | ||
| if (!lhs.equals(rhs)) { | ||
| throw new SegmentValidationException("Dim [%s] value not equal. Expected [%s] found [%s]", lhs, rhs); |
Three formatting params, two args
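To illustrate the mismatch flagged here, a minimal sketch, assuming the exception message is built with String.format-style formatting (the diff does not show how SegmentValidationException formats its message). The class name, method names, and the dimName argument are hypothetical and only stand in for whatever third value the message was meant to carry.

```java
import java.util.MissingFormatArgumentException;

public class FormatArgsCheck {
    // Mirrors the message from the diff: three %s specifiers, two arguments.
    // With String.format this throws MissingFormatArgumentException.
    static String broken(String lhs, String rhs) {
        return String.format("Dim [%s] value not equal. Expected [%s] found [%s]", lhs, rhs);
    }

    // Hypothetical fix: pass one argument per specifier (dimName is an assumption).
    static String fixed(String dimName, String lhs, String rhs) {
        return String.format("Dim [%s] value not equal. Expected [%s] found [%s]", dimName, lhs, rhs);
    }

    public static void main(String[] args) {
        System.out.println(fixed("dimA", "1.0", "2.0"));
        try {
            broken("1.0", "2.0");
        } catch (MissingFormatArgumentException e) {
            System.out.println("Missing argument for specifier " + e.getFormatSpecifier());
        }
    }
}
```

If the real constructor uses a lenient formatter instead, the symptom would be a malformed message rather than an exception, but the argument count would still need fixing.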
| /** | ||
| * Created by sbouguerra on 6/20/17. | ||
| */ | ||
| public class DoubleDimensionIndexer<EncodedType extends Comparable<EncodedType>, EncodedKeyComponentType, ActualType extends Comparable<ActualType>> implements DimensionIndexer<Double, Double, Double> |
Looks like an incomplete stub class.
| import io.druid.segment.incremental.IncrementalIndexStorageAdapter; | ||
|
|
||
| /** | ||
| * Created by sbouguerra on 6/20/17. |
| @Override | ||
| public void writeMergedValueMetadata(List<IndexableAdapter> adapters) throws IOException | ||
| { | ||
|
|
If this method is empty on purpose, please leave a comment.
| @Override | ||
| public void writeIndexes(List<IntBuffer> segmentRowNumConversions, Closer closer) throws IOException | ||
| { | ||
|
|
@leventov If you don't need double precision, then it is a waste of storage to store numbers as 64 bits instead of 32 bits.
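As a rough illustration of the storage argument, a minimal sketch comparing raw (uncompressed) sizes of float and double values, plus a check of whether a value survives a round trip through float. The class name, method names, and row count are hypothetical; actual on-disk sizes in Druid also depend on compression and are not modeled here.

```java
public class FloatVsDoubleStorage {
    // Raw bytes needed to store one primitive value per row.
    static long bytesForFloats(long rows) {
        return rows * Float.BYTES; // 4 bytes per value
    }

    static long bytesForDoubles(long rows) {
        return rows * Double.BYTES; // 8 bytes per value
    }

    // True when the value is unchanged after narrowing to float and widening back.
    static boolean fitsInFloat(double value) {
        return (double) (float) value == value;
    }

    public static void main(String[] args) {
        long rows = 1_000_000L; // hypothetical row count
        System.out.println("float column:  " + bytesForFloats(rows) + " bytes");
        System.out.println("double column: " + bytesForDoubles(rows) + " bytes");
        System.out.println("0.5 fits in float: " + fitsInFloat(0.5));
        System.out.println("0.1 fits in float: " + fitsInFloat(0.1));
    }
}
```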
|
@leventov thanks for reviewing this, any more comments?
|
Unrelated error:
testStartStopStart(io.druid.query.lookup.KafkaLookupExtractorFactoryTest) Time elapsed: 0.971 sec <<< FAILURE!
java.lang.AssertionError:
Unexpected method call ConsumerConnector.shutdown():
ConsumerConnector.createMessageStreamsByFilter(<any>, <any>, io.druid.query.lookup.KafkaLookupExtractorFactory$1@8dfe921, io.druid.query.lookup.KafkaLookupExtractorFactory$1@8dfe921): expected: 1, actual: 0
at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:44)
at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:94)
at com.sun.proxy.$Proxy37.shutdown(Unknown Source)
at io.druid.query.lookup.KafkaLookupExtractorFactory.close(KafkaLookupExtractorFactory.java:314)
at io.druid.query.lookup.KafkaLookupExtractorFactoryTest.testStartStopStart(KafkaLookupExtractorFactoryTest.java:407)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:316)
at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:88)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:300)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:131)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.access$100(PowerMockJUnit47RunnerDelegateImpl.java:59)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner$TestExecutorStatement.evaluate(PowerMockJUnit47RunnerDelegateImpl.java:147)
at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:168)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.evaluateStatement(PowerMockJUnit47RunnerDelegateImpl.java:107)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:288)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:86)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:49)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:208)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:147)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:121)
at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:33)
at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:45)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:123)
at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:121)
at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:367)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:274)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:161)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.084 sec - in io.druid.query.lookup.TestKafkaExtractionCluster
Results :
Failed tests:
KafkaLookupExtractorFactoryTest.testStartStopStart:407
Unexpected method call ConsumerConnector.shutdown():
ConsumerConnector.createMessageStreamsByFilter(<any>, <any>, io.druid.query.lookup.KafkaLookupExtractorFactory$1@8dfe921, io.druid.query.lookup.KafkaLookupExtractorFactory$1@8dfe921): expected: 1, actual: 0
| public double getDouble(ByteBuffer buf, int position) | ||
| { | ||
| return (double) buf.getLong(position ); | ||
| return buf.getDouble(position); |
Seems that this change is wrong (I suggested it - sorry). Also DistinctCountBufferAggregator could extend LongBufferAggregator
| public double getDouble(ByteBuffer buf, int position) | ||
| { | ||
| return (double) buf.getLong(position); | ||
| return buf.getDouble(position); |
Seems that this change is wrong (I suggested it - sorry). Also TimestampBufferAggregator could extend LongBufferAggregator
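A minimal sketch of why the two forms differ when the buffer actually holds a long: casting the result of getLong converts the number, while getDouble reinterprets the same eight bytes as an IEEE 754 bit pattern. The class and method names are hypothetical and are not taken from the PR.

```java
import java.nio.ByteBuffer;

public class LongBackedGetDouble {
    // Numeric conversion: read the stored long, then widen it to double.
    static double viaGetLong(ByteBuffer buf, int position) {
        return (double) buf.getLong(position);
    }

    // Bit reinterpretation: read the same 8 bytes as if a double had been stored.
    static double viaGetDouble(ByteBuffer buf, int position) {
        return buf.getDouble(position);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(0, 42L); // the aggregate is stored as a long

        System.out.println("cast of getLong: " + viaGetLong(buf, 0));
        System.out.println("getDouble:       " + viaGetDouble(buf, 0));
    }
}
```

For an aggregator whose intermediate state is a long, only the first form returns the stored count or timestamp as a number.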
| * | ||
| * @param buf byte buffer storing the byte array representation of the aggregate | ||
| * @param position offset within the byte buffer at which the aggregate value is stored | ||
| * @return the double representation of the aggregate |
It should be */ instead of * */
| ); | ||
| } | ||
|
|
||
| protected DoubleColumnSelector getDoubleColumnSelector(ColumnSelectorFactory metricFactory, Double nullValue) |
nullValue param should be primitive double
Thanks, but with the upcoming null-support work this will soon be an object, so I will keep it this way.
| @Override | ||
| public int getMaxIntermediateSize() | ||
| { | ||
| return Doubles.BYTES; |
|
|
||
| public class TopNMapFn | ||
| { | ||
| public static final long ZERO_LONG = 0L; |
If this is to avoid boxing, the type should be Long, not long; otherwise it will still box. I also suggest pulling this into some global place, like Druid common "Primitives".
And more likely in the form of methods nullToZero(Long), nullToZero(Double), nullToZero(Float) directly, because those constants are not used in any other context.
I will make it an object for now and stop there; I think this will change soon with the upcoming null support.
| } | ||
| Arrays.stream(aggregators).forEach(aggregator -> aggregator.aggregate()); | ||
| for (Aggregator aggregator: | ||
| aggregators) { |
| ) | ||
| { | ||
| return new Aggregator[0][]; | ||
| return null; |
That is per the interface contract.
A dimension type that does not have integer values should return null.
| public final class DimensionHandlerUtils | ||
| { | ||
|
|
||
| public static final double ZERO_DOUBLE = 0.0d; |
See above, needs to be Double.
| @Override | ||
| public boolean canSkip() | ||
| { | ||
| // can not be all null |
This comment doesn't add clarity
| return String.valueOf(o); | ||
| } | ||
| }; | ||
| private static final Function<Object, String> TO_STRING_INCLUDING_NULL = o -> String.valueOf(o); |
Could be method ref String::valueOf
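A minimal sketch of the suggested change, showing the lambda from the diff next to the method-reference form. The class name and constant names are hypothetical. With a Function<Object, String> target, String::valueOf resolves to String.valueOf(Object), so a null input still yields the string "null".

```java
import java.util.function.Function;

public class ToStringIncludingNull {
    // Lambda form, as in the diff.
    static final Function<Object, String> LAMBDA = o -> String.valueOf(o);

    // Method-reference form suggested in the review; behaves the same way.
    static final Function<Object, String> METHOD_REF = String::valueOf;

    public static void main(String[] args) {
        System.out.println(LAMBDA.apply(null));
        System.out.println(METHOD_REF.apply(null));
        System.out.println(METHOD_REF.apply(1.5));
    }
}
```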
| import io.druid.segment.incremental.IncrementalIndex; | ||
| import io.druid.segment.incremental.IncrementalIndexStorageAdapter; | ||
|
|
||
| public class DoubleDimensionIndexer implements DimensionIndexer<Double, Double, Double> |
This class looks like an incomplete stub.
| { | ||
| final Object[] dims = currEntry.getKey().getDims(); | ||
|
|
||
| if (dimIndex >= dims.length) { |
| case DOUBLE: | ||
| builder.setValueType(ValueType.DOUBLE); | ||
| builder.addSerde( | ||
| DoubleGenericColumnPartSerde.serializerBuilder() |
It uses different alignment than the code above. Also, I suggest breaking the first method call too:
DoubleGenericColumnPartSerde
    .serializerBuilder()
    .withByteOrder(IndexIO.BYTE_ORDER)
    .withDelegate((DoubleColumnSerializer) writer)
    .build()
| { | ||
| final Object[] dims = currEntry.getKey().getDims(); | ||
|
|
||
| if (dimIndex >= dims.length) { |
|
|
||
| @Override | ||
| public float getFloatMetric(String metric) | ||
| { |
| public DoubleGenericColumnPartSerde build() | ||
| { | ||
| return new DoubleGenericColumnPartSerde( | ||
| byteOrder, new Serializer() |
Each arg should be on a new line
| supplier = supplierFromDimensionSelector( | ||
| columnSelectorFactory.makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName)) | ||
| ); | ||
| } else if (nativeType == ValueType.DOUBLE) { |
| } | ||
| } | ||
|
|
||
| public static DoubleColumnSelector makeDoubleColumnSelector( |
Arrange this method along with other similar methods
| } | ||
| } | ||
|
|
||
| @Override |
|
@leventov thanks for the second pass, I have updated the patch.
| public static final double ZERO_DOUBLE = 0.0d; | ||
| public static final float ZERO_FLOAT = 0.0f; | ||
| public static final long ZERO_LONG = 0L; | ||
| public static final Double ZERO_DOUBLE = 0.0d; |
Please add documentation to these fields, explaining why they are added
| final Object[] dims = currEntry.getKey().getDims(); | ||
|
|
||
| if (dimIndex >= dims.length) { | ||
| return 0.0; |
| public int compareUnsortedEncodedKeyComponents(Double lhs, Double rhs) | ||
| { | ||
| return 0; | ||
| return lhs.compareTo(rhs); |
| public boolean checkUnsortedEncodedKeyComponentsEqual(Double lhs, Double rhs) | ||
| { | ||
| return false; | ||
| return lhs.equals(rhs); |
| public int getUnsortedEncodedKeyComponentHashCode(Double key) | ||
| { | ||
| return 0; | ||
| return key.hashCode(); |
|
|
||
| if (dimIndex >= dims.length) { | ||
| return 0L; | ||
| return 0.0f; |
| } else if (typeName.equals("float") || typeName.equals("double")) { | ||
| // called "float", but the aggs really return doubles | ||
| if (cmpIsNumeric) { | ||
| serdeHelper = new DoubleRowBasedKeySerdeHelper(aggOffset); |
Do we need a FloatRowBasedKeySerdeHelper too?
|
@leventov any more comments on this one?
| @Override | ||
| public int compareUnsortedEncodedKeyComponents(Double lhs, Double rhs) | ||
| { | ||
| return Double.compare(lhs == null ? 0.0: lhs, rhs == null ? 0.0 : rhs); |
Let's extract nullToZero as a method
Also please make the same change in LongDimensionIndexer and FloatDimensionIndexer.
| } | ||
|
|
||
| @Override | ||
| public int compareUnsortedEncodedKeyComponents(Double lhs, Double rhs) |
Please mark the parameters @Nullable, here as well as in the implemented interface, and also in similar methods, i.e. checkUnsortedEncodedKeyComponentsEqual and getUnsortedEncodedKeyComponentHashCode.
|
@leventov thanks, I have addressed the last comments.
| @@ -242,7 +256,7 @@ public static Long convertObjectToLong(Object valObj) | |||
| public static Float convertObjectToFloat(Object valObj) | |||
This method doesn't match convertObjectToDouble() in null handling of the result of tryParse(). Also, please locate those methods next to each other.
Please make parameter @Nullable
| @@ -225,7 +239,7 @@ private static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> Colu | |||
| public static Long convertObjectToLong(Object valObj) | |||
This method doesn't match convertObjectToDouble() in null handling of the result of getExactLongFromDecimalString().
Please make parameter @Nullable
| } | ||
| } | ||
|
|
||
| public static Double convertObjectToDouble(Object valObj) |
Please make parameter @Nullable
| public int compareUnsortedEncodedKeyComponents(@Nullable Double lhs, @Nullable Double rhs) | ||
| { | ||
| // DimensionHandlerUtils.convertObjectToDouble is used to convert null to DoubleZero | ||
| return Double.compare(DimensionHandlerUtils.convertObjectToDouble(lhs), DimensionHandlerUtils.convertObjectToDouble(rhs)); |
IMO it's better to extract a method with shorter and more specific name like "nullToZero()", that accepts @Nullable Double, rather than Object. It's better for type safety and simplicity, because compareUnsortedEncodedKeyComponents() and similar methods could be called often. Also it's similar to Guava's nullToEmpty().
Same comment for Long and Float
| @Override | ||
| public boolean checkUnsortedEncodedKeyComponentsEqual(@Nullable Double lhs, @Nullable Double rhs) | ||
| { | ||
| return Objects.equals(lhs, rhs); |
If we decide to make null totally equivalent to zero, it should probably be nullToZero(lhs).equals(nullToZero(rhs)). Otherwise null won't match zero.
Same comment for Long and Float
| @Override | ||
| public int getUnsortedEncodedKeyComponentHashCode(@Nullable Double key) | ||
| { | ||
| return Objects.hashCode(key); |
Same as above, should probably be nullToZero(key).hashCode(). And same for Long and Float.
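The suggestions in this thread can be sketched roughly as follows, assuming null is meant to be fully equivalent to zero. This is a standalone illustration, not the code that landed in DimensionHandlerUtils or DoubleDimensionIndexer: the class name and the short method names are hypothetical, and the @Nullable annotations are left out so the sketch compiles without extra dependencies.

```java
import java.util.Objects;

public class NullToZeroSketch {
    static final Double ZERO_DOUBLE = 0.0d;

    // Typed helper in the spirit of Guava's nullToEmpty(); the argument may be null.
    static Double nullToZero(Double value) {
        return value == null ? ZERO_DOUBLE : value;
    }

    static int compare(Double lhs, Double rhs) {
        return Double.compare(nullToZero(lhs), nullToZero(rhs));
    }

    // nullToZero never returns null, so plain equals() is enough here.
    static boolean equal(Double lhs, Double rhs) {
        return nullToZero(lhs).equals(nullToZero(rhs));
    }

    static int hash(Double key) {
        return nullToZero(key).hashCode();
    }

    public static void main(String[] args) {
        System.out.println("Objects.equals(null, 0.0): " + Objects.equals(null, 0.0));
        System.out.println("equal(null, 0.0):          " + equal(null, 0.0));
        System.out.println("compare(null, 0.0):        " + compare(null, 0.0));
    }
}
```

Keeping compare, equality, and hash code on the same nullToZero mapping keeps the three methods consistent with each other; Long and Float variants would follow the same shape.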
|
@leventov thanks, done.
| } | ||
|
|
||
| public static Long convertObjectToLong(Object valObj) | ||
| public static Long convertObjectToLong(@Nullable Object valObj) |
Could you comment on not handling null from getExactLongFromDecimalString(), that is not aligned with convertObjectToDouble()?
This is out of the scope of this PR and will be fixed by https://github.com/druid-io/druid/pull/4509/files
| } | ||
|
|
||
| public static Float convertObjectToFloat(Object valObj) | ||
| public static Float convertObjectToFloat(@Nullable Object valObj) |
| public boolean checkUnsortedEncodedKeyComponentsEqual(@Nullable Float lhs, @Nullable Float rhs) | ||
| { | ||
| return lhs.equals(rhs); | ||
| return Objects.equals(DimensionHandlerUtils.nullToZero(lhs), DimensionHandlerUtils.nullToZero(rhs)); |
DimensionHandlerUtils.nullToZero(lhs) is non-null, and Objects.equals() is designed to accept nullable arguments, so it's less confusing to use DimensionHandlerUtils.nullToZero(lhs).equals(DimensionHandlerUtils.nullToZero(rhs)) here. + Same for Double, Long.
| public int getUnsortedEncodedKeyComponentHashCode(@Nullable Float key) | ||
| { | ||
| return key.hashCode(); | ||
| return Objects.hashCode(DimensionHandlerUtils.nullToZero(key)); |
Similar to above, suggested DimensionHandlerUtils.nullToZero(key).hashCode(). + Same for Double, Long.
|
@leventov done
|
@leventov thanks for reviewing this!
|
This patch is a great step forward, but it broke a lot of Druid SQL stuff. Please consider Druid SQL when reviewing the design of PRs. I will try to fix the problems before 0.11.0 is released.
|
@gianm I'm not sure what exactly is broken; all the UTs are passing, including the SQL ones. I can help fix things if it is clear what is broken.
|
Broken in the sense that if you have double columns, SQL stuff won't work right. The UTs are passing since they don't test double columns. But there's code like this:
public static StringComparator getStringComparatorForSqlTypeName(SqlTypeName sqlTypeName)
{
  final ValueType valueType = getValueTypeForSqlTypeName(sqlTypeName);
  if (valueType == ValueType.LONG || valueType == ValueType.FLOAT) {
    return StringComparators.NUMERIC;
  } else {
    return StringComparators.LEXICOGRAPHIC;
  }
}
Which needs to get cleaned up since it would lead to wrong results. Probably this kind of code should also be changed to throw an error on unrecognized ValueTypes, in case we add new ones in the future, since errors are better than wrong results.
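One way to read the suggestion above, as a minimal sketch rather than the eventual fix: handle each known type explicitly and throw on anything else. The enums below are hypothetical stand-ins for Druid's ValueType and StringComparators so the example is self-contained, and which types should map to which comparator is an assumption.

```java
public class ComparatorForValueType {
    // Hypothetical stand-ins for Druid's ValueType and StringComparators.
    enum ValueType { STRING, LONG, FLOAT, DOUBLE, COMPLEX }
    enum ComparatorKind { NUMERIC, LEXICOGRAPHIC }

    static ComparatorKind comparatorFor(ValueType valueType) {
        switch (valueType) {
            case LONG:
            case FLOAT:
            case DOUBLE: // the case that the if/else in the quoted code misses
                return ComparatorKind.NUMERIC;
            case STRING:
                return ComparatorKind.LEXICOGRAPHIC;
            default:
                // Fail loudly instead of silently picking a comparator.
                throw new IllegalArgumentException("Unrecognized ValueType: " + valueType);
        }
    }

    public static void main(String[] args) {
        System.out.println(comparatorFor(ValueType.DOUBLE));
        System.out.println(comparatorFor(ValueType.STRING));
        try {
            comparatorFor(ValueType.COMPLEX);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```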
|
Sorry, I'm wrong, it looks like the UTs do test double columns. But they're not testing float columns, and there are still some wires crossed where it treats doubles as floats. I'll take a closer look…
Make double aggregators store and compute as double.
Add Float-based indexes for those who want to use 32 bits as floating-point precision.
Fix [#4449]