diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java index f912f5e70b29..1a6fc81e4eb8 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java @@ -304,9 +304,7 @@ static int encodeToFile(long[] vals, String encoding, FileChannel output)throws } serializer.open(); - for (long val : vals) { - serializer.add(val); - } + serializer.addAll(vals, 0, vals.length); serializer.writeTo(output, null); return (int) serializer.getSerializedSize(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 134f5305169d..ae6babe76618 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -907,7 +907,7 @@ public void testSegmentProviderFindSegmentsWithEmptySegmentsThrowException() ); provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of()); } - + @Test public void testCreateIngestionSchema() throws IOException { @@ -2033,14 +2033,6 @@ private static class TestIndexIO extends IndexIO } } - final Metadata metadata = new Metadata( - null, - aggregatorFactories.toArray(new AggregatorFactory[0]), - null, - null, - null - ); - queryableIndexMap.put( entry.getValue(), new SimpleQueryableIndex( @@ -2049,9 +2041,21 @@ private static class TestIndexIO extends IndexIO null, columnMap, null, - metadata, false ) + { + @Override + public Metadata getMetadata() + { + return new Metadata( + null, + aggregatorFactories.toArray(new AggregatorFactory[0]), + null, + null, + null + ); + } + } ); } } @@ -2074,10 +2078,15 @@ void removeMetadata(File file) index.getBitmapFactoryForDimensions(), index.getColumns(), index.getFileMapper(), - null, false ) - ); + { + @Override + public Metadata getMetadata() + { + return null; + } + }); } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java b/processing/src/main/java/org/apache/druid/common/semantic/SemanticCreator.java similarity index 84% rename from processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java rename to processing/src/main/java/org/apache/druid/common/semantic/SemanticCreator.java index bb1af0e4d9f1..0142b3e8ed0a 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java +++ b/processing/src/main/java/org/apache/druid/common/semantic/SemanticCreator.java @@ -17,7 +17,9 @@ * under the License. */ -package org.apache.druid.query.rowsandcols; +package org.apache.druid.common.semantic; + +import org.apache.druid.query.rowsandcols.RowsAndColumns; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -26,8 +28,8 @@ /** * Annotation used to indicate that the method is used as a creator for a semantic interface. - * - * Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of + *

+ * Used in conjuction with {@link SemanticUtils#makeAsMap(Class)} to build maps for simplified implementation of * the {@link RowsAndColumns#as(Class)} method. */ @Retention(RetentionPolicy.RUNTIME) diff --git a/processing/src/main/java/org/apache/druid/common/semantic/SemanticUtils.java b/processing/src/main/java/org/apache/druid/common/semantic/SemanticUtils.java new file mode 100644 index 000000000000..4424b5fcccc7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/common/semantic/SemanticUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.semantic; + +import org.apache.druid.error.DruidException; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Function; + +public class SemanticUtils +{ + private static final Map, Map, Function>> OVERRIDES = new LinkedHashMap<>(); + + /** + * Allows the registration of overrides, which allows overriding of already existing mappings. + * This allows extensions to register mappings. + */ + @SuppressWarnings("unused") + public static void registerAsOverride(Class clazz, Class asInterface, Function fn) + { + final Map, Function> classOverrides = OVERRIDES.computeIfAbsent( + clazz, + theClazz -> new LinkedHashMap<>() + ); + + final Function oldVal = classOverrides.get(asInterface); + if (oldVal != null) { + throw DruidException.defensive( + "Attempt to side-override the same interface [%s] multiple times for the same class [%s].", + asInterface, + clazz + ); + } else { + classOverrides.put(asInterface, fn); + } + } + + public static Map, Function> makeAsMap(Class clazz) + { + final Map, Function> retVal = new HashMap<>(); + + for (Method method : clazz.getMethods()) { + if (method.isAnnotationPresent(SemanticCreator.class)) { + if (method.getParameterCount() != 0) { + throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method); + } + + retVal.put(method.getReturnType(), arg -> { + try { + return method.invoke(arg); + } + catch (InvocationTargetException | IllegalAccessException e) { + throw DruidException.defensive().build(e, "Problem invoking method [%s]", method); + } + }); + } + } + + final Map, Function> classOverrides = OVERRIDES.get(clazz); + if (classOverrides != null) { + for (Map.Entry, Function> overrideEntry : classOverrides.entrySet()) { + //noinspection unchecked + retVal.put(overrideEntry.getKey(), (Function) overrideEntry.getValue()); + } + } + + return retVal; + } +} diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index a04f3f6512cf..f4cc3065c7f6 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -176,6 +176,17 @@ public static DruidException defensive(String format, Object... args) return defensive().build(format, args); } + /** + * Build a "defensive" exception, this is an exception that should never actually be triggered, but we are + * throwing it inside a defensive check. + * + * @return A builder for a defensive exception. + */ + public static DruidException defensive(Throwable cause, String format, Object... args) + { + return defensive().build(cause, format, args); + } + /** * Build a "defensive" exception, this is an exception that should never actually be triggered. Throw to * allow messages to be seen by developers diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java index 6f5460095113..14b6f8a851a4 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java @@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntComparator; import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.common.semantic.SemanticCreator; +import org.apache.druid.common.semantic.SemanticUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.operator.ColumnWithDirection; @@ -73,7 +75,7 @@ public class ArrayListRowsAndColumns implements AppendableRowsAndColumns { @SuppressWarnings("rawtypes") - private static final Map, Function> AS_MAP = RowsAndColumns + private static final Map, Function> AS_MAP = SemanticUtils .makeAsMap(ArrayListRowsAndColumns.class); private final ArrayList rows; diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index 0dae40467f3f..ce199a7803c5 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -20,6 +20,8 @@ package org.apache.druid.query.rowsandcols; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.semantic.SemanticCreator; +import org.apache.druid.common.semantic.SemanticUtils; import org.apache.druid.frame.Frame; import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; import org.apache.druid.frame.key.KeyColumn; @@ -66,7 +68,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns { - private static final Map, Function> AS_MAP = RowsAndColumns + private static final Map, Function> AS_MAP = SemanticUtils .makeAsMap(LazilyDecoratedRowsAndColumns.class); private RowsAndColumns base; diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index d139265d147d..7b6a1f6215d3 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -19,19 +19,13 @@ package org.apache.druid.query.rowsandcols; -import org.apache.druid.error.DruidException; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; /** * An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows @@ -75,31 +69,6 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) return retVal; } - static Map, Function> makeAsMap(Class clazz) - { - Map, Function> retVal = new HashMap<>(); - - for (Method method : clazz.getMethods()) { - if (method.isAnnotationPresent(SemanticCreator.class)) { - if (method.getParameterCount() != 0) { - throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method); - } - - retVal.put(method.getReturnType(), arg -> { - try { - return method.invoke(arg); - } - catch (InvocationTargetException | IllegalAccessException e) { - throw DruidException.defensive().build(e, "Problem invoking method [%s]", method); - } - }); - } - } - - return retVal; - } - - /** * The set of column names available from the RowsAndColumns * diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java index 209d4430b1d1..73fc72a1ee48 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java @@ -19,10 +19,11 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.common.semantic.SemanticCreator; +import org.apache.druid.common.semantic.SemanticUtils; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.SemanticCreator; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.CloseableShapeshifter; import org.apache.druid.segment.QueryableIndex; @@ -41,7 +42,7 @@ public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter { - private static final Map, Function> AS_MAP = RowsAndColumns + private static final Map, Function> AS_MAP = SemanticUtils .makeAsMap(QueryableIndexRowsAndColumns.class); private final QueryableIndex index; diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFrameMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFrameMaker.java new file mode 100644 index 000000000000..204b5bd85489 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFrameMaker.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.semantic; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.RowSignature; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +public class DefaultFrameMaker implements FrameMaker +{ + private final RowsAndColumns rac; + + public DefaultFrameMaker(RowsAndColumns rac) + { + this.rac = rac; + } + + @Override + public RowSignature computeSignature() + { + final RowSignature.Builder signatureBuilder = RowSignature.builder(); + for (String column : rac.getColumnNames()) { + final Column racColumn = rac.findColumn(column); + if (racColumn == null) { + continue; + } + signatureBuilder.add(column, racColumn.toAccessor().getType()); + } + + return signatureBuilder.build(); + } + + @Override + public Frame toColumnBasedFrame() + { + final AtomicInteger rowId = new AtomicInteger(0); + final int numRows = rac.numRows(); + final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac); + final ColumnSelectorFactory selectorFactory = csfm.make(rowId); + + final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20); // 200 MB + + final FrameWriter frameWriter = FrameWriters.makeColumnBasedFrameWriterFactory( + memFactory, + computeSignature(), + Collections.emptyList() + ).newFrameWriter(selectorFactory); + + rowId.set(0); + for (; rowId.get() < numRows; rowId.incrementAndGet()) { + frameWriter.addSelection(); + } + + return Frame.wrap(frameWriter.toByteArray()); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FrameMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FrameMaker.java new file mode 100644 index 000000000000..095bfe1ed87c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FrameMaker.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.semantic; + +import org.apache.druid.frame.Frame; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.segment.column.RowSignature; + +public interface FrameMaker +{ + static FrameMaker fromRAC(RowsAndColumns rac) + { + FrameMaker retVal = rac.as(FrameMaker.class); + if (retVal == null) { + retVal = new DefaultFrameMaker(rac); + } + return retVal; + } + + RowSignature computeSignature(); + + Frame toColumnBasedFrame(); +} diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index dd0ac9ab1177..966de4052066 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -510,9 +510,15 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen new ConciseBitmapFactory(), columns, index.getFileMapper(), - null, lazy - ); + ) + { + @Override + public Metadata getMetadata() + { + return null; + } + }; } private Supplier getColumnHolderSupplier(ColumnBuilder builder, boolean lazy) @@ -604,25 +610,6 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen allDims = null; } - Metadata metadata = null; - ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd"); - if (metadataBB != null) { - try { - metadata = mapper.readValue( - SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()), - Metadata.class - ); - } - catch (JsonParseException | JsonMappingException ex) { - // Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which - // is no longer supported then it is OK to not use the metadata instead of failing segment loading - log.warn(ex, "Failed to load metadata for segment [%s]", inDir); - } - catch (IOException ex) { - throw new IOException("Failed to read metadata", ex); - } - } - Map> columns = new LinkedHashMap<>(); // Register the time column @@ -663,9 +650,32 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, - metadata, lazy - ); + ) + { + @Override + public Metadata getMetadata() + { + try { + ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd"); + if (metadataBB != null) { + return mapper.readValue( + SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()), + Metadata.class + ); + } + } + catch (JsonParseException | JsonMappingException ex) { + // Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which + // is no longer supported then it is OK to not use the metadata instead of failing segment loading + log.warn(ex, "Failed to load metadata for segment [%s]", inDir); + } + catch (IOException ex) { + log.warn(ex, "Failed to read metadata for segment [%s]", inDir); + } + return null; + } + }; log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java index 9d75748b4162..b8d4d2d16cf9 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java @@ -19,17 +19,24 @@ package org.apache.druid.segment; +import org.apache.druid.common.semantic.SemanticCreator; +import org.apache.druid.common.semantic.SemanticUtils; import org.apache.druid.query.rowsandcols.concrete.QueryableIndexRowsAndColumns; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Map; +import java.util.function.Function; /** */ public class QueryableIndexSegment implements Segment { + private static final Map, Function> AS_MAP = SemanticUtils + .makeAsMap(QueryableIndexSegment.class); + private final QueryableIndex index; private final QueryableIndexStorageAdapter storageAdapter; private final SegmentId segmentId; @@ -77,10 +84,18 @@ public void close() @Override public T as(@Nonnull Class clazz) { - if (CloseableShapeshifter.class.equals(clazz)) { - return (T) new QueryableIndexRowsAndColumns(index); + final Function fn = AS_MAP.get(clazz); + if (fn != null) { + return (T) fn.apply(this); } return Segment.super.as(clazz); } + + @SemanticCreator + @SuppressWarnings("unused") + public CloseableShapeshifter toCloseableShapeshifter() + { + return new QueryableIndexRowsAndColumns(index); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java index 924c7911f8a3..013a634fdc4b 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java @@ -38,7 +38,7 @@ /** * */ -public class SimpleQueryableIndex implements QueryableIndex +public abstract class SimpleQueryableIndex implements QueryableIndex { private final Interval dataInterval; private final List columnNames; @@ -46,8 +46,6 @@ public class SimpleQueryableIndex implements QueryableIndex private final BitmapFactory bitmapFactory; private final Map> columns; private final SmooshedFileMapper fileMapper; - @Nullable - private final Metadata metadata; private final Supplier> dimensionHandlers; public SimpleQueryableIndex( @@ -56,7 +54,6 @@ public SimpleQueryableIndex( BitmapFactory bitmapFactory, Map> columns, SmooshedFileMapper fileMapper, - @Nullable Metadata metadata, boolean lazy ) { @@ -73,7 +70,6 @@ public SimpleQueryableIndex( this.bitmapFactory = bitmapFactory; this.columns = columns; this.fileMapper = fileMapper; - this.metadata = metadata; if (lazy) { this.dimensionHandlers = Suppliers.memoize(() -> initDimensionHandlers(availableDimensions)); @@ -141,10 +137,7 @@ public void close() } @Override - public Metadata getMetadata() - { - return metadata; - } + public abstract Metadata getMetadata(); @Override public Map getDimensionHandlers() diff --git a/processing/src/main/java/org/apache/druid/segment/column/BaseColumn.java b/processing/src/main/java/org/apache/druid/segment/column/BaseColumn.java index f22693365e13..4829ed145999 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/BaseColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/BaseColumn.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; +import javax.annotation.Nullable; import java.io.Closeable; public interface BaseColumn extends Closeable @@ -41,4 +42,11 @@ default VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offse { throw new UOE("Cannot make VectorObjectSelector for column with class[%s]", getClass().getName()); } + + @SuppressWarnings("unused") + @Nullable + default T as(Class clazz) + { + return null; + } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/LongsColumn.java b/processing/src/main/java/org/apache/druid/segment/column/LongsColumn.java index 6f17dfb7c015..1f88ab044d10 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/LongsColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/LongsColumn.java @@ -28,6 +28,8 @@ import org.apache.druid.segment.vector.ReadableVectorOffset; import org.apache.druid.segment.vector.VectorValueSelector; +import javax.annotation.Nullable; + /** */ public class LongsColumn implements NumericColumn @@ -75,6 +77,13 @@ public long getLongSingleValueRow(int rowNum) return column.get(rowNum); } + @Override + @Nullable + public T as(Class clazz) + { + return column.as(clazz); + } + @Override public void close() { diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java index 6fe04fbd31f9..36dbf5f5309f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java @@ -47,7 +47,7 @@ public BlockLayoutColumnarLongsSupplier( CompressionStrategy strategy ) { - baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy)); + this.baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy)); this.totalSize = totalSize; this.sizePer = sizePer; this.baseReader = reader; @@ -156,6 +156,12 @@ public long get(int index) @Override public void get(final long[] out, final int start, final int length) + { + get(out, 0, start, length); + } + + @Override + public void get(long[] out, int offset, int start, int length) { // division + remainder is optimized by the compiler so keep those together int bufferNum = start / sizePer; @@ -169,7 +175,7 @@ public void get(final long[] out, final int start, final int length) } final int limit = Math.min(length - p, sizePer - bufferIndex); - reader.read(out, p, bufferIndex, limit); + reader.read(out, offset + p, bufferIndex, limit); p += limit; bufferNum++; bufferIndex = 0; diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarInts.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarInts.java index dc2adbbb6710..e4633d032980 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarInts.java +++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarInts.java @@ -28,4 +28,10 @@ */ public interface ColumnarInts extends IndexedInts, Closeable { + default void get(int[] out, int offset, int start, int length) + { + for (int i = 0; i < length; i++) { + out[offset + i] = get(i + start); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongs.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongs.java index 256c9934a21a..6d8162ef2670 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongs.java +++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongs.java @@ -46,9 +46,14 @@ public interface ColumnarLongs extends Closeable long get(int index); default void get(long[] out, int start, int length) + { + get(out, 0, start, length); + } + + default void get(long[] out, int offset, int start, int length) { for (int i = 0; i < length; i++) { - out[i] = get(i + start); + out[offset + i] = get(i + start); } } @@ -62,6 +67,12 @@ default void get(long[] out, int[] indexes, int length) @Override void close(); + @Nullable + default T as(Class clazz) + { + return null; + } + default ColumnValueSelector makeColumnValueSelector(ReadableOffset offset, ImmutableBitmap nullValueBitmap) { if (nullValueBitmap.isEmpty()) { diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongsSerializer.java index 05cf26439e35..2166874a9f33 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarLongsSerializer.java @@ -29,6 +29,15 @@ public interface ColumnarLongsSerializer extends Serializer { void open() throws IOException; + int size(); + void add(long value) throws IOException; + + default void addAll(long[] values, int start, int end) throws IOException + { + for (int i = start; i < end; ++i) { + add(values[i]); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java index d3869bd9ef58..f9cceaf360f8 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; import org.apache.druid.collections.bitmap.ImmutableBitmap; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; @@ -451,7 +452,10 @@ public VectorValueSelector makeVectorValueSelector(ReadableVectorOffset offset) @Override public int getLength() { - return -1; + if (compressedRawColumn == null) { + compressedRawColumn = closer.register(compressedRawColumnSupplier.get()); + } + return compressedRawColumn.size(); } @Override @@ -534,9 +538,14 @@ public ColumnValueSelector makeColumnValueSelector(List path, if (arrayFieldIndex >= 0) { final int elementNumber = ((NestedPathArrayElement) lastPath).getIndex(); if (elementNumber < 0) { - throw new IAE("Cannot make array element selector for path [%s], negative array index not supported for this selector", path); + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Cannot make array element selector for path [%s], negative array index not supported for this selector", path); } - DictionaryEncodedColumn col = (DictionaryEncodedColumn) getColumnHolder(arrayField, arrayFieldIndex).getColumn(); + DictionaryEncodedColumn col = (DictionaryEncodedColumn) getColumnHolder( + arrayField, + arrayFieldIndex + ).getColumn(); ColumnValueSelector arraySelector = col.makeColumnValueSelector(readableOffset); return new ColumnValueSelector() { @@ -633,9 +642,14 @@ public VectorObjectSelector makeVectorObjectSelector(List path, if (arrayFieldIndex >= 0) { final int elementNumber = ((NestedPathArrayElement) lastPath).getIndex(); if (elementNumber < 0) { - throw new IAE("Cannot make array element selector for path [%s], negative array index not supported for this selector", path); + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Cannot make array element selector for path [%s], negative array index not supported for this selector", path); } - DictionaryEncodedColumn col = (DictionaryEncodedColumn) getColumnHolder(arrayField, arrayFieldIndex).getColumn(); + DictionaryEncodedColumn col = (DictionaryEncodedColumn) getColumnHolder( + arrayField, + arrayFieldIndex + ).getColumn(); VectorObjectSelector arraySelector = col.makeVectorObjectSelector(readableOffset); return new VectorObjectSelector() @@ -701,9 +715,14 @@ public VectorValueSelector makeVectorValueSelector(List path, Re if (arrayFieldIndex >= 0) { final int elementNumber = ((NestedPathArrayElement) lastPath).getIndex(); if (elementNumber < 0) { - throw new IAE("Cannot make array element selector for path [%s], negative array index not supported for this selector", path); + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Cannot make array element selector for path [%s], negative array index not supported for this selector", path); } - DictionaryEncodedColumn col = (DictionaryEncodedColumn) getColumnHolder(arrayField, arrayFieldIndex).getColumn(); + DictionaryEncodedColumn col = (DictionaryEncodedColumn) getColumnHolder( + arrayField, + arrayFieldIndex + ).getColumn(); VectorObjectSelector arraySelector = col.makeVectorObjectSelector(readableOffset); return new VectorValueSelector() diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/SemanticCreatorUsageTest.java b/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java similarity index 96% rename from processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/SemanticCreatorUsageTest.java rename to processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java index b5de751651e2..0dd61fb4b3ea 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/SemanticCreatorUsageTest.java +++ b/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java @@ -17,10 +17,9 @@ * under the License. */ -package org.apache.druid.query.rowsandcols.semantic; +package org.apache.druid.common.semantic; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.query.rowsandcols.SemanticCreator; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -80,7 +79,7 @@ public void testPublic() /** * {@link SemanticCreator} must return with an interface. - * + *

* An exact implementation may indicate that some interface methods might be missing. */ @Test @@ -95,7 +94,7 @@ public void testReturnType() /** * {@link SemanticCreator} method names must follow the naming pattern toReturnType(). - * + *

* For example: a method returning with a type of Ball should be named as "toBall" */ @Test diff --git a/processing/src/test/java/org/apache/druid/common/semantic/SemanticUtilsTest.java b/processing/src/test/java/org/apache/druid/common/semantic/SemanticUtilsTest.java new file mode 100644 index 000000000000..d26670e83fc6 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/common/semantic/SemanticUtilsTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.semantic; + +import org.apache.druid.error.DruidException; +import org.apache.druid.segment.CloseableShapeshifter; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; +import java.util.function.Function; + +public class SemanticUtilsTest +{ + @Test + public void testInvalidParameters() + { + Assert.assertThrows( + DruidException.class, + () -> SemanticUtils.makeAsMap(InvalidShapeshifter.class) + ); + } + + @Test + public void testValidParameters() + { + TestShapeshifter testShapeshifter = new TestShapeshifter(); + Assert.assertTrue(testShapeshifter.as(A.class) instanceof A); + } + + @Test + public void testOverrideForNewMapping() + { + SemanticUtils.registerAsOverride( + TestShapeshifter.class, + OverrideClass.class, + (testShapeshifter) -> new OverrideClass() + ); + TestShapeshifter testShapeshifter = new TestShapeshifter(); + Assert.assertTrue(testShapeshifter.as(A.class) instanceof A); + Assert.assertTrue(testShapeshifter.as(OverrideClass.class) instanceof OverrideClass); + } + + @Test + public void testOverrideForExistingMapping() + { + SemanticUtils.registerAsOverride( + TestShapeshifter.class, + A.class, + (testShapeshifter) -> new OverrideClass() + ); + TestShapeshifter testShapeshifter = new TestShapeshifter(); + Assert.assertTrue(testShapeshifter.as(A.class) instanceof OverrideClass); + } + + static class TestShapeshifter implements CloseableShapeshifter + { + private final Map, Function> asMap; + + public TestShapeshifter() + { + this.asMap = SemanticUtils.makeAsMap(TestShapeshifter.class); + } + + @SuppressWarnings("unchecked") + @Override + @Nullable + public T as(@Nonnull Class clazz) + { + //noinspection ReturnOfNull + return (T) asMap.getOrDefault(clazz, arg -> null).apply(this); + } + + @Override + public void close() + { + } + + @SemanticCreator + public AInterface toAInterface() + { + return new A(); + } + } + + static class InvalidShapeshifter implements CloseableShapeshifter + { + @Nullable + @Override + public T as(@Nonnull Class clazz) + { + return null; + } + + @Override + public void close() + { + } + + @SemanticCreator + public AInterface toAInterface(String invalidParameter) + { + return new A(); + } + } + + interface AInterface + { + } + + static class A implements AInterface + { + } + + static class OverrideClass extends A + { + } +} diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FrameMakerTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FrameMakerTest.java new file mode 100644 index 000000000000..e0a448884b2a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FrameMakerTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.semantic; + +import org.apache.druid.frame.Frame; +import org.apache.druid.query.rowsandcols.ArrayListRowsAndColumnsTest; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.junit.Assert; +import org.junit.Test; + +public class FrameMakerTest +{ + public static RowSignature ROW_SIGNATURE = RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("dim3", ColumnType.STRING) + .add("m1", ColumnType.LONG) + .add("m2", ColumnType.LONG) + .build(); + + @Test + public void testFrameMaker() + { + final MapOfColumnsRowsAndColumns mapOfColumnsRowsAndColumns = MapOfColumnsRowsAndColumns + .builder() + .add("dim1", ColumnType.STRING, "a", "b", "c") + .add("dim2", ColumnType.STRING, "m", "d", "e") + .add("dim3", ColumnType.STRING, "a") + .add("m1", ColumnType.LONG, 1L, 2L, 3L) + .add("m2", ColumnType.LONG, 52L, 42L) + .build(); + + final FrameMaker frameMaker = FrameMaker.fromRAC(ArrayListRowsAndColumnsTest.MAKER.apply(mapOfColumnsRowsAndColumns)); + + Assert.assertEquals(ROW_SIGNATURE, frameMaker.computeSignature()); + + final Frame frame = frameMaker.toColumnBasedFrame(); + ColumnBasedFrameRowsAndColumns columnBasedFrameRowsAndColumns = new ColumnBasedFrameRowsAndColumns( + frame, + frameMaker.computeSignature() + ); + for (String columnName : mapOfColumnsRowsAndColumns.getColumnNames()) { + ColumnAccessor expectedColumn = mapOfColumnsRowsAndColumns.findColumn(columnName).toAccessor(); + ColumnAccessor actualColumn = columnBasedFrameRowsAndColumns.findColumn(columnName).toAccessor(); + + for (int i = 0; i < expectedColumn.numRows(); i++) { + Assert.assertEquals( + expectedColumn.getObject(i), + actualColumn.getObject(i) + ); + } + } + Assert.assertEquals(3, frame.numRows()); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/IndexIONullColumnsCompatibilityTest.java b/processing/src/test/java/org/apache/druid/segment/IndexIONullColumnsCompatibilityTest.java index 2e3284490758..703de4439e40 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexIONullColumnsCompatibilityTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexIONullColumnsCompatibilityTest.java @@ -31,6 +31,7 @@ import com.google.common.primitives.Ints; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.io.smoosh.Smoosh; @@ -184,20 +185,6 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory(); } - Metadata metadata = null; - ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd"); - if (metadataBB != null) { - try { - metadata = mapper.readValue( - IndexIO.SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()), - Metadata.class - ); - } - catch (IOException ex) { - throw new IOException("Failed to read metadata", ex); - } - } - Map> columns = new HashMap<>(); for (String columnName : cols) { @@ -251,9 +238,28 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, - metadata, lazy - ); + ) + { + @Override + public Metadata getMetadata() + { + try { + ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd"); + if (metadataBB != null) { + return mapper.readValue( + IndexIO.SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()), + Metadata.class + ); + } else { + return null; + } + } + catch (IOException ex) { + throw DruidException.defensive(ex, "Failed to read metadata"); + } + } + }; return index; } diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java index 2bb84c0eeb98..1fedf2605530 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerLongestSharedDimOrderTest.java @@ -167,11 +167,15 @@ private QueryableIndexIndexableAdapter makeIndexWithDimensionList(List d mockBitmapFactory, ImmutableMap.of(ColumnHolder.TIME_COLUMN_NAME, mockSupplier), mockSmooshedFileMapper, - null, true ) + { + @Override + public Metadata getMetadata() + { + return null; + } + } ); } } - - diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java index 01c9cc26dca6..945f86eb8ef1 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplierTest.java @@ -36,6 +36,7 @@ import java.nio.ByteOrder; import java.nio.IntBuffer; import java.nio.channels.Channels; +import java.util.Arrays; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; @@ -290,6 +291,11 @@ private void assertIndexMatchesVals() indices[i] = i; } + int[] offsetValues = new int[columnarInts.size() + 1]; + columnarInts.get(offsetValues, 1, 0, columnarInts.size()); + Assert.assertEquals(0, offsetValues[0]); + Assert.assertArrayEquals(vals, Arrays.copyOfRange(offsetValues, 1, offsetValues.length)); + // random access, limited to 1000 elements for large lists (every element would take too long) IntArrays.shuffle(indices, ThreadLocalRandom.current()); final int limit = Math.min(columnarInts.size(), 1000); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java index 0fd5bbf6f890..4876a347fb21 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java @@ -108,9 +108,7 @@ public void testValues(long[] values) throws Exception ); serializer.open(); - for (long value : values) { - serializer.add(value); - } + serializer.addAll(values, 0, values.length); Assert.assertEquals(values.length, serializer.size()); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index ba35a03bff51..00a7b339ddb1 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -186,9 +186,7 @@ public void testValues(long[] values) throws Exception ); serializer.open(); - for (long value : values) { - serializer.add(value); - } + serializer.addAll(values, 0, values.length); Assert.assertEquals(values.length, serializer.size()); final ByteArrayOutputStream baos = new ByteArrayOutputStream();