Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9eb5323
Add support to optionally parse numbers from strings for text-based p…
abhishekrb19 Sep 16, 2024
df2706c
Remove parameter from deprecated parse specs.
abhishekrb19 Sep 16, 2024
54847c9
Remove delta change.
abhishekrb19 Sep 16, 2024
b0c538b
checkstyle fix.
abhishekrb19 Sep 16, 2024
7336e79
Remove unneeded test
abhishekrb19 Sep 16, 2024
423d437
More checkstyle fixes.
abhishekrb19 Sep 16, 2024
5d62471
Fix and update test.
abhishekrb19 Sep 16, 2024
f43d544
Fix and add test in CsvReaderTest.
abhishekrb19 Sep 16, 2024
43c5a85
Javadocs and more test cases.
abhishekrb19 Sep 16, 2024
5dd1fff
Cleanup, checkstyle fix and some test fixes.
abhishekrb19 Sep 16, 2024
6076209
Refactor and more tests.
abhishekrb19 Sep 16, 2024
c0ac9ae
Add some null and empty "" cases.
abhishekrb19 Sep 16, 2024
8e4dc2c
Add public-facing docs.
abhishekrb19 Sep 16, 2024
7d6389e
More new lines.
abhishekrb19 Sep 16, 2024
c8f8325
Remove VisibleForTesting package method as there's sufficient coverage.
abhishekrb19 Sep 16, 2024
4e12258
spelling.
abhishekrb19 Sep 17, 2024
220f93d
Tests for CsvReader and DelimitedReader.
abhishekrb19 Sep 17, 2024
c2d591e
Add boolean values to the test data -- retain them as stringy bools.
abhishekrb19 Sep 17, 2024
5338d8d
fix up.
abhishekrb19 Sep 17, 2024
94c35f0
Use tryParse instead of nested exception handling.
abhishekrb19 Sep 17, 2024
e6a2b59
Review: rename to getTransformationFunction.
abhishekrb19 Sep 19, 2024
6dd3986
Rename shouldParseNumbers to tryParseNumbers.
abhishekrb19 Sep 19, 2024
c2c3ee9
Better docs
abhishekrb19 Sep 19, 2024
dccb108
Merge branch 'master' into text_format_parse_numbers
abhishekrb19 Sep 19, 2024
153bff5
Rename fix and change .spelling entry order.
abhishekrb19 Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void prepareData() throws Exception
@Setup(Level.Trial)
public void prepareFormat()
{
format = new DelimitedInputFormat(fromHeader ? null : COLUMNS, null, "\t", null, fromHeader, fromHeader ? 0 : 1);
format = new DelimitedInputFormat(fromHeader ? null : COLUMNS, null, "\t", null, fromHeader, fromHeader ? 0 : 1, null);
}

