From a202b0f32683e14fb0120a67108d09034021c369 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 17 May 2023 15:30:58 -0700 Subject: [PATCH 01/12] * add getWeightedSize method to InputFormat interface, and use in LocalInputSource and CloudObjectInputSource --- .../input/parquet/ParquetInputFormat.java | 9 +++ .../apache/druid/data/input/InputFormat.java | 6 ++ .../input/impl/CloudObjectInputSource.java | 16 ++++- .../druid/data/input/impl/CsvInputFormat.java | 17 +++++ .../data/input/impl/JsonInputFormat.java | 18 +++++ .../data/input/impl/LocalInputSource.java | 11 +-- .../apache/druid/utils/CompressionUtils.java | 72 +++++++++++++++---- 7 files changed, 128 insertions(+), 21 deletions(-) diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java index 403802b34f75..38f40bc0593b 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputEntity; @@ -39,6 +40,7 @@ public class ParquetInputFormat extends NestedInputFormat { + private static final long SCALE_FACTOR = 8L; private final boolean binaryAsString; private final Configuration conf; @@ -98,6 +100,13 @@ public InputEntityReader createReader( return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); } + @JsonIgnore + @Override + public long getWeightedSize(String path, long size) + { + return size * SCALE_FACTOR; + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index 345b47ca3ee9..8684b7d6d7b7 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -67,4 +67,10 @@ InputEntityReader createReader( InputEntity source, File temporaryDirectory ); + + @JsonIgnore + default long getWeightedSize(String path, long size) + { + return size; + } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index d3d134cd2131..1ca076a05f9c 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -125,6 +125,7 @@ public Stream>> createSplits( { if (!CollectionUtils.isNullOrEmpty(objects)) { return getSplitsForObjects( + inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), objects, @@ -132,6 +133,7 @@ public Stream>> createSplits( ); } else if (!CollectionUtils.isNullOrEmpty(uris)) { return getSplitsForObjects( + inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), Lists.transform(uris, CloudObjectLocation::new), @@ -139,6 +141,7 @@ public Stream>> createSplits( ); } else { return
getSplitsForPrefixes( + inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), prefixes, @@ -226,6 +229,7 @@ private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException * implementations do), this method filters out empty objects. */ private static Stream>> getSplitsForPrefixes( + final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final List prefixes, @@ -246,6 +250,7 @@ private static Stream>> getSplitsForPrefixe // Only consider nonempty objects. Note: size may be unknown; if so we allow it through, to avoid // calling getObjectSize and triggering a network call. return toSplitStream( + inputFormat, splitWidget, splitHintSpec, Iterators.filter(iterator, object -> object.getSize() != 0) // Allow UNKNOWN_SIZE through @@ -262,6 +267,7 @@ private static Stream>> getSplitsForPrefixe * come in through prefixes.) */ private static Stream>> getSplitsForObjects( + final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final List objectLocations, @@ -279,6 +285,7 @@ private static Stream>> getSplitsForObjects } return toSplitStream( + inputFormat, splitWidget, splitHintSpec, Iterators.transform( @@ -289,6 +296,7 @@ private static Stream>> getSplitsForObjects } private static Stream>> toSplitStream( + final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final Iterator objectIterator @@ -300,9 +308,13 @@ private static Stream>> toSplitStream( o -> { try { if (o.getSize() == CloudObjectSplitWidget.UNKNOWN_SIZE) { - return new InputFileAttribute(splitWidget.getObjectSize(o.getLocation())); + return new InputFileAttribute( + inputFormat.getWeightedSize( + o.getLocation().getPath(), + splitWidget.getObjectSize(o.getLocation()) + )); } else { - return new InputFileAttribute(o.getSize()); + return new InputFileAttribute(inputFormat.getWeightedSize(o.getLocation().getPath(), o.getSize())); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 03706d1ef445..12c655acb4ad 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -25,10 +25,12 @@ import com.opencsv.RFC4180Parser; import com.opencsv.RFC4180ParserBuilder; import com.opencsv.enums.CSVReaderNullFieldIndicator; +import org.apache.commons.compress.utils.FileNameUtils; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.File; @@ -82,6 +84,21 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity ); } + @JsonIgnore + @Override + public long getWeightedSize(String path, long size) + { + String pathExtension = FileNameUtils.getExtension(path); + if (!CompressionUtils.Format.isSupportedCompressionFormat(pathExtension)) { + return 1; + } + + if (CompressionUtils.Format.GZ == CompressionUtils.Format.fromSuffix(pathExtension)) { + return size * 4L; + } + return size; + } + public static RFC4180Parser createOpenCsvParser() { return NullHandling.replaceWithDefault() diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 200c621e1389..569e74f07ff7 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -20,15 +20,18 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.compress.utils.FileNameUtils; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.File; @@ -156,6 +159,21 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity } } + @JsonIgnore + @Override + public long getWeightedSize(String path, long size) + { + String pathExtension = FileNameUtils.getExtension(path); + if (!CompressionUtils.Format.isSupportedCompressionFormat(pathExtension)) { + return 1; + } + + if (CompressionUtils.Format.GZ == CompressionUtils.Format.fromSuffix(pathExtension)) { + return size * 4L; + } + return size; + } + /** * Create a new JsonInputFormat object based on the given parameter * diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 23208c94d765..0dbf80676928 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -148,20 +148,23 @@ private List getFilesForSerialization() @Override public Stream>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return Streams.sequentialStreamFrom(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))) + return Streams.sequentialStreamFrom(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec))) .map(InputSplit::new); } @Override public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return Iterators.size(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))); + return Iterators.size(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec))); } - private Iterator> getSplitFileIterator(SplitHintSpec splitHintSpec) + private Iterator> getSplitFileIterator(final InputFormat inputFormat, SplitHintSpec splitHintSpec) { final Iterator fileIterator = getFileIterator(); - return splitHintSpec.split(fileIterator, file -> new InputFileAttribute(file.length())); + return splitHintSpec.split( + fileIterator, + file -> new InputFileAttribute(inputFormat.getWeightedSize(file.getName(), file.length())) + ); } @VisibleForTesting diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 058361bd7a8a..44db46ea2f80 100644 --- 
a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; @@ -40,6 +41,7 @@ import org.apache.druid.java.util.common.io.NativeIO; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.File; @@ -53,6 +55,7 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Enumeration; +import java.util.Map; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -64,14 +67,53 @@ @PublicApi public class CompressionUtils { + + public enum Format + { + BZ2(".bz2"), + GZ(".gz"), + SNAPPY(".sz"), + XZ(".xz"), + ZIP(".zip"), + ZSTD(".zst"); + + static final Map SUPPORTED_COMPRESSION_FORMATS; + + static { + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put(BZ2.getSuffix(), BZ2); + builder.put(GZ.getSuffix(), GZ); + builder.put(SNAPPY.getSuffix(), SNAPPY); + builder.put(XZ.getSuffix(), XZ); + builder.put(ZIP.getSuffix(), ZIP); + builder.put(ZSTD.getSuffix(), ZSTD); + SUPPORTED_COMPRESSION_FORMATS = builder.build(); + } + + private final String suffix; + Format(String suffix) + { + this.suffix = suffix; + } + + public String getSuffix() + { + return suffix; + } + + public static boolean isSupportedCompressionFormat(@Nullable String suffix) + { + return null != suffix && SUPPORTED_COMPRESSION_FORMATS.containsKey(suffix); + } + + @Nullable + public static Format fromSuffix(String suffix) + { + return SUPPORTED_COMPRESSION_FORMATS.get(suffix); + } + } private static final Logger log = new Logger(CompressionUtils.class); private static final int DEFAULT_RETRY_COUNT = 3; - private static final String BZ2_SUFFIX = ".bz2"; - private static final String GZ_SUFFIX = ".gz"; - private static final String XZ_SUFFIX = ".xz"; - private static final String ZIP_SUFFIX = ".zip"; - private static final String SNAPPY_SUFFIX = ".sz"; - private static final String ZSTD_SUFFIX = ".zst"; private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512 /** @@ -198,7 +240,7 @@ public static FileUtils.FileCopyResult unzip( throw Throwables.propagate(e); } } else { - final File tmpFile = File.createTempFile("compressionUtilZipCache", ZIP_SUFFIX); + final File tmpFile = File.createTempFile("compressionUtilZipCache", Format.ZIP.getSuffix()); try { FileUtils.retryCopy( byteSource, @@ -527,7 +569,7 @@ public static boolean isZip(String fName) if (Strings.isNullOrEmpty(fName)) { return false; } - return fName.endsWith(ZIP_SUFFIX); // Technically a file named `.zip` would be fine + return fName.endsWith(Format.ZIP.getSuffix()); // Technically a file named `.zip` would be fine } /** @@ -542,7 +584,7 @@ public static boolean isGz(String fName) if (Strings.isNullOrEmpty(fName)) { return false; } - return fName.endsWith(GZ_SUFFIX) && fName.length() > GZ_SUFFIX.length(); + return fName.endsWith(Format.GZ.getSuffix()) && fName.length() > Format.GZ.getSuffix().length(); } /** @@ -568,17 +610,17 @@ public static String getGzBaseName(String fname) */ public static InputStream decompress(final InputStream in, final String 
fileName) throws IOException { - if (fileName.endsWith(GZ_SUFFIX)) { + if (fileName.endsWith(Format.GZ.getSuffix())) { return gzipInputStream(in); - } else if (fileName.endsWith(BZ2_SUFFIX)) { + } else if (fileName.endsWith(Format.BZ2.getSuffix())) { return new BZip2CompressorInputStream(in, true); - } else if (fileName.endsWith(XZ_SUFFIX)) { + } else if (fileName.endsWith(Format.XZ.getSuffix())) { return new XZCompressorInputStream(in, true); - } else if (fileName.endsWith(SNAPPY_SUFFIX)) { + } else if (fileName.endsWith(Format.SNAPPY.getSuffix())) { return new FramedSnappyCompressorInputStream(in); - } else if (fileName.endsWith(ZSTD_SUFFIX)) { + } else if (fileName.endsWith(Format.ZSTD.getSuffix())) { return new ZstdCompressorInputStream(in); - } else if (fileName.endsWith(ZIP_SUFFIX)) { + } else if (fileName.endsWith(Format.ZIP.getSuffix())) { // This reads the first file in the archive. final ZipInputStream zipIn = new ZipInputStream(in, StandardCharsets.UTF_8); try { From 031cc7e50eec0cd081b8380dabf769943655ba36 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 17 May 2023 15:50:24 -0700 Subject: [PATCH 02/12] * add javadoc --- .../org/apache/druid/data/input/InputFormat.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index 8684b7d6d7b7..1eceee48ab3a 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -68,6 +68,18 @@ InputEntityReader createReader( File temporaryDirectory ); + /** + * Computes the weighted size of a given input object of the underlying input format type, weighted + * for its cost during ingestion. The weight calculated is dependent on the format type and compression type + * ({@link org.apache.druid.utils.CompressionUtils.Format}) used if any. Uncompressed newline delimited json + * is used as baseline with scale factor 1. This means that when computing the byte weight that an uncompressed + * newline delimited json input object has towards ingestion, we take the file size as is, 1:1. + * + * @param path The path of the input object. Used to tell whether any compression is used. + * @param size The size of the input object in bytes. + * + * @return The weighted size of the input object.
+ */ @JsonIgnore default long getWeightedSize(String path, long size) { From 5f7fe92644af63877785678483464780400eb2b4 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 19 May 2023 12:27:56 -0700 Subject: [PATCH 03/12] * simplify --- .../external/ExternalInputSpecSlicer.java | 26 ++++++++++++++++--- .../input/parquet/ParquetInputFormat.java | 2 +- .../druid/data/input/InputFileAttribute.java | 19 ++++++++++++++ .../apache/druid/data/input/InputFormat.java | 3 ++- .../input/impl/CloudObjectInputSource.java | 16 ++---------- .../druid/data/input/impl/CsvInputFormat.java | 2 +- .../data/input/impl/JsonInputFormat.java | 2 +- 7 files changed, 49 insertions(+), 21 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java index 8583162cd0e4..56c90873d636 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; import org.apache.druid.data.input.InputFileAttribute; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; @@ -32,6 +33,7 @@ import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.msq.input.SlicerUtils; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -80,7 +82,12 @@ public List sliceDynamic( if (externalInputSpec.getInputSource().isSplittable()) { return sliceSplittableInputSource( externalInputSpec, - new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice), + new DynamicSplitHintSpec( + maxNumSlices, + maxFilesPerSlice, + maxBytesPerSlice, + externalInputSpec.getInputFormat() + ), maxNumSlices ); } else { @@ -212,12 +219,19 @@ static class DynamicSplitHintSpec implements SplitHintSpec private final int maxNumSlices; private final int maxFilesPerSlice; private final long maxBytesPerSlice; + @Nullable + private final InputFormat inputFormat; - public DynamicSplitHintSpec(final int maxNumSlices, final int maxFilesPerSlice, final long maxBytesPerSlice) + public DynamicSplitHintSpec( + final int maxNumSlices, + final int maxFilesPerSlice, + final long maxBytesPerSlice, + @Nullable final InputFormat inputFormat) { this.maxNumSlices = maxNumSlices; this.maxFilesPerSlice = maxFilesPerSlice; this.maxBytesPerSlice = maxBytesPerSlice; + this.inputFormat = inputFormat; } @Override @@ -229,7 +243,13 @@ public Iterator> split( return Iterators.filter( SlicerUtils.makeSlicesDynamic( inputIterator, - item -> inputAttributeExtractor.apply(item).getSize(), + item -> { + if (null != inputFormat) { + InputFileAttribute inputFileAttribute = inputAttributeExtractor.apply(item); + return inputFormat.getWeightedSize(inputFileAttribute.getPath(), inputFileAttribute.getSize()); + } + return inputAttributeExtractor.apply(item).getSize(); + }, maxNumSlices, maxFilesPerSlice, maxBytesPerSlice diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java index 38f40bc0593b..1efaedb27daf 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -102,7 +102,7 @@ public InputEntityReader createReader( @JsonIgnore @Override - public long getWeightedSize(String path, long size) + public long getWeightedSize(@Nullable String path, long size) { return size * SCALE_FACTOR; } diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java index 5cb90cf57b66..ca21def88fed 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java @@ -19,6 +19,8 @@ package org.apache.druid.data.input; +import javax.annotation.Nullable; + /** * A class storing some attributes of an input file. * This information is used to make splits in the parallel indexing. @@ -33,13 +35,30 @@ public class InputFileAttribute */ private final long size; + /** + * The path of the input file. + */ + @Nullable + private final String path; + public InputFileAttribute(long size) + { + this(size, null); + } + + public InputFileAttribute(long size, @Nullable String path) { this.size = size; + this.path = path; } public long getSize() { return size; } + + public String getPath() + { + return path; + } } diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index 1eceee48ab3a..d2f06052d5d2 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -31,6 +31,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.annotations.UnstableApi; +import javax.annotation.Nullable; import java.io.File; /** @@ -81,7 +82,7 @@ InputEntityReader createReader( * @return The weighted size of the input object. 
*/ @JsonIgnore - default long getWeightedSize(String path, long size) + default long getWeightedSize(@Nullable String path, long size) { return size; } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index 1ca076a05f9c..d3d134cd2131 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -125,7 +125,6 @@ public Stream>> createSplits( { if (!CollectionUtils.isNullOrEmpty(objects)) { return getSplitsForObjects( - inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), objects, @@ -133,7 +132,6 @@ public Stream>> createSplits( ); } else if (!CollectionUtils.isNullOrEmpty(uris)) { return getSplitsForObjects( - inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), Lists.transform(uris, CloudObjectLocation::new), @@ -141,7 +139,6 @@ public Stream>> createSplits( ); } else { return getSplitsForPrefixes( - inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), prefixes, @@ -229,7 +226,6 @@ private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException * implementations do), this method filters out empty objects. */ private static Stream>> getSplitsForPrefixes( - final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final List prefixes, @@ -250,7 +246,6 @@ private static Stream>> getSplitsForPrefixe // Only consider nonempty objects. Note: size may be unknown; if so we allow it through, to avoid // calling getObjectSize and triggering a network call. return toSplitStream( - inputFormat, splitWidget, splitHintSpec, Iterators.filter(iterator, object -> object.getSize() != 0) // Allow UNKNOWN_SIZE through @@ -267,7 +262,6 @@ private static Stream>> getSplitsForPrefixe * come in through prefixes.) 
*/ private static Stream>> getSplitsForObjects( - final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final List objectLocations, @@ -285,7 +279,6 @@ private static Stream>> getSplitsForObjects } return toSplitStream( - inputFormat, splitWidget, splitHintSpec, Iterators.transform( @@ -296,7 +289,6 @@ private static Stream>> getSplitsForObjects } private static Stream>> toSplitStream( - final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final Iterator objectIterator @@ -308,13 +300,9 @@ private static Stream>> toSplitStream( o -> { try { if (o.getSize() == CloudObjectSplitWidget.UNKNOWN_SIZE) { - return new InputFileAttribute( - inputFormat.getWeightedSize( - o.getLocation().getPath(), - splitWidget.getObjectSize(o.getLocation()) - )); + return new InputFileAttribute(splitWidget.getObjectSize(o.getLocation())); } else { - return new InputFileAttribute(inputFormat.getWeightedSize(o.getLocation().getPath(), o.getSize())); + return new InputFileAttribute(o.getSize()); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 12c655acb4ad..0ad27f8c5ece 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -86,7 +86,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity @JsonIgnore @Override - public long getWeightedSize(String path, long size) + public long getWeightedSize(@Nullable String path, long size) { String pathExtension = FileNameUtils.getExtension(path); if (!CompressionUtils.Format.isSupportedCompressionFormat(pathExtension)) { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 569e74f07ff7..d6368be438e4 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -161,7 +161,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity @JsonIgnore @Override - public long getWeightedSize(String path, long size) + public long getWeightedSize(@Nullable String path, long size) { String pathExtension = FileNameUtils.getExtension(path); if (!CompressionUtils.Format.isSupportedCompressionFormat(pathExtension)) { From aef4b2b15f908247f825caf23d79086a73322435 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 19 May 2023 14:42:57 -0700 Subject: [PATCH 04/12] * test locally and fix bugs --- .../external/ExternalInputSpecSlicer.java | 7 ++- .../input/parquet/ParquetInputFormat.java | 3 +- .../druid/data/input/InputFileAttribute.java | 17 ++++--- .../apache/druid/data/input/InputFormat.java | 11 ++--- .../druid/data/input/impl/CsvInputFormat.java | 10 +---- .../data/input/impl/JsonInputFormat.java | 10 +---- .../data/input/impl/LocalInputSource.java | 9 ++-- .../apache/druid/utils/CompressionUtils.java | 45 +++++++++++-------- 8 files changed, 58 insertions(+), 54 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java index 
56c90873d636..e971feffcaf1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java @@ -246,9 +246,12 @@ public Iterator> split( item -> { if (null != inputFormat) { InputFileAttribute inputFileAttribute = inputAttributeExtractor.apply(item); - return inputFormat.getWeightedSize(inputFileAttribute.getPath(), inputFileAttribute.getSize()); + return inputFormat.getWeightedSize( + inputFileAttribute.getCompressionFormat(), + inputFileAttribute.getSize()); + } else { + return inputAttributeExtractor.apply(item).getSize(); } - return inputAttributeExtractor.apply(item).getSize(); }, maxNumSlices, maxFilesPerSlice, diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java index 1efaedb27daf..c286e3387c47 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.parquet.guice.Parquet; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -102,7 +103,7 @@ public InputEntityReader createReader( @JsonIgnore @Override - public long getWeightedSize(@Nullable String path, long size) + public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) { return size * SCALE_FACTOR; } diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java index ca21def88fed..f6f7757ba60a 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java @@ -19,6 +19,8 @@ package org.apache.druid.data.input; +import org.apache.druid.utils.CompressionUtils; + import javax.annotation.Nullable; /** @@ -39,26 +41,27 @@ public class InputFileAttribute * The path of the input file. 
*/ @Nullable - private final String path; + private final CompressionUtils.Format compressionFormat; public InputFileAttribute(long size) { this(size, null); } - public InputFileAttribute(long size, @Nullable String path) + public InputFileAttribute(long size, @Nullable CompressionUtils.Format compressionFormat) { this.size = size; - this.path = path; + this.compressionFormat = compressionFormat; } - public long getSize() + @Nullable + public CompressionUtils.Format getCompressionFormat() { - return size; + return compressionFormat; } - public String getPath() + public long getSize() { - return path; + return size; } } diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index d2f06052d5d2..da99dba46842 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.File; @@ -72,17 +73,17 @@ InputEntityReader createReader( /** * Computes the weighted size of a given input object of the underlying input format type, weighted * for its cost during ingestion. The weight calculated is dependent on the format type and compression type - * ({@link org.apache.druid.utils.CompressionUtils.Format}) used if any. Uncompressed newline delimited json - * is used as baseline with scale factor 1. This means that when computing the byte weight that an uncompressed - * newline delimited json input object has towards ingestion, we take the file size as is, 1:1. + * ({@link CompressionUtils.Format}) used if any. Uncompressed newline delimited json is used as baseline + * with scale factor 1. This means that when computing the byte weight that an uncompressed newline delimited + * json input object has towards ingestion, we take the file size as is, 1:1. * - * @param path The path of the input object. Used to tell whether any compression is used. + * @param compressionFormat The compression format of the input object, if any. * @param size The size of the input object in bytes. * * @return The weighted size of the input object.
*/ @JsonIgnore - default long getWeightedSize(@Nullable String path, long size) + default long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) { return size; } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 0ad27f8c5ece..99e211466535 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -25,7 +25,6 @@ import com.opencsv.RFC4180Parser; import com.opencsv.RFC4180ParserBuilder; import com.opencsv.enums.CSVReaderNullFieldIndicator; -import org.apache.commons.compress.utils.FileNameUtils; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; @@ -86,14 +85,9 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity @JsonIgnore @Override - public long getWeightedSize(@Nullable String path, long size) + public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) { - String pathExtension = FileNameUtils.getExtension(path); - if (!CompressionUtils.Format.isSupportedCompressionFormat(pathExtension)) { - return 1; - } - - if (CompressionUtils.Format.GZ == CompressionUtils.Format.fromSuffix(pathExtension)) { + if (CompressionUtils.Format.GZ == compressionFormat) { return size * 4L; } return size; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index d6368be438e4..4470b143042a 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.compress.utils.FileNameUtils; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; @@ -161,14 +160,9 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity @JsonIgnore @Override - public long getWeightedSize(@Nullable String path, long size) + public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) { - String pathExtension = FileNameUtils.getExtension(path); - if (!CompressionUtils.Format.isSupportedCompressionFormat(pathExtension)) { - return 1; - } - - if (CompressionUtils.Format.GZ == CompressionUtils.Format.fromSuffix(pathExtension)) { + if (CompressionUtils.Format.GZ == compressionFormat) { return size * 4L; } return size; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 0dbf80676928..6dd3b2eb2468 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.CollectionUtils; +import org.apache.druid.utils.CompressionUtils; import 
org.apache.druid.utils.Streams; import javax.annotation.Nonnull; @@ -148,22 +149,22 @@ private List getFilesForSerialization() @Override public Stream>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return Streams.sequentialStreamFrom(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec))) + return Streams.sequentialStreamFrom(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))) .map(InputSplit::new); } @Override public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return Iterators.size(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec))); + return Iterators.size(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))); } - private Iterator> getSplitFileIterator(final InputFormat inputFormat, SplitHintSpec splitHintSpec) + private Iterator> getSplitFileIterator(SplitHintSpec splitHintSpec) { final Iterator fileIterator = getFileIterator(); return splitHintSpec.split( fileIterator, - file -> new InputFileAttribute(inputFormat.getWeightedSize(file.getName(), file.length())) + file -> new InputFileAttribute(file.length(), CompressionUtils.Format.fromFileName(file.getName())) ); } diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 44db46ea2f80..4f33c909399d 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -31,6 +31,7 @@ import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream; +import org.apache.commons.compress.utils.FileNameUtils; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -70,30 +71,32 @@ public class CompressionUtils public enum Format { - BZ2(".bz2"), - GZ(".gz"), - SNAPPY(".sz"), - XZ(".xz"), - ZIP(".zip"), - ZSTD(".zst"); + BZ2(".bz2", "bz2"), + GZ(".gz", "gz"), + SNAPPY(".sz", "sz"), + XZ(".xz", "xz"), + ZIP(".zip", "zip"), + ZSTD(".zst", "zst"); - static final Map SUPPORTED_COMPRESSION_FORMATS; + private static final Map EXTENSION_TO_COMPRESSION_FORMAT; static { ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.put(BZ2.getSuffix(), BZ2); - builder.put(GZ.getSuffix(), GZ); - builder.put(SNAPPY.getSuffix(), SNAPPY); - builder.put(XZ.getSuffix(), XZ); - builder.put(ZIP.getSuffix(), ZIP); - builder.put(ZSTD.getSuffix(), ZSTD); - SUPPORTED_COMPRESSION_FORMATS = builder.build(); + builder.put(BZ2.getExtension(), BZ2); + builder.put(GZ.getExtension(), GZ); + builder.put(SNAPPY.getExtension(), SNAPPY); + builder.put(XZ.getExtension(), XZ); + builder.put(ZIP.getExtension(), ZIP); + builder.put(ZSTD.getExtension(), ZSTD); + EXTENSION_TO_COMPRESSION_FORMAT = builder.build(); } private final String suffix; - Format(String suffix) + private final String extension; + Format(String suffix, String extension) { this.suffix = suffix; + this.extension = extension; } public String getSuffix() @@ -101,15 +104,19 @@ public String getSuffix() return suffix; } - public static boolean isSupportedCompressionFormat(@Nullable String suffix) + public String getExtension() { - return null != suffix && 
SUPPORTED_COMPRESSION_FORMATS.containsKey(suffix); + return extension; } @Nullable - public static Format fromSuffix(String suffix) + public static Format fromFileName(@Nullable String fileName) { - return SUPPORTED_COMPRESSION_FORMATS.get(suffix); + String extension = FileNameUtils.getExtension(fileName); + if (null == extension) { + return null; + } + return EXTENSION_TO_COMPRESSION_FORMAT.get(extension); } } private static final Logger log = new Logger(CompressionUtils.class); From 436f46a555e3c43274592d8eee4fc9b7aa90dec9 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 19 May 2023 14:47:15 -0700 Subject: [PATCH 05/12] * allow for weighted size calculation for CloudObjectInputSources --- .../druid/data/input/impl/CloudObjectInputSource.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index d3d134cd2131..07d61fd14705 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -34,6 +34,7 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.utils.CollectionUtils; +import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.Streams; import javax.annotation.Nullable; @@ -300,9 +301,13 @@ private static Stream>> toSplitStream( o -> { try { if (o.getSize() == CloudObjectSplitWidget.UNKNOWN_SIZE) { - return new InputFileAttribute(splitWidget.getObjectSize(o.getLocation())); + return new InputFileAttribute( + splitWidget.getObjectSize(o.getLocation()), + CompressionUtils.Format.fromFileName(o.getLocation().getPath())); } else { - return new InputFileAttribute(o.getSize()); + return new InputFileAttribute( + o.getSize(), + CompressionUtils.Format.fromFileName(o.getLocation().getPath())); } } catch (IOException e) { From 823c4b02c34085b699cef8024c92b276c5df47ed Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 23 May 2023 12:32:31 -0700 Subject: [PATCH 06/12] * add tests --- .../external/ExternalInputSpecSlicerTest.java | 71 +++++++++++++++++-- .../input/parquet/ParquetInputFormatTest.java | 45 ++++++++++++ .../data/input/impl/CsvInputFormatTest.java | 17 +++++ .../data/input/impl/JsonInputFormatTest.java | 29 ++++++++ 4 files changed, 155 insertions(+), 7 deletions(-) create mode 100644 extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java index 40444307d7e7..f974e686c532 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java @@ -19,6 +19,8 @@ package org.apache.druid.msq.input.external; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; @@ -29,8 +31,10 @@ import 
org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.Streams; import org.junit.Assert; import org.junit.Before; @@ -42,13 +46,37 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; public class ExternalInputSpecSlicerTest { - static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null, null, null); + static class TestInputFormat extends JsonInputFormat + { + public TestInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("featureSpec") @Nullable Map featureSpec, + @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns, + @JsonProperty("assumeNewlineDelimited") @Nullable Boolean assumeNewlineDelimited, + @JsonProperty("useJsonNodeReader") @Nullable Boolean useJsonNodeReader + ) + { + super(flattenSpec, featureSpec, keepNullColumns, assumeNewlineDelimited, useJsonNodeReader); + } + + @JsonIgnore + @Override + public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) + { + if (null != compressionFormat) { + return size * 4L; + } + return size; + } + } + static final TestInputFormat INPUT_FORMAT = new TestInputFormat(null, null, null, null, null); static final RowSignature SIGNATURE = RowSignature.builder().add("s", ColumnType.STRING).build(); private ExternalInputSpecSlicer slicer; @@ -187,6 +215,19 @@ public void test_sliceDynamic_splittable_needTwoDueToBytes() ); } + @Test + public void test_sliceDynamic_splittableWithCompression_needThreeDueToBytes() + { + Assert.assertEquals( + ImmutableList.of( + splittableSlice("foo"), + splittableSlice("bar"), + splittableSlice("baz") + ), + slicer.sliceDynamic(splittableSpecWithCompression("foo", "bar", "baz"), 100, 5, 7) + ); + } + @Test public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneHundredMax() { @@ -226,7 +267,16 @@ public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneMax() static ExternalInputSpec splittableSpec(final String... strings) { return new ExternalInputSpec( - new TestSplittableInputSource(Arrays.asList(strings), true), + new TestSplittableInputSource(Arrays.asList(strings), true, false), + INPUT_FORMAT, + SIGNATURE + ); + } + + static ExternalInputSpec splittableSpecWithCompression(final String... strings) + { + return new ExternalInputSpec( + new TestSplittableInputSource(Arrays.asList(strings), true, true), INPUT_FORMAT, SIGNATURE ); @@ -235,7 +285,7 @@ static ExternalInputSpec splittableSpec(final String... strings) static ExternalInputSpec splittableSpecThatIgnoresSplitHints(final String... strings) { return new ExternalInputSpec( - new TestSplittableInputSource(Arrays.asList(strings), false), + new TestSplittableInputSource(Arrays.asList(strings), false, false), INPUT_FORMAT, SIGNATURE ); @@ -254,7 +304,7 @@ static ExternalInputSlice splittableSlice(final String... 
strings) { return new ExternalInputSlice( Stream.of(strings) - .map(s -> new TestSplittableInputSource(Collections.singletonList(s), false)) + .map(s -> new TestSplittableInputSource(Collections.singletonList(s), false, false)) .collect(Collectors.toList()), INPUT_FORMAT, SIGNATURE @@ -333,11 +383,16 @@ private static class TestSplittableInputSource implements SplittableInputSource< { private final List strings; private final boolean useSplitHintSpec; + private final boolean compressedFiles; - public TestSplittableInputSource(final List strings, final boolean useSplitHintSpec) + public TestSplittableInputSource( + final List strings, + final boolean useSplitHintSpec, + final boolean compressedFiles) { this.strings = strings; this.useSplitHintSpec = useSplitHintSpec; + this.compressedFiles = compressedFiles; } @Override @@ -367,7 +422,9 @@ public Stream>> createSplits( if (useSplitHintSpec) { splits = splitHintSpec.split( strings.iterator(), - s -> new InputFileAttribute(s.length()) + s -> compressedFiles + ? new InputFileAttribute(s.length(), CompressionUtils.Format.GZ) + : new InputFileAttribute(s.length()) ); } else { // Ignore splitHintSpec, return one element per split. Similar to HttpInputSource, for example. @@ -386,7 +443,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp @Override public InputSource withSplit(InputSplit> split) { - return new TestSplittableInputSource(split.get(), useSplitHintSpec); + return new TestSplittableInputSource(split.get(), useSplitHintSpec, compressedFiles); } @Override diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java new file mode 100644 index 000000000000..7d5106d7dfaf --- /dev/null +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.data.input.parquet; + +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.utils.CompressionUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +public class ParquetInputFormatTest +{ + @Test + public void test_getWeightedSize_withoutCompression() + { + final ParquetInputFormat format = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration()); + long unweightedSize = 100L; + Assert.assertEquals(unweightedSize * 8L, format.getWeightedSize(null, unweightedSize)); + } + + @Test + public void test_getWeightedSize_withGzCompression() + { + final ParquetInputFormat format = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration()); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize * 8L, format.getWeightedSize(CompressionUtils.Format.GZ, unweightedSize)); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java index 5f5a52ef15af..2774e258318d 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.InputFormat; import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.utils.CompressionUtils; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.junit.Assert; @@ -185,4 +186,20 @@ public void testHasHeaderRowWithMissingColumnsReturningItsValue() final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0); Assert.assertTrue(format.isFindColumnsFromHeader()); } + + @Test + public void test_getWeightedSize_withoutCompression() + { + final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize, format.getWeightedSize(null, unweightedSize)); + } + + @Test + public void test_getWeightedSize_withGzCompression() + { + final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize * 4L, format.getWeightedSize(CompressionUtils.Format.GZ, unweightedSize)); + } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java index acc8a5703fef..e00f6ba5679a 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.utils.CompressionUtils; import org.junit.Assert; import org.junit.Test; @@ -116,4 +117,32 @@ public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets() ); Assert.assertFalse(format.isKeepNullColumns()); } + + @Test + public void test_getWeightedSize_withoutCompression() + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec(true, null), + null, + false, + null, + null + ); + final long unweightedSize = 100L; + 
Assert.assertEquals(unweightedSize, format.getWeightedSize(null, unweightedSize)); + } + + @Test + public void test_getWeightedSize_withGzCompression() + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec(true, null), + null, + false, + null, + null + ); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize * 4L, format.getWeightedSize(CompressionUtils.Format.GZ, unweightedSize)); + } } From 03e45b5e77b461526b4b33991f87a0e0d2f7d13d Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 24 May 2023 14:50:41 -0700 Subject: [PATCH 07/12] * address review comments --- .../data/input/avro/AvroOCFInputFormat.java | 7 ++ .../input/avro/AvroOCFInputFormatTest.java | 17 +++++ .../external/ExternalInputSpecSlicer.java | 29 +------- .../external/ExternalInputSpecSlicerTest.java | 68 ++++--------------- .../druid/data/input/orc/OrcInputFormat.java | 7 ++ .../data/input/orc/OrcInputFormatTest.java | 11 +++ .../input/parquet/ParquetInputFormat.java | 7 +- .../input/parquet/ParquetInputFormatTest.java | 14 ++-- .../druid/data/input/InputFileAttribute.java | 20 ++---- .../apache/druid/data/input/InputFormat.java | 6 +- .../input/impl/CloudObjectInputSource.java | 16 +++-- .../druid/data/input/impl/CsvInputFormat.java | 11 --- .../data/input/impl/FlatTextInputFormat.java | 11 +++ .../data/input/impl/JsonInputFormat.java | 7 +- .../data/input/impl/LocalInputSource.java | 9 ++- .../data/input/impl/RegexInputFormat.java | 11 +++ .../apache/druid/utils/CompressionUtils.java | 2 + .../druid/data/input/InputFormatTest.java | 50 ++++++++++++++ .../data/input/impl/CsvInputFormatTest.java | 7 +- .../input/impl/DelimitedInputFormatTest.java | 19 ++++++ .../data/input/impl/JsonInputFormatTest.java | 7 +- .../data/input/impl/RegexInputFormatTest.java | 27 ++++++++ 22 files changed, 221 insertions(+), 142 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java index cc487a644554..526d03681a33 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java @@ -40,6 +40,7 @@ public class AvroOCFInputFormat extends NestedInputFormat { + static final long SCALE_FACTOR = 8L; private static final Logger LOGGER = new Logger(AvroOCFInputFormat.class); private final boolean binaryAsString; @@ -116,6 +117,12 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity ); } + @Override + public long getWeightedSize(String path, long size) + { + return size * SCALE_FACTOR; + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java index a5c40a78cc01..ceb394b0b3b3 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java @@ -105,4 +105,21 @@ public void testSerdeNonDefaults() throws Exception Assert.assertEquals(inputFormat, inputFormat2); } + + @Test + public 
void test_getWeightedSize_withoutCompression() throws Exception + { + AvroOCFInputFormat format = new AvroOCFInputFormat( + jsonMapper, + flattenSpec, + null, + false, + false + ); + long unweightedSize = 100L; + Assert.assertEquals( + unweightedSize * AvroOCFInputFormat.SCALE_FACTOR, + format.getWeightedSize("file.avro", unweightedSize) + ); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java index e971feffcaf1..ca853bb4e658 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; import org.apache.druid.data.input.InputFileAttribute; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; @@ -33,7 +32,6 @@ import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.msq.input.SlicerUtils; -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -82,12 +80,7 @@ public List sliceDynamic( if (externalInputSpec.getInputSource().isSplittable()) { return sliceSplittableInputSource( externalInputSpec, - new DynamicSplitHintSpec( - maxNumSlices, - maxFilesPerSlice, - maxBytesPerSlice, - externalInputSpec.getInputFormat() - ), + new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice), maxNumSlices ); } else { @@ -219,19 +212,12 @@ static class DynamicSplitHintSpec implements SplitHintSpec private final int maxNumSlices; private final int maxFilesPerSlice; private final long maxBytesPerSlice; - @Nullable - private final InputFormat inputFormat; - public DynamicSplitHintSpec( - final int maxNumSlices, - final int maxFilesPerSlice, - final long maxBytesPerSlice, - @Nullable final InputFormat inputFormat) + public DynamicSplitHintSpec(final int maxNumSlices, final int maxFilesPerSlice, final long maxBytesPerSlice) { this.maxNumSlices = maxNumSlices; this.maxFilesPerSlice = maxFilesPerSlice; this.maxBytesPerSlice = maxBytesPerSlice; - this.inputFormat = inputFormat; } @Override @@ -243,16 +229,7 @@ public Iterator> split( return Iterators.filter( SlicerUtils.makeSlicesDynamic( inputIterator, - item -> { - if (null != inputFormat) { - InputFileAttribute inputFileAttribute = inputAttributeExtractor.apply(item); - return inputFormat.getWeightedSize( - inputFileAttribute.getCompressionFormat(), - inputFileAttribute.getSize()); - } else { - return inputAttributeExtractor.apply(item).getSize(); - } - }, + item -> inputAttributeExtractor.apply(item).getWeightedSize(), maxNumSlices, maxFilesPerSlice, maxBytesPerSlice diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java index f974e686c532..d8dff6fe082b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java +++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java @@ -19,8 +19,6 @@ package org.apache.druid.msq.input.external; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; @@ -31,10 +29,8 @@ import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; -import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.Streams; import org.junit.Assert; import org.junit.Before; @@ -46,37 +42,13 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; public class ExternalInputSpecSlicerTest { - static class TestInputFormat extends JsonInputFormat - { - public TestInputFormat( - @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, - @JsonProperty("featureSpec") @Nullable Map featureSpec, - @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns, - @JsonProperty("assumeNewlineDelimited") @Nullable Boolean assumeNewlineDelimited, - @JsonProperty("useJsonNodeReader") @Nullable Boolean useJsonNodeReader - ) - { - super(flattenSpec, featureSpec, keepNullColumns, assumeNewlineDelimited, useJsonNodeReader); - } - - @JsonIgnore - @Override - public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) - { - if (null != compressionFormat) { - return size * 4L; - } - return size; - } - } - static final TestInputFormat INPUT_FORMAT = new TestInputFormat(null, null, null, null, null); + static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null, null, null); static final RowSignature SIGNATURE = RowSignature.builder().add("s", ColumnType.STRING).build(); private ExternalInputSpecSlicer slicer; @@ -216,15 +188,15 @@ public void test_sliceDynamic_splittable_needTwoDueToBytes() } @Test - public void test_sliceDynamic_splittableWithCompression_needThreeDueToBytes() + public void test_sliceDynamic_splittableFilesWithCompression_needThreeDueToBytes() { Assert.assertEquals( ImmutableList.of( - splittableSlice("foo"), - splittableSlice("bar"), - splittableSlice("baz") + splittableSlice("foo.gz"), + splittableSlice("bar.gz"), + splittableSlice("baz.gz") ), - slicer.sliceDynamic(splittableSpecWithCompression("foo", "bar", "baz"), 100, 5, 7) + slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"), 100, 5, 7) ); } @@ -267,16 +239,7 @@ public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneMax() static ExternalInputSpec splittableSpec(final String... strings) { return new ExternalInputSpec( - new TestSplittableInputSource(Arrays.asList(strings), true, false), - INPUT_FORMAT, - SIGNATURE - ); - } - - static ExternalInputSpec splittableSpecWithCompression(final String... 
strings) - { - return new ExternalInputSpec( - new TestSplittableInputSource(Arrays.asList(strings), true, true), + new TestSplittableInputSource(Arrays.asList(strings), true), INPUT_FORMAT, SIGNATURE ); @@ -285,7 +248,7 @@ static ExternalInputSpec splittableSpecWithCompression(final String... strings) static ExternalInputSpec splittableSpecThatIgnoresSplitHints(final String... strings) { return new ExternalInputSpec( - new TestSplittableInputSource(Arrays.asList(strings), false, false), + new TestSplittableInputSource(Arrays.asList(strings), false), INPUT_FORMAT, SIGNATURE ); @@ -304,7 +267,7 @@ static ExternalInputSlice splittableSlice(final String... strings) { return new ExternalInputSlice( Stream.of(strings) - .map(s -> new TestSplittableInputSource(Collections.singletonList(s), false, false)) + .map(s -> new TestSplittableInputSource(Collections.singletonList(s), false)) .collect(Collectors.toList()), INPUT_FORMAT, SIGNATURE @@ -383,16 +346,11 @@ private static class TestSplittableInputSource implements SplittableInputSource< { private final List strings; private final boolean useSplitHintSpec; - private final boolean compresssedFiles; - public TestSplittableInputSource( - final List strings, - final boolean useSplitHintSpec, - final boolean compressedFiles) + public TestSplittableInputSource(final List strings, final boolean useSplitHintSpec) { this.strings = strings; this.useSplitHintSpec = useSplitHintSpec; - this.compresssedFiles = compressedFiles; } @Override @@ -422,9 +380,7 @@ public Stream>> createSplits( if (useSplitHintSpec) { splits = splitHintSpec.split( strings.iterator(), - s -> compresssedFiles - ? new InputFileAttribute(s.length(), CompressionUtils.Format.GZ) - : new InputFileAttribute(s.length()) + s -> new InputFileAttribute(s.length(), INPUT_FORMAT.getWeightedSize(s, s.length())) ); } else { // Ignore splitHintSpec, return one element per split. Similar to HttpInputSource, for example. 
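
For intuition about the behavior these reworked tests pin down: after the review feedback, the slicer reads the precomputed `InputFileAttribute.getWeightedSize()` instead of re-deriving a weight from a compression format, and the test fixture now feeds weighted sizes through `INPUT_FORMAT.getWeightedSize(s, s.length())`. Below is a minimal, self-contained sketch of the effect; the toy greedy packing stands in for `SlicerUtils.makeSlicesDynamic`, and the hard-coded 4x gzip multiplier (mirroring `CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR`) and the name-length-as-size convention are assumptions lifted from the test setup, not the production code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of weighted dynamic slicing; not the production SlicerUtils code.
public class WeightedSlicingSketch
{
  // Assumption for illustration: gzipped text weighs 4x its on-disk size.
  static long weightedSize(String path, long size)
  {
    return path.endsWith(".gz") ? size * 4L : size;
  }

  // Greedy packing: start a new slice whenever adding a file would exceed the byte budget.
  static List<List<String>> slice(List<String> paths, long maxWeightedBytesPerSlice)
  {
    final List<List<String>> slices = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentWeight = 0;
    for (String path : paths) {
      // The tests above use the file name's length as its "size".
      final long w = weightedSize(path, path.length());
      if (!current.isEmpty() && currentWeight + w > maxWeightedBytesPerSlice) {
        slices.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(path);
      currentWeight += w;
    }
    if (!current.isEmpty()) {
      slices.add(current);
    }
    return slices;
  }

  public static void main(String[] args)
  {
    // "foo.gz" is 6 bytes raw but 24 bytes weighted, so a 7-byte budget forces
    // one file per slice -- the three-slice outcome asserted in
    // test_sliceDynamic_splittableFilesWithCompression_needThreeDueToBytes.
    System.out.println(slice(Arrays.asList("foo.gz", "bar.gz", "baz.gz"), 7));
  }
}
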
@@ -443,7 +399,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp @Override public InputSource withSplit(InputSplit> split) { - return new TestSplittableInputSource(split.get(), useSplitHintSpec, compresssedFiles); + return new TestSplittableInputSource(split.get(), useSplitHintSpec); } @Override diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java index af54d6aac508..7474a79a15eb 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java @@ -39,6 +39,7 @@ public class OrcInputFormat extends NestedInputFormat { + static final long SCALE_FACTOR = 8L; private final boolean binaryAsString; private final Configuration conf; @@ -94,6 +95,12 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity return new OrcReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); } + @Override + public long getWeightedSize(String path, long size) + { + return size * SCALE_FACTOR; + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java index 2a0a72728660..e7530167dfce 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java @@ -76,4 +76,15 @@ public void testEquals() .usingGetClass() .verify(); } + + @Test + public void test_getWeightedSize_withoutCompression() throws Exception + { + final OrcInputFormat format = new OrcInputFormat(null, null, null); + long unweightedSize = 100L; + Assert.assertEquals( + unweightedSize * OrcInputFormat.SCALE_FACTOR, + format.getWeightedSize("file.orc", unweightedSize) + ); + } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java index c286e3387c47..723929e98c5b 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputEntity; @@ -30,7 +29,6 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.parquet.guice.Parquet; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -41,7 +39,7 @@ public class ParquetInputFormat extends NestedInputFormat { - private static final long SCALE_FACTOR = 8L; + static final long SCALE_FACTOR = 8L; private final boolean 
binaryAsString; private final Configuration conf; @@ -101,9 +99,8 @@ public InputEntityReader createReader( return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString); } - @JsonIgnore @Override - public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) + public long getWeightedSize(String path, long size) { return size * SCALE_FACTOR; } diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java index 7d5106d7dfaf..d1f49306f054 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java @@ -20,7 +20,6 @@ package org.apache.druid.data.input.parquet; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; @@ -32,14 +31,9 @@ public void test_getWeightedSize_withoutCompression() { final ParquetInputFormat format = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration()); long unweightedSize = 100L; - Assert.assertEquals(unweightedSize * 8L, format.getWeightedSize(null, unweightedSize)); - } - - @Test - public void test_getWeightedSize_withGzCompression() - { - final ParquetInputFormat format = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration()); - final long unweightedSize = 100L; - Assert.assertEquals(unweightedSize * 8L, format.getWeightedSize(CompressionUtils.Format.GZ, unweightedSize)); + Assert.assertEquals( + unweightedSize * ParquetInputFormat.SCALE_FACTOR, + format.getWeightedSize("file.parquet", unweightedSize) + ); } } diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java index f6f7757ba60a..67ccfa04f95e 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java @@ -19,10 +19,6 @@ package org.apache.druid.data.input; -import org.apache.druid.utils.CompressionUtils; - -import javax.annotation.Nullable; - /** * A class storing some attributes of an input file. * This information is used to make splits in the parallel indexing. @@ -38,26 +34,24 @@ public class InputFileAttribute private final long size; /** - * The path of the input file. + * The weighted size of the input file. 
*/ - @Nullable - private final CompressionUtils.Format compressionFormat; + private final long weightedSize; public InputFileAttribute(long size) { - this(size, null); + this(size, size); } - public InputFileAttribute(long size, @Nullable CompressionUtils.Format compressionFormat) + public InputFileAttribute(long size, long weightedSize) { this.size = size; - this.compressionFormat = compressionFormat; + this.weightedSize = weightedSize; } - @Nullable - public CompressionUtils.Format getCompressionFormat() + public long getWeightedSize() { - return compressionFormat; + return weightedSize; } public long getSize() diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index da99dba46842..8abb213b74ca 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -32,7 +32,6 @@ import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.utils.CompressionUtils; -import javax.annotation.Nullable; import java.io.File; /** @@ -77,13 +76,12 @@ InputEntityReader createReader( * with scale factor 1. This means that when computing the byte weight that an uncompressed newline delimited * json input object has towards ingestion, we take the file size as is, 1:1. * - * @param compressionFormat The compression format of the input object, if any. + * @param path The path of the input object. Used to tell whether any compression is used. * @param size The size of the input object in bytes. * * @return The weighted size of the input object. */ - @JsonIgnore - default long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) + default long getWeightedSize(String path, long size) { return size; } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index 07d61fd14705..13c38c6bf6cf 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -34,7 +34,6 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.utils.CollectionUtils; -import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.Streams; import javax.annotation.Nullable; @@ -126,6 +125,7 @@ public Stream>> createSplits( { if (!CollectionUtils.isNullOrEmpty(objects)) { return getSplitsForObjects( + inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), objects, @@ -133,6 +133,7 @@ public Stream>> createSplits( ); } else if (!CollectionUtils.isNullOrEmpty(uris)) { return getSplitsForObjects( + inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), Lists.transform(uris, CloudObjectLocation::new), @@ -140,6 +141,7 @@ public Stream>> createSplits( ); } else { return getSplitsForPrefixes( + inputFormat, getSplitWidget(), getSplitHintSpecOrDefault(splitHintSpec), prefixes, @@ -227,6 +229,7 @@ private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException * implementations do), this method filters out empty objects. 
*/ private static Stream>> getSplitsForPrefixes( + final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final List prefixes, @@ -247,6 +250,7 @@ private static Stream>> getSplitsForPrefixe // Only consider nonempty objects. Note: size may be unknown; if so we allow it through, to avoid // calling getObjectSize and triggering a network call. return toSplitStream( + inputFormat, splitWidget, splitHintSpec, Iterators.filter(iterator, object -> object.getSize() != 0) // Allow UNKNOWN_SIZE through @@ -263,6 +267,7 @@ private static Stream>> getSplitsForPrefixe * come in through prefixes.) */ private static Stream>> getSplitsForObjects( + final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final List objectLocations, @@ -280,6 +285,7 @@ private static Stream>> getSplitsForObjects } return toSplitStream( + inputFormat, splitWidget, splitHintSpec, Iterators.transform( @@ -290,6 +296,7 @@ private static Stream>> getSplitsForObjects } private static Stream>> toSplitStream( + final InputFormat inputFormat, final CloudObjectSplitWidget splitWidget, final SplitHintSpec splitHintSpec, final Iterator objectIterator @@ -301,13 +308,14 @@ private static Stream>> toSplitStream( o -> { try { if (o.getSize() == CloudObjectSplitWidget.UNKNOWN_SIZE) { + long size = splitWidget.getObjectSize(o.getLocation()); return new InputFileAttribute( - splitWidget.getObjectSize(o.getLocation()), - CompressionUtils.Format.fromFileName(o.getLocation().getPath())); + size, + inputFormat.getWeightedSize(o.getLocation().getPath(), size)); } else { return new InputFileAttribute( o.getSize(), - CompressionUtils.Format.fromFileName(o.getLocation().getPath())); + inputFormat.getWeightedSize(o.getLocation().getPath(), o.getSize())); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 99e211466535..03706d1ef445 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -29,7 +29,6 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.File; @@ -83,16 +82,6 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity ); } - @JsonIgnore - @Override - public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) - { - if (CompressionUtils.Format.GZ == compressionFormat) { - return size * 4L; - } - return size; - } - public static RFC4180Parser createOpenCsvParser() { return NullHandling.replaceWithDefault() diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java index b542fc23d10a..000ce65dae54 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.utils.CompressionUtils; import 
javax.annotation.Nullable; import java.util.Collections; @@ -139,6 +140,16 @@ public boolean equals(Object o) Objects.equals(delimiter, that.delimiter); } + @Override + public long getWeightedSize(String path, long size) + { + CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); + if (CompressionUtils.Format.GZ == compressionFormat) { + return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; + } + return size; + } + @Override public int hashCode() { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 4470b143042a..dc8957cc1dc9 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -20,7 +20,6 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParser.Feature; @@ -158,12 +157,12 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity } } - @JsonIgnore @Override - public long getWeightedSize(@Nullable CompressionUtils.Format compressionFormat, long size) + public long getWeightedSize(String path, long size) { + CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); if (CompressionUtils.Format.GZ == compressionFormat) { - return size * 4L; + return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; } return size; } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 6dd3b2eb2468..2b8fecddc6e0 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -44,7 +44,6 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.CollectionUtils; -import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.Streams; import javax.annotation.Nonnull; @@ -149,22 +148,22 @@ private List getFilesForSerialization() @Override public Stream>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return Streams.sequentialStreamFrom(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))) + return Streams.sequentialStreamFrom(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec))) .map(InputSplit::new); } @Override public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { - return Iterators.size(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))); + return Iterators.size(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec))); } - private Iterator> getSplitFileIterator(SplitHintSpec splitHintSpec) + private Iterator> getSplitFileIterator(final InputFormat inputFormat, SplitHintSpec splitHintSpec) { final Iterator fileIterator = getFileIterator(); return splitHintSpec.split( fileIterator, - file -> new InputFileAttribute(file.length(), CompressionUtils.Format.fromFileName(file.getName())) + file -> new InputFileAttribute(file.length(), 
inputFormat.getWeightedSize(file.getName(), file.length())) ); } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java index 6c32de5f167d..a6b823ae8915 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.File; @@ -91,4 +92,14 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity { return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns); } + + @Override + public long getWeightedSize(String path, long size) + { + CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); + if (CompressionUtils.Format.GZ == compressionFormat) { + return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; + } + return size; + } } diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 4f33c909399d..7ab05b907196 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -119,6 +119,8 @@ public static Format fromFileName(@Nullable String fileName) return EXTENSION_TO_COMPRESSION_FORMAT.get(extension); } } + + public static final long COMPRESSED_TEXT_WEIGHT_FACTOR = 4L; private static final Logger log = new Logger(CompressionUtils.class); private static final int DEFAULT_RETRY_COUNT = 3; private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512 diff --git a/processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java new file mode 100644 index 000000000000..6384073a4d36 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; + +public class InputFormatTest +{ + private static final InputFormat INPUT_FORMAT = new InputFormat() + { + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return null; + } + }; + + @Test + public void test_getWeightedSize_withoutCompression() + { + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize, INPUT_FORMAT.getWeightedSize("file.json", unweightedSize)); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java index 2774e258318d..7768e09d9dc3 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java @@ -192,7 +192,7 @@ public void test_getWeightedSize_withoutCompression() { final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0); final long unweightedSize = 100L; - Assert.assertEquals(unweightedSize, format.getWeightedSize(null, unweightedSize)); + Assert.assertEquals(unweightedSize, format.getWeightedSize("file.csv", unweightedSize)); } @Test @@ -200,6 +200,9 @@ public void test_getWeightedSize_withGzCompression() { final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0); final long unweightedSize = 100L; - Assert.assertEquals(unweightedSize * 4L, format.getWeightedSize(CompressionUtils.Format.GZ, unweightedSize)); + Assert.assertEquals( + unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR, + format.getWeightedSize("file.csv.gz", unweightedSize) + ); } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java index 35f35ab9ccd1..9cf6db93c3f2 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.utils.CompressionUtils; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -139,4 +140,22 @@ public void testHasHeaderRowWithMissingColumnsReturningItsValue() final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0); Assert.assertTrue(format.isFindColumnsFromHeader()); } + @Test + public void test_getWeightedSize_withoutCompression() + { + final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize, format.getWeightedSize("file.tsv", unweightedSize)); + } + + @Test + public void test_getWeightedSize_withGzCompression() + { + final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0); + final long unweightedSize = 100L; + Assert.assertEquals( + unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR, + format.getWeightedSize("file.tsv.gz", unweightedSize) + ); + } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java 
b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java index e00f6ba5679a..d2f1474c2aad 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java @@ -129,7 +129,7 @@ public void test_getWeightedSize_withoutCompression() null ); final long unweightedSize = 100L; - Assert.assertEquals(unweightedSize, format.getWeightedSize(null, unweightedSize)); + Assert.assertEquals(unweightedSize, format.getWeightedSize("file.json", unweightedSize)); } @Test @@ -143,6 +143,9 @@ public void test_getWeightedSize_withGzCompression() null ); final long unweightedSize = 100L; - Assert.assertEquals(unweightedSize * 4L, format.getWeightedSize(CompressionUtils.Format.GZ, unweightedSize)); + Assert.assertEquals( + unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR, + format.getWeightedSize("file.json.gz", unweightedSize) + ); } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java index 9754a75ee299..40ab023eeffd 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.utils.CompressionUtils; import org.junit.Assert; import org.junit.Test; @@ -69,4 +70,30 @@ public void testIgnoreCompiledPatternInJson() throws IOException final Map map = mapper.readValue(json, Map.class); Assert.assertFalse(map.containsKey("compiledPattern")); } + + @Test + public void test_getWeightedSize_withoutCompression() throws IOException + { + final RegexInputFormat format = new RegexInputFormat( + "//[^\\r\\n]*[\\r\\n]", + "|", + ImmutableList.of("col1", "col2", "col3") + ); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize, format.getWeightedSize("file.txt", unweightedSize)); + } + @Test + public void test_getWeightedSize_withGzCompression() + { + final RegexInputFormat format = new RegexInputFormat( + "//[^\\r\\n]*[\\r\\n]", + "|", + ImmutableList.of("col1", "col2", "col3") + ); + final long unweightedSize = 100L; + Assert.assertEquals( + unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR, + format.getWeightedSize("file.txt.gz", unweightedSize) + ); + } } From e017ce781e91bdbed99423b0e085b4030ed373cc Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 24 May 2023 15:12:23 -0700 Subject: [PATCH 08/12] * update default value of `maxInputBytesPerWorker` query parameter --- .../src/main/java/org/apache/druid/msq/exec/Limits.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java index c946fd796c23..f9a53ff21950 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java @@ -62,7 +62,7 @@ public class Limits /** * Maximum number of input bytes per worker in case number of tasks is determined automatically. 
 */
-  public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
+  public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 1024 * 1024 * 512L;

   /**
    * Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.

From 4094212598f6fd6627dcb64607ed31057f82c241 Mon Sep 17 00:00:00 2001
From: zachjsh
Date: Wed, 24 May 2023 15:14:32 -0700
Subject: [PATCH 09/12] * update docs

---
 docs/multi-stage-query/reference.md | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 5186171359b6..a37dc208ddfd 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -228,19 +228,19 @@ If you're using the web console, you can specify the context parameters through

 The following table lists the context parameters for the MSQ task engine:

-| Parameter | Description | Default value |
-|---|---|---|
-| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
-| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include:<ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
-| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
-| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
-| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
+| Parameter | Description | Default value |
+|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
+| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 |
+| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include:<ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 512 MiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When calculating the size of files, the weighted size is used, which considers the file format and compression format used if any. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
+| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
+| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
+| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
 | `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
-| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
-| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
-| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
-| `durableShuffleStorage` | SELECT, INSERT, REPLACE<br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error.<br /><br /> | `false` |
-| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
+| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |
+| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
+| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
+| `durableShuffleStorage` | SELECT, INSERT, REPLACE<br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error.<br /><br /> | `false` |
+| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |

 ## Joins

From ea678c1d44c93d09b685705c587e07126a23272e Mon Sep 17 00:00:00 2001
From: zachjsh
Date: Wed, 24 May 2023 16:13:18 -0700
Subject: [PATCH 10/12] * fix test failure

---
 .../data/input/azure/AzureInputSourceTest.java   | 15 +++++++++++++--
 .../data/input/impl/CloudObjectInputSource.java  |  7 +++++--
 .../druid/data/input/impl/LocalInputSource.java  |  7 ++++++-
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index eefa5bfcb905..655e1f342a55 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -23,11 +23,14 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.storage.azure.AzureCloudBlobIterable;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
@@ -62,6 +65,14 @@ public class AzureInputSourceTest extends EasyMockSupport
   private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
   private static final int MAX_LISTING_LENGTH = 10;

+  private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
+      new JSONPathSpec(true, null),
+      null,
+      false,
+      null,
+      null
+  );
+
   private AzureStorage storage;
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
@@ -165,7 +176,7 @@ public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLo
     );

     Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
-        null,
+        INPUT_FORMAT,
         new MaxSizeSplitHintSpec(null, 1)
     );

@@ -214,7 +225,7 @@ public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudL
     );

     Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
-        null,
+        INPUT_FORMAT,
         new MaxSizeSplitHintSpec(null, 1)
     );

diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
index 13c38c6bf6cf..724cd96745d9 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
@@ -311,11 +311,14 @@ private static Stream<InputSplit<List<CloudObjectLocation>>> toSplitStream(
                 long size = splitWidget.getObjectSize(o.getLocation());
                 return new InputFileAttribute(
                     size,
-                    inputFormat.getWeightedSize(o.getLocation().getPath(), size));
+                    inputFormat != null ?
inputFormat.getWeightedSize(o.getLocation().getPath(), size) : size); } else { return new InputFileAttribute( o.getSize(), - inputFormat.getWeightedSize(o.getLocation().getPath(), o.getSize())); + inputFormat != null + ? inputFormat.getWeightedSize(o.getLocation().getPath(), o.getSize()) + : o.getSize() + ); } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 2b8fecddc6e0..f802ae6621cf 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -163,7 +163,12 @@ private Iterator> getSplitFileIterator(final InputFormat inputFormat, final Iterator fileIterator = getFileIterator(); return splitHintSpec.split( fileIterator, - file -> new InputFileAttribute(file.length(), inputFormat.getWeightedSize(file.getName(), file.length())) + file -> new InputFileAttribute( + file.length(), + inputFormat != null + ? inputFormat.getWeightedSize(file.getName(), file.length()) + : file.length() + ) ); } From b04f8d149dd9ec3541fc9990f3d9a86062817dc4 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 25 May 2023 12:03:37 -0700 Subject: [PATCH 11/12] * fix more test failures --- .../druid/msq/kernel/controller/WorkerInputsTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java index f59ae121d4b4..bfb4c8ca6753 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java @@ -221,7 +221,7 @@ public void test_auto_threeBigInputs_fourWorkers() { final StageDefinition stageDef = StageDefinition.builder(0) - .inputs(new TestInputSpec(4_000_000_000L, 4_000_000_001L, 4_000_000_002L)) + .inputs(new TestInputSpec(200_000_000L, 200_000_001L, 200_000_002L)) .maxWorkerCount(4) .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L)) .build(QUERY_ID); @@ -236,8 +236,8 @@ public void test_auto_threeBigInputs_fourWorkers() Assert.assertEquals( ImmutableMap.>builder() - .put(0, Collections.singletonList(new TestInputSlice(4_000_000_000L, 4_000_000_001L))) - .put(1, Collections.singletonList(new TestInputSlice(4_000_000_002L))) + .put(0, Collections.singletonList(new TestInputSlice(200_000_000L, 200_000_001L))) + .put(1, Collections.singletonList(new TestInputSlice(200_000_002L))) .build(), inputs.assignmentsMap() ); @@ -395,7 +395,7 @@ public void test_auto_shouldSplitDynamicIfPossible() @Test public void test_auto_shouldUseLeastWorkersPossible() { - TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L); + TestInputSpec inputSpecToSplit = new TestInputSpec(100_000_000L, 100_000_000L, 100_000_000L); final StageDefinition stageDef = StageDefinition.builder(0) .inputs(inputSpecToSplit) @@ -417,7 +417,7 @@ public void test_auto_shouldUseLeastWorkersPossible() ImmutableMap.>builder() .put( 0, - Collections.singletonList(new TestInputSlice(1_000_000_000L, 1_000_000_000L, 1_000_000_000L)) + Collections.singletonList(new TestInputSlice(100_000_000L, 100_000_000L, 100_000_000L)) ) .build(), 
inputs.assignmentsMap() From aabee22833fa56729aea8a215b65048acc4afbe6 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 30 May 2023 11:38:34 -0700 Subject: [PATCH 12/12] * fix static check failure --- .../org/apache/druid/data/input/orc/OrcInputFormatTest.java | 2 +- .../org/apache/druid/data/input/impl/RegexInputFormatTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java index e7530167dfce..a7f6e5131c3a 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java @@ -78,7 +78,7 @@ public void testEquals() } @Test - public void test_getWeightedSize_withoutCompression() throws Exception + public void test_getWeightedSize_withoutCompression() { final OrcInputFormat format = new OrcInputFormat(null, null, null); long unweightedSize = 100L; diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java index 40ab023eeffd..d8c5f49eb3d1 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java @@ -72,7 +72,7 @@ public void testIgnoreCompiledPatternInJson() throws IOException } @Test - public void test_getWeightedSize_withoutCompression() throws IOException + public void test_getWeightedSize_withoutCompression() { final RegexInputFormat format = new RegexInputFormat( "//[^\\r\\n]*[\\r\\n]",
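
Stepping back from the individual patches: the shape the series converges on is `getWeightedSize(String path, long size)` as a default method on `InputFormat`, with the path carrying the compression hint and the default returning `size` unchanged. A minimal sketch of how a third-party format could plug into the weighting, assuming a hypothetical `DenseBinaryInputFormat` and an invented 6x factor; only the three overridden methods are interface members actually shown in this series, and the reader body is stubbed out.

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;

import java.io.File;

// Hypothetical third-party format; not part of this patch series.
public class DenseBinaryInputFormat implements InputFormat
{
  @Override
  public boolean isSplittable()
  {
    return false;
  }

  @Override
  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
  {
    throw new UnsupportedOperationException("reader omitted from this sketch");
  }

  @Override
  public long getWeightedSize(String path, long size)
  {
    // A dense binary encoding fans out heavily when parsed into rows, so weight it up,
    // in the spirit of the 8x SCALE_FACTOR the Parquet, ORC, and Avro formats adopt above.
    return size * 6L;
  }
}
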