diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java index 8d42db250..e37b236e8 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java @@ -126,7 +126,10 @@ private Record parseLine(final String s) { final String[] tokens = CsvRowConverter.parseLine(s, this.separator); if (tokens.length != fieldTypes.size()) throw new IllegalStateException( - String.format("Error while parsing CSV file %s at line %s, using separator %s", sourcePath, s, separator)); + String.format( + "CSV file '%s': data row has %d columns but expected %d " + + "(separator '%s'). Line: '%s'.", + sourcePath, tokens.length, fieldTypes.size(), separator, s)); // now tokens.length == fieldtypes.size final Object[] objects = new Object[tokens.length]; @@ -168,12 +171,51 @@ private static Stream streamLines(final String path) { () -> new IllegalStateException(String.format("No file system found for %s", path))); try { final Iterator lineIterator = createLineIterator(fileSystem, path); - lineIterator.next(); // skip header row + if (!lineIterator.hasNext()) { + throw new IllegalStateException(String.format("CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",path)); + } + String headerLine = lineIterator.next(); // read and skip header line + validateHeaderLine(path, headerLine); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false); } catch (final IOException e) { throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e); } + } + /** + * Validates the CSV header for Calcite compatibility. + * Checks that each column follows the 'name:type' format + * (e.g., 'id:int,name:string,email:string') and that commas + * are used as the header separator. + * + * @param path the filesystem path to the CSV file + * @param headerLine the first line of the CSV file + */ + private static void validateHeaderLine(final String path, final String headerLine) { + final String[] headerColumns = headerLine.split(","); // split header row into columns + + int colonCount = 0; + for (int i = 0; i < headerLine.length(); i++) { + if (headerLine.charAt(i) == ':') { + colonCount++; + } + } + + for (final String column : headerColumns) { + if (!column.trim().contains(":")) { + throw new IllegalStateException(String.format( + "CSV file '%s': header column '%s' missing required type. " + + "Expected 'name:type' format (e.g., 'id:int'). Header: '%s'.", + path, column.trim(), headerLine)); + } + } + + if (headerColumns.length != colonCount) { + throw new IllegalStateException(String.format( + "CSV file '%s': column count mismatch. Expected %d comma-separated 'name:type' columns " + + "but found %d. Header: '%s'.", + path, colonCount, headerColumns.length, headerLine)); + } } /**