@Benchmark
Expand Down
2 changes: 2 additions & 0 deletions docs/ingestion/data-formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Configure the CSV `inputFormat` to load CSV data as follows:
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order as the columns of your data. | yes if `findColumnsFromHeader` is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that `skipHeaderRows` will be applied before finding column names from the header. For example, if you set `skipHeaderRows` to 2 and `findColumnsFromHeader` to true, the task will skip the first two lines and then extract column information from the third line. `columns` will be ignored if this is set to true. | no (default = false if `columns` is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first `skipHeaderRows` rows. | no (default = 0) |
| tryParseNumbers | Boolean | If this is set to true, the task will attempt to parse numeric strings as long values first and then as double values. This parsing also applies to each value separated by `listDelimiter`. If a value cannot be parsed as a number, it is retained as a string. | no (default = false) |

For example:

Expand All @@ -150,6 +151,7 @@ Configure the TSV `inputFormat` to load TSV data as follows:
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order as the columns of your data. | yes if `findColumnsFromHeader` is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that `skipHeaderRows` will be applied before finding column names from the header. For example, if you set `skipHeaderRows` to 2 and `findColumnsFromHeader` to true, the task will skip the first two lines and then extract column information from the third line. `columns` will be ignored if this is set to true. | no (default = false if `columns` is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first `skipHeaderRows` rows. | no (default = 0) |
| tryParseNumbers | Boolean | If this is set to true, the task will attempt to parse numeric strings as long values first and then as double values. This parsing also applies to each value separated by `listDelimiter`. If a value cannot be parsed as a number, it is retained as a string. | no (default = false) |

Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public void testReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

Expand Down Expand Up @@ -584,7 +584,7 @@ public void testCompressedReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.storage.sql.CatalogManager;
import org.apache.druid.catalog.storage.sql.SQLCatalogManager;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule;
import org.apache.druid.server.security.Access;
Expand All @@ -35,8 +33,6 @@
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;

import java.util.Arrays;

public class CatalogTests
{
public static final String TEST_AUTHORITY = "test";
Expand Down Expand Up @@ -74,17 +70,6 @@ public Access authorize(
}
}

public static InputFormat csvFormat()
{
return new CsvInputFormat(
Arrays.asList("x", "y", "z"),
null, // listDelimiter
false, // hasHeaderRow
false, // findColumnsFromHeader
0 // skipHeaderRows
);
}

public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();

public static class DbFixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public void testReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
null
);

Expand Down Expand Up @@ -453,7 +453,7 @@ public void testCompressedReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
null,
false,
null,
0
0,
null
);

public static class ConstructorTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,8 @@ public void testValueInCsvFormat() throws IOException
null,
false,
false,
0
0,
null
),
"kafka.newheader.",
"kafka.newkey.key",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,8 @@ public void testValueInCsvFormat() throws IOException
null,
false,
false,
0
0,
null
),
"kinesis.newts.partitionKey",
"kinesis.newts.timestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public CSVFlatDataParser(
this.valueColumn,
Arrays.toString(columns.toArray())
);
CSVParser csvParser = new CSVParser(null, columns, hasHeaderRow, skipHeaderRows);
CSVParser csvParser = new CSVParser(null, columns, hasHeaderRow, skipHeaderRows, false);
csvParser.startFileFromBeginning();
this.parser = new DelegateParser(
csvParser,
Expand Down Expand Up @@ -355,13 +355,13 @@ public List<String> getColumns()
@JsonProperty
public String getKeyColumn()
{
return this.keyColumn;
return keyColumn;
}

@JsonProperty
public String getValueColumn()
{
return this.valueColumn;
return valueColumn;
}

@Override
Expand Down Expand Up @@ -431,7 +431,8 @@ public TSVFlatDataParser(
StringUtils.emptyToNullNonDruidDataString(delimiter),
StringUtils.emptyToNullNonDruidDataString(listDelimiter),
hasHeaderRow,
skipHeaderRows
skipHeaderRows,
false
);
delegate.startFileFromBeginning();
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testCSVWithHeader()
// The third row will parse to data
Assert.assertEquals(ImmutableMap.of("val2", "val3"), parser.getParser().parseToMap("val1,val2,val3"));
}

@Test(expected = IllegalArgumentException.class)
public void testBadCSV()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void testIngestWithSanitizedNullByte() throws IOException
.dataSource(
new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(toRead), SystemFields.none()),
new CsvInputFormat(null, null, null, true, 0),
new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("agent_category", ColumnType.STRING)
Expand Down Expand Up @@ -255,7 +255,7 @@ public void testIngestWithSanitizedNullByteUsingContextParameter() throws IOExce
.dataSource(
new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(toRead), SystemFields.none()),
new CsvInputFormat(null, null, null, true, 0),
new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("agent_category", ColumnType.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ public void testGroupByWithLimitAndOrdering(String contextName, Map<String, Obje
.setDataSource(
new ExternalDataSource(
new InlineInputSource("dim1\nabc\nxyz\ndef\nxyz\nabc\nxyz\nabc\nxyz\ndef\nbbb\naaa"),
new CsvInputFormat(null, null, null, true, 0),
new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder().add("dim1", ColumnType.STRING).build()
)
)
Expand Down Expand Up @@ -2376,7 +2376,7 @@ public void testSelectRowsGetUntruncatedByDefault(String contextName, Map<String
Collections.nCopies(numFiles, toRead),
SystemFields.none()
),
new CsvInputFormat(null, null, null, true, 0),
new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder().add("timestamp", ColumnType.STRING).build()
))
.intervals(querySegmentSpec(Filtration.eternity()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ public void testReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

Expand Down Expand Up @@ -1063,7 +1063,7 @@ public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() th

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);
try (CloseableIterator<InputRow> readerIterator = reader.read()) {
Expand Down Expand Up @@ -1111,7 +1111,7 @@ public void testCompressedReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,8 @@ private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appen
"|",
null,
false,
0
0,
null
),
appendToExisting,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
false,
0
0,
null
);

