diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 5186171359b6..a37dc208ddfd 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -228,19 +228,19 @@ If you're using the web console, you can specify the context parameters through
The following table lists the context parameters for the MSQ task engine:
-| Parameter | Description | Default value |
-|---|---|---|
-| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
-| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include:<br />- `max`: Uses as many tasks as possible, up to `maxNumTasks`.<br />- `auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`. | `max` |
-| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
-| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
-| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
+| Parameter | Description | Default value |
+|---|---|---|
+| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 |
+| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include:<br />- `max`: Uses as many tasks as possible, up to `maxNumTasks`.<br />- `auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 512 MiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When calculating the size of files, the weighted size is used, which considers the file format and compression format used if any. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`. | `max` |
+| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
+| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
+| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
-| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
-| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
-| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
-| `durableShuffleStorage` | SELECT, INSERT, REPLACE<br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. | `false` |
-| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
+| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |
+| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
+| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
+| `durableShuffleStorage` | SELECT, INSERT, REPLACE<br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If this property is not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. | `false` |
+| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br />Whether to turn on fault tolerance mode. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
## Joins
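To make the `auto` weighting concrete: below is a small, hypothetical Java sketch of how whole files pack into workers under the 512 MiB weighted budget. The class and constant names are invented, and this is not the actual `SlicerUtils.makeSlicesDynamic` logic, only a simplified greedy packing under the same budget idea. Three 200 MB gzipped JSON files each weigh 800 MB after the 4x compressed-text factor, so each lands on its own worker, matching the new `needThreeDueToBytes` test further down.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the actual MSQ slicing code.
public class AutoTaskAssignmentSketch
{
  // Mirrors the new Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER (512 MiB), applied to weighted bytes.
  static final long MAX_WEIGHTED_BYTES_PER_WORKER = 1024 * 1024 * 512L;
  // Mirrors CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR for gzipped text formats.
  static final long GZ_TEXT_WEIGHT_FACTOR = 4L;

  public static void main(String[] args)
  {
    long[] rawFileSizes = {200_000_000L, 200_000_000L, 200_000_000L}; // three 200 MB .json.gz files
    List<List<Long>> workers = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentWeight = 0;
    for (long rawSize : rawFileSizes) {
      long weighted = rawSize * GZ_TEXT_WEIGHT_FACTOR; // 800 MB weighted
      // Start a new worker when the next whole file would exceed the weighted budget.
      if (!current.isEmpty() && currentWeight + weighted > MAX_WEIGHTED_BYTES_PER_WORKER) {
        workers.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(rawSize);
      currentWeight += weighted;
    }
    if (!current.isEmpty()) {
      workers.add(current);
    }
    System.out.println("workers needed: " + workers.size()); // prints 3
  }
}
```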
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
index cc487a644554..526d03681a33 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
@@ -40,6 +40,7 @@
public class AvroOCFInputFormat extends NestedInputFormat
{
+ static final long SCALE_FACTOR = 8L;
private static final Logger LOGGER = new Logger(AvroOCFInputFormat.class);
private final boolean binaryAsString;
@@ -116,6 +117,12 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
);
}
+ @Override
+ public long getWeightedSize(String path, long size)
+ {
+ return size * SCALE_FACTOR;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
index a5c40a78cc01..ceb394b0b3b3 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
@@ -105,4 +105,21 @@ public void testSerdeNonDefaults() throws Exception
Assert.assertEquals(inputFormat, inputFormat2);
}
+
+ @Test
+ public void test_getWeightedSize_withoutCompression() throws Exception
+ {
+ AvroOCFInputFormat format = new AvroOCFInputFormat(
+ jsonMapper,
+ flattenSpec,
+ null,
+ false,
+ false
+ );
+ long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * AvroOCFInputFormat.SCALE_FACTOR,
+ format.getWeightedSize("file.avro", unweightedSize)
+ );
+ }
}
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index eefa5bfcb905..655e1f342a55 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -23,11 +23,14 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.azure.AzureCloudBlobIterable;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
@@ -62,6 +65,14 @@ public class AzureInputSourceTest extends EasyMockSupport
private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
private static final int MAX_LISTING_LENGTH = 10;
+ private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ false,
+ null,
+ null
+ );
+
private AzureStorage storage;
private AzureEntityFactory entityFactory;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
@@ -165,7 +176,7 @@ public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLo
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
- null,
+ INPUT_FORMAT,
new MaxSizeSplitHintSpec(null, 1)
);
@@ -214,7 +225,7 @@ public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudL
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
- null,
+ INPUT_FORMAT,
new MaxSizeSplitHintSpec(null, 1)
);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
index c946fd796c23..f9a53ff21950 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
@@ -62,7 +62,7 @@ public class Limits
/**
* Maximum number of input bytes per worker in case number of tasks is determined automatically.
*/
- public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
+ public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 1024 * 1024 * 512L;
/**
* Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.
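For a sense of scale, combining this new 512 MiB weighted budget with the weight factors introduced elsewhere in the patch gives the following effective raw-byte budgets per worker. This is arithmetic illustration only; the class is hypothetical.

```java
public class WorkerBudgetArithmetic
{
  public static void main(String[] args)
  {
    long weightedBudget = 1024 * 1024 * 512L;                        // 536,870,912 weighted bytes
    System.out.println("uncompressed text: " + weightedBudget);      // factor 1 -> 512 MiB raw
    System.out.println("gzipped text:      " + weightedBudget / 4);  // factor 4 -> 128 MiB raw
    System.out.println("Parquet/ORC/Avro:  " + weightedBudget / 8);  // factor 8 -> 64 MiB raw
  }
}
```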
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
index 8583162cd0e4..ca853bb4e658 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java
@@ -229,7 +229,7 @@ public Iterator<List<T>> split(
return Iterators.filter(
SlicerUtils.makeSlicesDynamic(
inputIterator,
- item -> inputAttributeExtractor.apply(item).getSize(),
+ item -> inputAttributeExtractor.apply(item).getWeightedSize(),
maxNumSlices,
maxFilesPerSlice,
maxBytesPerSlice
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
index 40444307d7e7..d8dff6fe082b 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java
@@ -187,6 +187,19 @@ public void test_sliceDynamic_splittable_needTwoDueToBytes()
);
}
+ @Test
+ public void test_sliceDynamic_splittableFilesWithCompression_needThreeDueToBytes()
+ {
+ Assert.assertEquals(
+ ImmutableList.of(
+ splittableSlice("foo.gz"),
+ splittableSlice("bar.gz"),
+ splittableSlice("baz.gz")
+ ),
+ slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"), 100, 5, 7)
+ );
+ }
+
@Test
public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneHundredMax()
{
@@ -367,7 +380,7 @@ public Stream<InputSplit<List<String>>> createSplits(
if (useSplitHintSpec) {
splits = splitHintSpec.split(
strings.iterator(),
- s -> new InputFileAttribute(s.length())
+ s -> new InputFileAttribute(s.length(), INPUT_FORMAT.getWeightedSize(s, s.length()))
);
} else {
// Ignore splitHintSpec, return one element per split. Similar to HttpInputSource, for example.
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index f59ae121d4b4..bfb4c8ca6753 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -221,7 +221,7 @@ public void test_auto_threeBigInputs_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
- .inputs(new TestInputSpec(4_000_000_000L, 4_000_000_001L, 4_000_000_002L))
+ .inputs(new TestInputSpec(200_000_000L, 200_000_001L, 200_000_002L))
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
@@ -236,8 +236,8 @@ public void test_auto_threeBigInputs_fourWorkers()
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
- .put(0, Collections.singletonList(new TestInputSlice(4_000_000_000L, 4_000_000_001L)))
- .put(1, Collections.singletonList(new TestInputSlice(4_000_000_002L)))
+ .put(0, Collections.singletonList(new TestInputSlice(200_000_000L, 200_000_001L)))
+ .put(1, Collections.singletonList(new TestInputSlice(200_000_002L)))
.build(),
inputs.assignmentsMap()
);
@@ -395,7 +395,7 @@ public void test_auto_shouldSplitDynamicIfPossible()
@Test
public void test_auto_shouldUseLeastWorkersPossible()
{
- TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L);
+ TestInputSpec inputSpecToSplit = new TestInputSpec(100_000_000L, 100_000_000L, 100_000_000L);
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(inputSpecToSplit)
@@ -417,7 +417,7 @@ public void test_auto_shouldUseLeastWorkersPossible()
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(
0,
- Collections.singletonList(new TestInputSlice(1_000_000_000L, 1_000_000_000L, 1_000_000_000L))
+ Collections.singletonList(new TestInputSlice(100_000_000L, 100_000_000L, 100_000_000L))
)
.build(),
inputs.assignmentsMap()
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
index af54d6aac508..7474a79a15eb 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
@@ -39,6 +39,7 @@
public class OrcInputFormat extends NestedInputFormat
{
+ static final long SCALE_FACTOR = 8L;
private final boolean binaryAsString;
private final Configuration conf;
@@ -94,6 +95,12 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
return new OrcReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}
+ @Override
+ public long getWeightedSize(String path, long size)
+ {
+ return size * SCALE_FACTOR;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java
index 2a0a72728660..a7f6e5131c3a 100644
--- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java
+++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcInputFormatTest.java
@@ -76,4 +76,15 @@ public void testEquals()
.usingGetClass()
.verify();
}
+
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final OrcInputFormat format = new OrcInputFormat(null, null, null);
+ long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * OrcInputFormat.SCALE_FACTOR,
+ format.getWeightedSize("file.orc", unweightedSize)
+ );
+ }
}
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
index 403802b34f75..723929e98c5b 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
@@ -39,6 +39,7 @@
public class ParquetInputFormat extends NestedInputFormat
{
+ static final long SCALE_FACTOR = 8L;
private final boolean binaryAsString;
private final Configuration conf;
@@ -98,6 +99,12 @@ public InputEntityReader createReader(
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}
+ @Override
+ public long getWeightedSize(String path, long size)
+ {
+ return size * SCALE_FACTOR;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java
new file mode 100644
index 000000000000..d1f49306f054
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetInputFormatTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ParquetInputFormatTest
+{
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final ParquetInputFormat format = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());
+ long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * ParquetInputFormat.SCALE_FACTOR,
+ format.getWeightedSize("file.parquet", unweightedSize)
+ );
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java
index 5cb90cf57b66..67ccfa04f95e 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputFileAttribute.java
@@ -33,9 +33,25 @@ public class InputFileAttribute
*/
private final long size;
+ /**
+ * The weighted size of the input file.
+ */
+ private final long weightedSize;
+
public InputFileAttribute(long size)
+ {
+ this(size, size);
+ }
+
+ public InputFileAttribute(long size, long weightedSize)
{
this.size = size;
+ this.weightedSize = weightedSize;
+ }
+
+ public long getWeightedSize()
+ {
+ return weightedSize;
}
public long getSize()
diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
index 345b47ca3ee9..8abb213b74ca 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
@@ -30,6 +30,7 @@
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.utils.CompressionUtils;
import java.io.File;
@@ -67,4 +68,21 @@ InputEntityReader createReader(
InputEntity source,
File temporaryDirectory
);
+
+ /**
+ * Computes the weighted size of a given input object of the underlying input format type, weighted
+ * for its cost during ingestion. The computed weight depends on the format type and, if any, the
+ * compression type ({@link CompressionUtils.Format}) used. Uncompressed newline-delimited JSON is the
+ * baseline with a scale factor of 1: its byte size counts toward ingestion as is, 1:1.
+ *
+ * @param path The path of the input object. Used to tell whether any compression is used.
+ * @param size The size of the input object in bytes.
+ *
+ * @return The weighted size of the input object.
+ */
+ default long getWeightedSize(String path, long size)
+ {
+ return size;
+ }
}
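As a usage sketch of this new hook: a hypothetical custom format (all names here are invented for illustration) could opt into the same gzip weighting that the text formats in this patch use, simply by overriding the default method.

```java
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.utils.CompressionUtils;

import java.io.File;

// Hypothetical format; createReader is stubbed because only the weighting matters here.
public class ExampleWeightedInputFormat implements InputFormat
{
  @Override
  public boolean isSplittable()
  {
    return false;
  }

  @Override
  public InputEntityReader createReader(InputRowSchema schema, InputEntity source, File temporaryDirectory)
  {
    throw new UnsupportedOperationException("illustration only");
  }

  @Override
  public long getWeightedSize(String path, long size)
  {
    // Same pattern as FlatTextInputFormat/JsonInputFormat in this patch: weight gzipped text by 4x.
    if (CompressionUtils.Format.GZ == CompressionUtils.Format.fromFileName(path)) {
      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
    }
    return size;
  }
}
```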
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
index d3d134cd2131..724cd96745d9 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
@@ -125,6 +125,7 @@ public Stream<InputSplit<List<CloudObjectLocation>>> createSplits(
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return getSplitsForObjects(
+ inputFormat,
getSplitWidget(),
getSplitHintSpecOrDefault(splitHintSpec),
objects,
@@ -132,6 +133,7 @@ public Stream>> createSplits(
);
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
return getSplitsForObjects(
+ inputFormat,
getSplitWidget(),
getSplitHintSpecOrDefault(splitHintSpec),
Lists.transform(uris, CloudObjectLocation::new),
@@ -139,6 +141,7 @@ public Stream>> createSplits(
);
} else {
return getSplitsForPrefixes(
+ inputFormat,
getSplitWidget(),
getSplitHintSpecOrDefault(splitHintSpec),
prefixes,
@@ -226,6 +229,7 @@ private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
* implementations do), this method filters out empty objects.
*/
private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForPrefixes(
+ final InputFormat inputFormat,
final CloudObjectSplitWidget splitWidget,
final SplitHintSpec splitHintSpec,
final List<URI> prefixes,
@@ -246,6 +250,7 @@ private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForPrefixes(
// Only consider nonempty objects. Note: size may be unknown; if so we allow it through, to avoid
// calling getObjectSize and triggering a network call.
return toSplitStream(
+ inputFormat,
splitWidget,
splitHintSpec,
Iterators.filter(iterator, object -> object.getSize() != 0) // Allow UNKNOWN_SIZE through
@@ -262,6 +267,7 @@ private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForPrefixes(
* come in through prefixes.)
*/
private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForObjects(
+ final InputFormat inputFormat,
final CloudObjectSplitWidget splitWidget,
final SplitHintSpec splitHintSpec,
final List<CloudObjectLocation> objectLocations,
@@ -279,6 +285,7 @@ private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForObjects(
}
return toSplitStream(
+ inputFormat,
splitWidget,
splitHintSpec,
Iterators.transform(
@@ -289,6 +296,7 @@ private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForObjects(
}
private static Stream<InputSplit<List<CloudObjectLocation>>> toSplitStream(
+ final InputFormat inputFormat,
final CloudObjectSplitWidget splitWidget,
final SplitHintSpec splitHintSpec,
final Iterator<CloudObjectSplitWidget.LocationWithSize> objectIterator
@@ -300,9 +308,17 @@ private static Stream<InputSplit<List<CloudObjectLocation>>> toSplitStream(
o -> {
try {
if (o.getSize() == CloudObjectSplitWidget.UNKNOWN_SIZE) {
- return new InputFileAttribute(splitWidget.getObjectSize(o.getLocation()));
+ long size = splitWidget.getObjectSize(o.getLocation());
+ return new InputFileAttribute(
+ size,
+ inputFormat != null ? inputFormat.getWeightedSize(o.getLocation().getPath(), size) : size);
} else {
- return new InputFileAttribute(o.getSize());
+ return new InputFileAttribute(
+ o.getSize(),
+ inputFormat != null
+ ? inputFormat.getWeightedSize(o.getLocation().getPath(), o.getSize())
+ : o.getSize()
+ );
}
}
catch (IOException e) {
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
index b542fc23d10a..000ce65dae54 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -139,6 +140,16 @@ public boolean equals(Object o)
Objects.equals(delimiter, that.delimiter);
}
+ @Override
+ public long getWeightedSize(String path, long size)
+ {
+ CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
+ if (CompressionUtils.Format.GZ == compressionFormat) {
+ return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
+ }
+ return size;
+ }
+
@Override
public int hashCode()
{
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 200c621e1389..dc8957cc1dc9 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -29,6 +29,7 @@
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.io.File;
@@ -156,6 +157,16 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
}
}
+ @Override
+ public long getWeightedSize(String path, long size)
+ {
+ CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
+ if (CompressionUtils.Format.GZ == compressionFormat) {
+ return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
+ }
+ return size;
+ }
+
/**
* Create a new JsonInputFormat object based on the given parameter
*
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
index 23208c94d765..f802ae6621cf 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java
@@ -148,20 +148,28 @@ private List<File> getFilesForSerialization()
@Override
public Stream<InputSplit<List<File>>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
- return Streams.sequentialStreamFrom(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec)))
+ return Streams.sequentialStreamFrom(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec)))
.map(InputSplit::new);
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
- return Iterators.size(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec)));
+ return Iterators.size(getSplitFileIterator(inputFormat, getSplitHintSpecOrDefault(splitHintSpec)));
}
- private Iterator<List<File>> getSplitFileIterator(SplitHintSpec splitHintSpec)
+ private Iterator<List<File>> getSplitFileIterator(final InputFormat inputFormat, SplitHintSpec splitHintSpec)
{
final Iterator fileIterator = getFileIterator();
- return splitHintSpec.split(fileIterator, file -> new InputFileAttribute(file.length()));
+ return splitHintSpec.split(
+ fileIterator,
+ file -> new InputFileAttribute(
+ file.length(),
+ inputFormat != null
+ ? inputFormat.getWeightedSize(file.getName(), file.length())
+ : file.length()
+ )
+ );
}
@VisibleForTesting
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
index 6c32de5f167d..a6b823ae8915 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
@@ -29,6 +29,7 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.io.File;
@@ -91,4 +92,14 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
{
return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns);
}
+
+ @Override
+ public long getWeightedSize(String path, long size)
+ {
+ CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
+ if (CompressionUtils.Format.GZ == compressionFormat) {
+ return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
+ }
+ return size;
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
index 058361bd7a8a..7ab05b907196 100644
--- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
@@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
@@ -30,6 +31,7 @@
import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
+import org.apache.commons.compress.utils.FileNameUtils;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
@@ -40,6 +42,7 @@
import org.apache.druid.java.util.common.io.NativeIO;
import org.apache.druid.java.util.common.logger.Logger;
+import javax.annotation.Nullable;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -53,6 +56,7 @@
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Enumeration;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -64,14 +68,61 @@
@PublicApi
public class CompressionUtils
{
+
+ public enum Format
+ {
+ BZ2(".bz2", "bz2"),
+ GZ(".gz", "gz"),
+ SNAPPY(".sz", "sz"),
+ XZ(".xz", "xz"),
+ ZIP(".zip", "zip"),
+ ZSTD(".zst", "zst");
+
+ private static final Map<String, Format> EXTENSION_TO_COMPRESSION_FORMAT;
+
+ static {
+ ImmutableMap.Builder<String, Format> builder = ImmutableMap.builder();
+ builder.put(BZ2.getExtension(), BZ2);
+ builder.put(GZ.getExtension(), GZ);
+ builder.put(SNAPPY.getExtension(), SNAPPY);
+ builder.put(XZ.getExtension(), XZ);
+ builder.put(ZIP.getExtension(), ZIP);
+ builder.put(ZSTD.getExtension(), ZSTD);
+ EXTENSION_TO_COMPRESSION_FORMAT = builder.build();
+ }
+
+ private final String suffix;
+ private final String extension;
+ Format(String suffix, String extension)
+ {
+ this.suffix = suffix;
+ this.extension = extension;
+ }
+
+ public String getSuffix()
+ {
+ return suffix;
+ }
+
+ public String getExtension()
+ {
+ return extension;
+ }
+
+ @Nullable
+ public static Format fromFileName(@Nullable String fileName)
+ {
+ String extension = FileNameUtils.getExtension(fileName);
+ if (null == extension) {
+ return null;
+ }
+ return EXTENSION_TO_COMPRESSION_FORMAT.get(extension);
+ }
+ }
+
+ public static final long COMPRESSED_TEXT_WEIGHT_FACTOR = 4L;
private static final Logger log = new Logger(CompressionUtils.class);
private static final int DEFAULT_RETRY_COUNT = 3;
- private static final String BZ2_SUFFIX = ".bz2";
- private static final String GZ_SUFFIX = ".gz";
- private static final String XZ_SUFFIX = ".xz";
- private static final String ZIP_SUFFIX = ".zip";
- private static final String SNAPPY_SUFFIX = ".sz";
- private static final String ZSTD_SUFFIX = ".zst";
private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512
/**
@@ -198,7 +249,7 @@ public static FileUtils.FileCopyResult unzip(
throw Throwables.propagate(e);
}
} else {
- final File tmpFile = File.createTempFile("compressionUtilZipCache", ZIP_SUFFIX);
+ final File tmpFile = File.createTempFile("compressionUtilZipCache", Format.ZIP.getSuffix());
try {
FileUtils.retryCopy(
byteSource,
@@ -527,7 +578,7 @@ public static boolean isZip(String fName)
if (Strings.isNullOrEmpty(fName)) {
return false;
}
- return fName.endsWith(ZIP_SUFFIX); // Technically a file named `.zip` would be fine
+ return fName.endsWith(Format.ZIP.getSuffix()); // Technically a file named `.zip` would be fine
}
/**
@@ -542,7 +593,7 @@ public static boolean isGz(String fName)
if (Strings.isNullOrEmpty(fName)) {
return false;
}
- return fName.endsWith(GZ_SUFFIX) && fName.length() > GZ_SUFFIX.length();
+ return fName.endsWith(Format.GZ.getSuffix()) && fName.length() > Format.GZ.getSuffix().length();
}
/**
@@ -568,17 +619,17 @@ public static String getGzBaseName(String fname)
*/
public static InputStream decompress(final InputStream in, final String fileName) throws IOException
{
- if (fileName.endsWith(GZ_SUFFIX)) {
+ if (fileName.endsWith(Format.GZ.getSuffix())) {
return gzipInputStream(in);
- } else if (fileName.endsWith(BZ2_SUFFIX)) {
+ } else if (fileName.endsWith(Format.BZ2.getSuffix())) {
return new BZip2CompressorInputStream(in, true);
- } else if (fileName.endsWith(XZ_SUFFIX)) {
+ } else if (fileName.endsWith(Format.XZ.getSuffix())) {
return new XZCompressorInputStream(in, true);
- } else if (fileName.endsWith(SNAPPY_SUFFIX)) {
+ } else if (fileName.endsWith(Format.SNAPPY.getSuffix())) {
return new FramedSnappyCompressorInputStream(in);
- } else if (fileName.endsWith(ZSTD_SUFFIX)) {
+ } else if (fileName.endsWith(Format.ZSTD.getSuffix())) {
return new ZstdCompressorInputStream(in);
- } else if (fileName.endsWith(ZIP_SUFFIX)) {
+ } else if (fileName.endsWith(Format.ZIP.getSuffix())) {
// This reads the first file in the archive.
final ZipInputStream zipIn = new ZipInputStream(in, StandardCharsets.UTF_8);
try {
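A quick usage sketch of the new `CompressionUtils.Format.fromFileName` lookup, assuming `druid-processing` on the classpath: it keys off the file extension and returns `null` for unrecognized or absent extensions.

```java
import org.apache.druid.utils.CompressionUtils;

public class FormatLookupExample
{
  public static void main(String[] args)
  {
    System.out.println(CompressionUtils.Format.fromFileName("data.json.gz")); // GZ
    System.out.println(CompressionUtils.Format.fromFileName("data.zst"));     // ZSTD
    System.out.println(CompressionUtils.Format.fromFileName("data.json"));    // null (no compression)
  }
}
```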
diff --git a/processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java
new file mode 100644
index 000000000000..6384073a4d36
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/data/input/InputFormatTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class InputFormatTest
+{
+ private static final InputFormat INPUT_FORMAT = new InputFormat()
+ {
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+ {
+ return null;
+ }
+ };
+
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final long unweightedSize = 100L;
+ Assert.assertEquals(unweightedSize, INPUT_FORMAT.getWeightedSize("file.json", unweightedSize));
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java
index 5f5a52ef15af..7768e09d9dc3 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.utils.CompressionUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
@@ -185,4 +186,23 @@ public void testHasHeaderRowWithMissingColumnsReturningItsValue()
final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0);
Assert.assertTrue(format.isFindColumnsFromHeader());
}
+
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0);
+ final long unweightedSize = 100L;
+ Assert.assertEquals(unweightedSize, format.getWeightedSize("file.csv", unweightedSize));
+ }
+
+ @Test
+ public void test_getWeightedSize_withGzCompression()
+ {
+ final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0);
+ final long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR,
+ format.getWeightedSize("file.csv.gz", unweightedSize)
+ );
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java
index 35f35ab9ccd1..9cf6db93c3f2 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.utils.CompressionUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -139,4 +140,22 @@ public void testHasHeaderRowWithMissingColumnsReturningItsValue()
final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0);
Assert.assertTrue(format.isFindColumnsFromHeader());
}
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0);
+ final long unweightedSize = 100L;
+ Assert.assertEquals(unweightedSize, format.getWeightedSize("file.tsv", unweightedSize));
+ }
+
+ @Test
+ public void test_getWeightedSize_withGzCompression()
+ {
+ final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0);
+ final long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR,
+ format.getWeightedSize("file.tsv.gz", unweightedSize)
+ );
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
index acc8a5703fef..d2f1474c2aad 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.utils.CompressionUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -116,4 +117,35 @@ public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets()
);
Assert.assertFalse(format.isKeepNullColumns());
}
+
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final JsonInputFormat format = new JsonInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ false,
+ null,
+ null
+ );
+ final long unweightedSize = 100L;
+ Assert.assertEquals(unweightedSize, format.getWeightedSize("file.json", unweightedSize));
+ }
+
+ @Test
+ public void test_getWeightedSize_withGzCompression()
+ {
+ final JsonInputFormat format = new JsonInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ false,
+ null,
+ null
+ );
+ final long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR,
+ format.getWeightedSize("file.json.gz", unweightedSize)
+ );
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
index 9754a75ee299..d8c5f49eb3d1 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.utils.CompressionUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -69,4 +70,30 @@ public void testIgnoreCompiledPatternInJson() throws IOException
final Map<String, Object> map = mapper.readValue(json, Map.class);
Assert.assertFalse(map.containsKey("compiledPattern"));
}
+
+ @Test
+ public void test_getWeightedSize_withoutCompression()
+ {
+ final RegexInputFormat format = new RegexInputFormat(
+ "//[^\\r\\n]*[\\r\\n]",
+ "|",
+ ImmutableList.of("col1", "col2", "col3")
+ );
+ final long unweightedSize = 100L;
+ Assert.assertEquals(unweightedSize, format.getWeightedSize("file.txt", unweightedSize));
+ }
+ @Test
+ public void test_getWeightedSize_withGzCompression()
+ {
+ final RegexInputFormat format = new RegexInputFormat(
+ "//[^\\r\\n]*[\\r\\n]",
+ "|",
+ ImmutableList.of("col1", "col2", "col3")
+ );
+ final long unweightedSize = 100L;
+ Assert.assertEquals(
+ unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR,
+ format.getWeightedSize("file.txt.gz", unweightedSize)
+ );
+ }
}