From 5756855031b6384c3151b2cb87055dfdc650c2ec Mon Sep 17 00:00:00 2001 From: Chandana K T Date: Sat, 21 Feb 2026 09:37:09 +0000 Subject: [PATCH 1/2] Move JavaCSVTableSource to Java platform and rename to JavaCSVFileSource --- .../sql/calcite/converter/WayangTableScanVisitor.java | 4 ++-- wayang-platforms/wayang-java/pom.xml | 10 ++++++++++ .../wayang/java/operators}/CsvRowConverter.java | 2 +- .../wayang/java/operators/JavaCSVFileSource.java | 11 ++++++----- 4 files changed, 19 insertions(+), 8 deletions(-) rename {wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs => wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators}/CsvRowConverter.java (99%) mode change 100755 => 100644 rename wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java => wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java (94%) mode change 100755 => 100644 diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java index 569384697..8b308660c 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java @@ -23,7 +23,7 @@ import org.apache.wayang.api.sql.calcite.rel.WayangTableScan; import org.apache.wayang.api.sql.calcite.utils.ModelParser; -import org.apache.wayang.api.sql.sources.fs.JavaCSVTableSource; +import org.apache.wayang.java.operators.JavaCSVFileSource; import org.apache.wayang.core.plan.wayangplan.Operator; import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.jdbc.operators.JdbcTableSource; @@ -72,7 +72,7 @@ Operator visit(final WayangTableScan wayangRelNode) { final char separator = modelParser.getSchemaDelimiter(tableSource); - return new JavaCSVTableSource<>(url, DataSetType.createDefault(Record.class), fieldTypes, separator); + return new JavaCSVFileSource<>(url, DataSetType.createDefault(Record.class), fieldTypes, separator); } else if (wayangRelNode.getTable().getQualifiedName().size() == 1) { // we assume that it is coming from a test environement or in memory db. diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml index ce15decfe..23cdd8062 100644 --- a/wayang-platforms/wayang-java/pom.xml +++ b/wayang-platforms/wayang-java/pom.xml @@ -38,6 +38,16 @@ + + net.sf.opencsv + opencsv + 2.3 + + + org.apache.calcite + calcite-core + ${calcite.version} + org.apache.wayang wayang-core diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java old mode 100755 new mode 100644 similarity index 99% rename from wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java rename to wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java index 4d8682b1b..7c925c56c --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.wayang.api.sql.sources.fs; +package org.apache.wayang.java.operators; import au.com.bytecode.opencsv.CSVParser; import org.apache.calcite.avatica.util.DateTimeUtils; diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java old mode 100755 new mode 100644 similarity index 94% rename from wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java rename to wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java index e37b236e8..74d53e719 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.wayang.api.sql.sources.fs; +package org.apache.wayang.java.operators; import org.apache.calcite.rel.type.RelDataType; @@ -38,6 +38,7 @@ import org.apache.wayang.java.channels.StreamChannel; import org.apache.wayang.java.execution.JavaExecutor; import org.apache.wayang.java.operators.JavaExecutionOperator; +import org.apache.wayang.java.operators.CsvRowConverter; import java.io.BufferedReader; import java.io.IOException; @@ -51,7 +52,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -public class JavaCSVTableSource extends UnarySource implements JavaExecutionOperator { +public class JavaCSVFileSource extends UnarySource implements JavaExecutionOperator { private final String sourcePath; @@ -63,12 +64,12 @@ public class JavaCSVTableSource extends UnarySource implements JavaExecuti /** * Table source with default seperator ';'

- * See {@link #JavaCSVTableSource(String, DataSetType, List, char)} for custom seperator + * See {@link #JavaCSVFileSource(String, DataSetType, List, char)} for custom seperator * @param sourcePath * @param type * @param fieldTypes */ - public JavaCSVTableSource(final String sourcePath, final DataSetType type, final List fieldTypes) { + public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes) { super(type); this.sourcePath = sourcePath; this.fieldTypes = fieldTypes; @@ -83,7 +84,7 @@ public JavaCSVTableSource(final String sourcePath, final DataSetType type, fi * @param fieldTypes * @param separator */ - public JavaCSVTableSource(final String sourcePath, final DataSetType type, final List fieldTypes, + public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes, final char separator) { super(type); this.sourcePath = sourcePath; From 088ed831fac23602f31eba4149416543648b2adc Mon Sep 17 00:00:00 2001 From: Chandana K T Date: Mon, 23 Feb 2026 18:27:57 +0000 Subject: [PATCH 2/2] Remove Calcite dependency from wayang-java by mapping RelDataType in SQL module --- .../converter/WayangTableScanVisitor.java | 26 +++++++++++++-- wayang-platforms/wayang-java/pom.xml | 5 --- .../java/operators/CsvRowConverter.java | 22 ++++++------- .../apache/wayang/java/operators/CsvType.java | 33 +++++++++++++++++++ .../java/operators/JavaCSVFileSource.java | 8 ++--- 5 files changed, 70 insertions(+), 24 deletions(-) create mode 100644 wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java index 8b308660c..aeacbc019 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java @@ -24,6 +24,7 @@ import org.apache.wayang.api.sql.calcite.rel.WayangTableScan; import org.apache.wayang.api.sql.calcite.utils.ModelParser; import org.apache.wayang.java.operators.JavaCSVFileSource; +import org.apache.wayang.java.operators.CsvType; import org.apache.wayang.core.plan.wayangplan.Operator; import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.jdbc.operators.JdbcTableSource; @@ -64,11 +65,32 @@ Operator visit(final WayangTableScan wayangRelNode) { "Could not initialize calcite model parser from current Wayang configuration"); } - final List fieldTypes = wayangRelNode.getRowType().getFieldList().stream() + final List fieldTypes = wayangRelNode.getRowType().getFieldList().stream() .map(RelDataTypeField::getType) + .map(type -> { + switch (type.getSqlTypeName()) { + case BOOLEAN: return CsvType.BOOLEAN; + case TINYINT: return CsvType.TINYINT; + case SMALLINT: return CsvType.SMALLINT; + case INTEGER: return CsvType.INTEGER; + case BIGINT: return CsvType.BIGINT; + case FLOAT: return CsvType.FLOAT; + case DOUBLE: return CsvType.DOUBLE; + case DECIMAL: return CsvType.DECIMAL; + case DATE: return CsvType.DATE; + case TIME: return CsvType.TIME; + case TIMESTAMP: return CsvType.TIMESTAMP; + default: return CsvType.STRING; + } + }) .collect(Collectors.toList()); + - final String url = String.format("file:/%s/%s.csv", modelParser.getFsPath(), wayangRelNode.getTableName()); + final String url = String.format( + "file:/%s/%s.csv", + modelParser.getFsPath(), + wayangRelNode.getTableName() + ); final char separator = modelParser.getSchemaDelimiter(tableSource); diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml index 23cdd8062..f6dc09206 100644 --- a/wayang-platforms/wayang-java/pom.xml +++ b/wayang-platforms/wayang-java/pom.xml @@ -43,11 +43,6 @@ opencsv 2.3 - - org.apache.calcite - calcite-core - ${calcite.version} - org.apache.wayang wayang-core diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java index 7c925c56c..cc74066b2 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java @@ -18,16 +18,14 @@ package org.apache.wayang.java.operators; import au.com.bytecode.opencsv.CSVParser; -import org.apache.calcite.avatica.util.DateTimeUtils; -import org.apache.calcite.rel.type.RelDataType; import org.apache.commons.lang3.time.FastDateFormat; import java.io.IOException; import java.math.BigDecimal; -import java.math.RoundingMode; +//import java.math.RoundingMode; import java.text.ParseException; import java.util.Date; -import java.util.Locale; +//import java.util.Locale; import java.util.TimeZone; /** @@ -57,11 +55,11 @@ public class CsvRowConverter { - public static Object convert(RelDataType fieldType, String string) { + public static Object convert(CsvType fieldType, String string) { if (fieldType == null || string == null) { return string; } - switch (fieldType.getSqlTypeName()) { + switch (fieldType) { case BOOLEAN: if (string.length() == 0) { return null; @@ -101,14 +99,14 @@ public static Object convert(RelDataType fieldType, String string) { if (string.length() == 0) { return null; } - return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string); + return new BigDecimal(string); case DATE: if (string.length() == 0) { return null; } try { Date date = TIME_FORMAT_DATE.parse(string); - return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY); + return (int) (date.getTime() / (24 * 60 * 60 * 1000)); } catch (ParseException e) { return null; } @@ -132,13 +130,13 @@ public static Object convert(RelDataType fieldType, String string) { } catch (ParseException e) { return null; } - case VARCHAR: + case STRING: default: return string; } } - private static BigDecimal parseDecimal(int precision, int scale, String string) { + /**private static BigDecimal parseDecimal(int precision, int scale, String string) { BigDecimal result = new BigDecimal(string); // If the parsed value has more fractional digits than the specified scale, round ties away // from 0. @@ -148,7 +146,7 @@ private static BigDecimal parseDecimal(int precision, int scale, String string) "Decimal value {} exceeds declared scale ({}). Performing rounding to keep the " + "first {} fractional digits.", result, scale, scale);*/ - result = result.setScale(scale, RoundingMode.HALF_UP); + /*result = result.setScale(scale, RoundingMode.HALF_UP); } // Throws an exception if the parsed value has more digits to the left of the decimal point // than the specified value. @@ -158,7 +156,7 @@ private static BigDecimal parseDecimal(int precision, int scale, String string) result, precision, scale)); } return result; - } + }*/ public static String[] parseLine(String s) throws IOException { diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java new file mode 100644 index 000000000..8fb014cc5 --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +public enum CsvType { + BOOLEAN, + TINYINT, + SMALLINT, + INTEGER, + BIGINT, + FLOAT, + DOUBLE, + DECIMAL, + DATE, + TIME, + TIMESTAMP, + STRING +} \ No newline at end of file diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java index 74d53e719..5631a0136 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java @@ -17,8 +17,6 @@ package org.apache.wayang.java.operators; -import org.apache.calcite.rel.type.RelDataType; - import org.apache.commons.io.IOUtils; import org.apache.wayang.basic.channels.FileChannel; @@ -56,7 +54,7 @@ public class JavaCSVFileSource extends UnarySource implements JavaExecutio private final String sourcePath; - private final List fieldTypes; + private final List fieldTypes; private final char separator; // Default separator // TODO: incorporate fields later for projectable table scans @@ -69,7 +67,7 @@ public class JavaCSVFileSource extends UnarySource implements JavaExecutio * @param type * @param fieldTypes */ - public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes) { + public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes) { super(type); this.sourcePath = sourcePath; this.fieldTypes = fieldTypes; @@ -84,7 +82,7 @@ public JavaCSVFileSource(final String sourcePath, final DataSetType type, fin * @param fieldTypes * @param separator */ - public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes, + public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes, final char separator) { super(type); this.sourcePath = sourcePath;