Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ private Record parseLine(final String s) {
final String[] tokens = CsvRowConverter.parseLine(s, this.separator);
if (tokens.length != fieldTypes.size())
throw new IllegalStateException(
String.format("Error while parsing CSV file %s at line %s, using separator %s", sourcePath, s, separator));
String.format(
"CSV file '%s': data row has %d columns but expected %d "
+ "(separator '%s'). Line: '%s'.",
sourcePath, tokens.length, fieldTypes.size(), separator, s));
// now tokens.length == fieldtypes.size

final Object[] objects = new Object[tokens.length];
Expand Down Expand Up @@ -168,12 +171,51 @@ private static Stream<String> streamLines(final String path) {
() -> new IllegalStateException(String.format("No file system found for %s", path)));
try {
final Iterator<String> lineIterator = createLineIterator(fileSystem, path);
lineIterator.next(); // skip header row
if (!lineIterator.hasNext()) {
throw new IllegalStateException(String.format("CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",path));
}
String headerLine = lineIterator.next(); // read and skip header line
validateHeaderLine(path, headerLine);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false);
} catch (final IOException e) {
throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e);
}
}

/**
* Validates the CSV header for Calcite compatibility.
* Checks that each column follows the 'name:type' format
* (e.g., 'id:int,name:string,email:string') and that commas
* are used as the header separator.
*
* @param path the filesystem path to the CSV file
* @param headerLine the first line of the CSV file
*/
private static void validateHeaderLine(final String path, final String headerLine) {
final String[] headerColumns = headerLine.split(","); // split header row into columns

int colonCount = 0;
for (int i = 0; i < headerLine.length(); i++) {
if (headerLine.charAt(i) == ':') {
colonCount++;
}
}

for (final String column : headerColumns) {
if (!column.trim().contains(":")) {
throw new IllegalStateException(String.format(
"CSV file '%s': header column '%s' missing required type. "
+ "Expected 'name:type' format (e.g., 'id:int'). Header: '%s'.",
path, column.trim(), headerLine));
}
}

if (headerColumns.length != colonCount) {
throw new IllegalStateException(String.format(
"CSV file '%s': column count mismatch. Expected %d comma-separated 'name:type' columns "
+ "but found %d. Header: '%s'.",
path, colonCount, headerColumns.length, headerLine));
}
}

/**
Expand Down
Loading