From 19cbcab8167935dbaf9d659c6afe18283af1229d Mon Sep 17 00:00:00 2001 From: Prathamesh9284 Date: Tue, 17 Feb 2026 18:46:03 +0000 Subject: [PATCH 1/4] fix: improve CSV header validation and error messages --- .../sql/sources/fs/JavaCSVTableSource.java | 52 +++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java index 8d42db250..c72c45757 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java @@ -116,7 +116,7 @@ public Tuple, Collection> eval } private Stream createStream(final String actualInputPath) { - return streamLines(actualInputPath).map(this::parseLine); + return this.streamLines(actualInputPath).map(this::parseLine); } private Record parseLine(final String s) { @@ -126,7 +126,12 @@ private Record parseLine(final String s) { final String[] tokens = CsvRowConverter.parseLine(s, this.separator); if (tokens.length != fieldTypes.size()) throw new IllegalStateException( - String.format("Error while parsing CSV file %s at line %s, using separator %s", sourcePath, s, separator)); + String.format( + "Column count mismatch in CSV file '%s': expected %d columns but found %d " + + "(separator '%s'). Line: '%s'. " + + "Ensure the header uses 'name:type' format with commas " + + "and data rows use '%s' as delimiter.", + sourcePath, fieldTypes.size(), tokens.length, separator, s, separator)); // now tokens.length == fieldtypes.size final Object[] objects = new Object[tokens.length]; @@ -163,12 +168,20 @@ public List getSupportedOutputChannels(final int index) { * @param path of the file * @return the {@link Stream} */ - private static Stream streamLines(final String path) { + private Stream streamLines(final String path) { final FileSystem fileSystem = FileSystems.getFileSystem(path).orElseThrow( () -> new IllegalStateException(String.format("No file system found for %s", path))); try { final Iterator lineIterator = createLineIterator(fileSystem, path); - lineIterator.next(); // skip header row + + if (!lineIterator.hasNext()) { + throw new IllegalStateException(String.format( + "CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').", + path)); + } + + final String headerLine = lineIterator.next(); // read & skip header + validateHeaderLine(headerLine); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false); } catch (final IOException e) { throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e); @@ -176,6 +189,37 @@ private static Stream streamLines(final String path) { } + /** + * Validates the CSV header for Calcite compatibility. + * Checks that the header is present, uses comma separators (not the data + * delimiter), and each column follows the 'name:type' format + * (e.g., 'id:int,name:string,email:string'). Note that Calcite hardcodes + * commas for header parsing, while data rows use Wayang's configurable + * separator (default ';'). + * + * @param path the filesystem path to the CSV file + */ + private void validateHeaderLine(final String headerLine) { + final String[] headerColumns = headerLine.split(","); + + if (headerColumns.length != fieldTypes.size()) { + throw new IllegalStateException(String.format( + "CSV file '%s': header has %d comma-separated columns but table schema expects %d. " + + "Ensure the header uses commas with typed columns " + + "(e.g., 'id:int,name:string,email:string,country:string'). Header: '%s'.", + sourcePath, headerColumns.length, fieldTypes.size(), headerLine)); + } + + for (final String column : headerColumns) { + if (!column.trim().contains(":")) { + throw new IllegalStateException(String.format( + "CSV file '%s': header column '%s' missing required type. " + + "Expected 'name:type' format (e.g., 'id:int'). Full header: '%s'.", + sourcePath, column.trim(), headerLine)); + } + } + } + /** * Creates an {@link Iterator} over the lines of a given {@code path} (that * resides in the given {@code fileSystem}). From 788e91a2050fb38b3e8cb3d1f6772079e2a29b05 Mon Sep 17 00:00:00 2001 From: Prathamesh9284 Date: Wed, 18 Feb 2026 10:39:57 +0000 Subject: [PATCH 2/4] refactor: move validateHeaderLine from streamLines to createStream and revert streamLines to its original form --- .../sql/sources/fs/JavaCSVTableSource.java | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java index c72c45757..9723720a2 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java @@ -116,7 +116,8 @@ public Tuple, Collection> eval } private Stream createStream(final String actualInputPath) { - return this.streamLines(actualInputPath).map(this::parseLine); + validateHeaderLine(actualInputPath); + return streamLines(actualInputPath).map(this::parseLine); } private Record parseLine(final String s) { @@ -168,20 +169,12 @@ public List getSupportedOutputChannels(final int index) { * @param path of the file * @return the {@link Stream} */ - private Stream streamLines(final String path) { + private static Stream streamLines(final String path) { final FileSystem fileSystem = FileSystems.getFileSystem(path).orElseThrow( () -> new IllegalStateException(String.format("No file system found for %s", path))); try { final Iterator lineIterator = createLineIterator(fileSystem, path); - - if (!lineIterator.hasNext()) { - throw new IllegalStateException(String.format( - "CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').", - path)); - } - - final String headerLine = lineIterator.next(); // read & skip header - validateHeaderLine(headerLine); + lineIterator.next(); // skip header row return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false); } catch (final IOException e) { throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e); @@ -199,24 +192,37 @@ private Stream streamLines(final String path) { * * @param path the filesystem path to the CSV file */ - private void validateHeaderLine(final String headerLine) { - final String[] headerColumns = headerLine.split(","); - - if (headerColumns.length != fieldTypes.size()) { - throw new IllegalStateException(String.format( - "CSV file '%s': header has %d comma-separated columns but table schema expects %d. " - + "Ensure the header uses commas with typed columns " - + "(e.g., 'id:int,name:string,email:string,country:string'). Header: '%s'.", - sourcePath, headerColumns.length, fieldTypes.size(), headerLine)); - } + private void validateHeaderLine(final String path) { + final FileSystem fileSystem = FileSystems.getFileSystem(path).orElseThrow( + () -> new IllegalStateException(String.format("No file system found for %s", path))); + try { + final Iterator lineIterator = createLineIterator(fileSystem, path); - for (final String column : headerColumns) { - if (!column.trim().contains(":")) { + if (!lineIterator.hasNext()) { + throw new IllegalStateException(String.format("CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",path)); + } + + final String headerLine = lineIterator.next(); // read header row + final String[] headerColumns = headerLine.split(","); // split header row into columns + + if (headerColumns.length != fieldTypes.size()) { throw new IllegalStateException(String.format( - "CSV file '%s': header column '%s' missing required type. " - + "Expected 'name:type' format (e.g., 'id:int'). Full header: '%s'.", - sourcePath, column.trim(), headerLine)); + "CSV file '%s': header has %d comma-separated columns but table schema expects %d. " + + "Ensure the header uses commas with typed columns " + + "(e.g., 'id:int,name:string,email:string,country:string'). Header: '%s'.", + sourcePath, headerColumns.length, fieldTypes.size(), headerLine)); } + + for (final String column : headerColumns) { + if (!column.trim().contains(":")) { + throw new IllegalStateException(String.format( + "CSV file '%s': header column '%s' missing required type. " + + "Expected 'name:type' format (e.g., 'id:int'). Full header: '%s'.", + sourcePath, column.trim(), headerLine)); + } + } + } catch (final IOException e) { + throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e); } } From cb454b70178fe79fd59eaa159f961b02890ef536 Mon Sep 17 00:00:00 2001 From: Prathamesh9284 Date: Wed, 18 Feb 2026 14:32:03 +0000 Subject: [PATCH 3/4] fix: enhance CSV header validation to ensure correct separator usage --- .../wayang/api/sql/sources/fs/JavaCSVTableSource.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java index 9723720a2..5ed3d5523 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java @@ -205,12 +205,12 @@ private void validateHeaderLine(final String path) { final String headerLine = lineIterator.next(); // read header row final String[] headerColumns = headerLine.split(","); // split header row into columns - if (headerColumns.length != fieldTypes.size()) { + if (headerColumns.length == 1 && headerLine.contains(String.valueOf(separator))) { throw new IllegalStateException(String.format( - "CSV file '%s': header has %d comma-separated columns but table schema expects %d. " - + "Ensure the header uses commas with typed columns " - + "(e.g., 'id:int,name:string,email:string,country:string'). Header: '%s'.", - sourcePath, headerColumns.length, fieldTypes.size(), headerLine)); + "CSV file '%s': header uses '%s' as separator, but Calcite requires commas. " + + "Header: '%s'. " + + "Expected format: %s.", + sourcePath, separator, headerLine, headerLine.replace(String.valueOf(separator), ","))); } for (final String column : headerColumns) { From 455a05ad4550569e3f08d9b8d5475e2903ddb641 Mon Sep 17 00:00:00 2001 From: Prathamesh9284 Date: Mon, 23 Feb 2026 10:02:45 +0000 Subject: [PATCH 4/4] refactor: move header validation into streamLines to avoid opening file twice --- .../sql/sources/fs/JavaCSVTableSource.java | 70 ++++++++----------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java index 5ed3d5523..e37b236e8 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java @@ -116,7 +116,6 @@ public Tuple, Collection> eval } private Stream createStream(final String actualInputPath) { - validateHeaderLine(actualInputPath); return streamLines(actualInputPath).map(this::parseLine); } @@ -128,11 +127,9 @@ private Record parseLine(final String s) { if (tokens.length != fieldTypes.size()) throw new IllegalStateException( String.format( - "Column count mismatch in CSV file '%s': expected %d columns but found %d " - + "(separator '%s'). Line: '%s'. " - + "Ensure the header uses 'name:type' format with commas " - + "and data rows use '%s' as delimiter.", - sourcePath, fieldTypes.size(), tokens.length, separator, s, separator)); + "CSV file '%s': data row has %d columns but expected %d " + + "(separator '%s'). Line: '%s'.", + sourcePath, tokens.length, fieldTypes.size(), separator, s)); // now tokens.length == fieldtypes.size final Object[] objects = new Object[tokens.length]; @@ -174,55 +171,50 @@ private static Stream streamLines(final String path) { () -> new IllegalStateException(String.format("No file system found for %s", path))); try { final Iterator lineIterator = createLineIterator(fileSystem, path); - lineIterator.next(); // skip header row + if (!lineIterator.hasNext()) { + throw new IllegalStateException(String.format("CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",path)); + } + String headerLine = lineIterator.next(); // read and skip header line + validateHeaderLine(path, headerLine); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false); } catch (final IOException e) { throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e); } - } /** * Validates the CSV header for Calcite compatibility. - * Checks that the header is present, uses comma separators (not the data - * delimiter), and each column follows the 'name:type' format - * (e.g., 'id:int,name:string,email:string'). Note that Calcite hardcodes - * commas for header parsing, while data rows use Wayang's configurable - * separator (default ';'). + * Checks that each column follows the 'name:type' format + * (e.g., 'id:int,name:string,email:string') and that commas + * are used as the header separator. * - * @param path the filesystem path to the CSV file + * @param path the filesystem path to the CSV file + * @param headerLine the first line of the CSV file */ - private void validateHeaderLine(final String path) { - final FileSystem fileSystem = FileSystems.getFileSystem(path).orElseThrow( - () -> new IllegalStateException(String.format("No file system found for %s", path))); - try { - final Iterator lineIterator = createLineIterator(fileSystem, path); + private static void validateHeaderLine(final String path, final String headerLine) { + final String[] headerColumns = headerLine.split(","); // split header row into columns - if (!lineIterator.hasNext()) { - throw new IllegalStateException(String.format("CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",path)); + int colonCount = 0; + for (int i = 0; i < headerLine.length(); i++) { + if (headerLine.charAt(i) == ':') { + colonCount++; } - - final String headerLine = lineIterator.next(); // read header row - final String[] headerColumns = headerLine.split(","); // split header row into columns + } - if (headerColumns.length == 1 && headerLine.contains(String.valueOf(separator))) { + for (final String column : headerColumns) { + if (!column.trim().contains(":")) { throw new IllegalStateException(String.format( - "CSV file '%s': header uses '%s' as separator, but Calcite requires commas. " - + "Header: '%s'. " - + "Expected format: %s.", - sourcePath, separator, headerLine, headerLine.replace(String.valueOf(separator), ","))); + "CSV file '%s': header column '%s' missing required type. " + + "Expected 'name:type' format (e.g., 'id:int'). Header: '%s'.", + path, column.trim(), headerLine)); } + } - for (final String column : headerColumns) { - if (!column.trim().contains(":")) { - throw new IllegalStateException(String.format( - "CSV file '%s': header column '%s' missing required type. " - + "Expected 'name:type' format (e.g., 'id:int'). Full header: '%s'.", - sourcePath, column.trim(), headerLine)); - } - } - } catch (final IOException e) { - throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e); + if (headerColumns.length != colonCount) { + throw new IllegalStateException(String.format( + "CSV file '%s': column count mismatch. Expected %d comma-separated 'name:type' columns " + + "but found %d. Header: '%s'.", + path, colonCount, headerColumns.length, headerLine)); } }