private static final DataSchema DATA_SCHEMA =
Expand Down Expand Up @@ -473,7 +474,7 @@ public void testTransformSpec() throws Exception
indexIngestionSpec = createIngestionSpec(
DEFAULT_TIMESTAMP_SPEC,
dimensionsSpec,
new CsvInputFormat(columns, listDelimiter, null, false, 0),
new CsvInputFormat(columns, listDelimiter, null, false, 0, null),
transformSpec,
null,
tuningConfig,
Expand Down Expand Up @@ -901,7 +902,7 @@ public void testCSVFileWithHeader() throws Exception
ingestionSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(null, null, null, true, 0),
new CsvInputFormat(null, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -941,7 +942,7 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception
ingestionSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -1341,7 +1342,7 @@ public void testIgnoreParseException() throws Exception
parseExceptionIgnoreSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -1391,7 +1392,7 @@ public void testReportParseException() throws Exception
indexIngestionSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -1632,7 +1633,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
ingestionSpec = createIngestionSpec(
timestampSpec,
dimensionsSpec,
new CsvInputFormat(columns, null, null, true, 0),
new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -1751,7 +1752,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc
ingestionSpec = createIngestionSpec(
timestampSpec,
dimensionsSpec,
new CsvInputFormat(columns, null, null, true, 0),
new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -1845,7 +1846,7 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception
ingestionSpec = createIngestionSpec(
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
new CsvInputFormat(null, null, null, true, 0),
new CsvInputFormat(null, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down Expand Up @@ -1915,7 +1916,7 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception
ingestionSpec = createIngestionSpec(
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
new CsvInputFormat(columns, null, null, true, 0),
new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
csvParseSpec.getListDelimiter(),
getColumnsFromHeader ? null : true,
getColumnsFromHeader ? true : null,
csvParseSpec.getSkipHeaderRows()
csvParseSpec.getSkipHeaderRows(),
null
);
} else if (parseSpec instanceof DelimitedParseSpec) {
DelimitedParseSpec delimitedParseSpec = (DelimitedParseSpec) parseSpec;
Expand All @@ -324,7 +325,8 @@ public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
delimitedParseSpec.getDelimiter(),
getColumnsFromHeader ? null : true,
getColumnsFromHeader ? true : null,
delimitedParseSpec.getSkipHeaderRows()
delimitedParseSpec.getSkipHeaderRows(),
null
);
} else if (parseSpec instanceof RegexParseSpec) {
RegexParseSpec regexParseSpec = (RegexParseSpec) parseSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
false,
false,
0
0,
null
);
public static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING =
TuningConfigBuilder.forParallelIndexTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class HashPartitionAdjustingCorePartitionSizeTest extends AbstractMultiPh
null,
false,
false,
0
0,
null
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2020-01-01/P1M");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
null,
false,
false,
0
0,
null
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private static final String INPUT_FILTER = "test_*";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public class HashPartitionTaskKillTest extends AbstractMultiPhaseParallelIndexin
null,
false,
false,
0
0,
null
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");

Expand Down
Loading