diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java index 569384697..aeacbc019 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangTableScanVisitor.java @@ -23,7 +23,8 @@ import org.apache.wayang.api.sql.calcite.rel.WayangTableScan; import org.apache.wayang.api.sql.calcite.utils.ModelParser; -import org.apache.wayang.api.sql.sources.fs.JavaCSVTableSource; +import org.apache.wayang.java.operators.JavaCSVFileSource; +import org.apache.wayang.java.operators.CsvType; import org.apache.wayang.core.plan.wayangplan.Operator; import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.jdbc.operators.JdbcTableSource; @@ -64,15 +65,36 @@ Operator visit(final WayangTableScan wayangRelNode) { "Could not initialize calcite model parser from current Wayang configuration"); } - final List fieldTypes = wayangRelNode.getRowType().getFieldList().stream() + final List fieldTypes = wayangRelNode.getRowType().getFieldList().stream() .map(RelDataTypeField::getType) + .map(type -> { + switch (type.getSqlTypeName()) { + case BOOLEAN: return CsvType.BOOLEAN; + case TINYINT: return CsvType.TINYINT; + case SMALLINT: return CsvType.SMALLINT; + case INTEGER: return CsvType.INTEGER; + case BIGINT: return CsvType.BIGINT; + case FLOAT: return CsvType.FLOAT; + case DOUBLE: return CsvType.DOUBLE; + case DECIMAL: return CsvType.DECIMAL; + case DATE: return CsvType.DATE; + case TIME: return CsvType.TIME; + case TIMESTAMP: return CsvType.TIMESTAMP; + default: return CsvType.STRING; + } + }) .collect(Collectors.toList()); + - final String url = String.format("file:/%s/%s.csv", modelParser.getFsPath(), wayangRelNode.getTableName()); + final String url = String.format( + "file:/%s/%s.csv", + modelParser.getFsPath(), + wayangRelNode.getTableName() + ); final char separator = modelParser.getSchemaDelimiter(tableSource); - return new JavaCSVTableSource<>(url, DataSetType.createDefault(Record.class), fieldTypes, separator); + return new JavaCSVFileSource<>(url, DataSetType.createDefault(Record.class), fieldTypes, separator); } else if (wayangRelNode.getTable().getQualifiedName().size() == 1) { // we assume that it is coming from a test environement or in memory db. diff --git a/wayang-platforms/wayang-java/pom.xml b/wayang-platforms/wayang-java/pom.xml index ce15decfe..f6dc09206 100644 --- a/wayang-platforms/wayang-java/pom.xml +++ b/wayang-platforms/wayang-java/pom.xml @@ -38,6 +38,11 @@ + + net.sf.opencsv + opencsv + 2.3 + org.apache.wayang wayang-core diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java old mode 100755 new mode 100644 similarity index 89% rename from wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java rename to wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java index 4d8682b1b..cc74066b2 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvRowConverter.java @@ -15,19 +15,17 @@ * limitations under the License. */ -package org.apache.wayang.api.sql.sources.fs; +package org.apache.wayang.java.operators; import au.com.bytecode.opencsv.CSVParser; -import org.apache.calcite.avatica.util.DateTimeUtils; -import org.apache.calcite.rel.type.RelDataType; import org.apache.commons.lang3.time.FastDateFormat; import java.io.IOException; import java.math.BigDecimal; -import java.math.RoundingMode; +//import java.math.RoundingMode; import java.text.ParseException; import java.util.Date; -import java.util.Locale; +//import java.util.Locale; import java.util.TimeZone; /** @@ -57,11 +55,11 @@ public class CsvRowConverter { - public static Object convert(RelDataType fieldType, String string) { + public static Object convert(CsvType fieldType, String string) { if (fieldType == null || string == null) { return string; } - switch (fieldType.getSqlTypeName()) { + switch (fieldType) { case BOOLEAN: if (string.length() == 0) { return null; @@ -101,14 +99,14 @@ public static Object convert(RelDataType fieldType, String string) { if (string.length() == 0) { return null; } - return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string); + return new BigDecimal(string); case DATE: if (string.length() == 0) { return null; } try { Date date = TIME_FORMAT_DATE.parse(string); - return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY); + return (int) (date.getTime() / (24 * 60 * 60 * 1000)); } catch (ParseException e) { return null; } @@ -132,13 +130,13 @@ public static Object convert(RelDataType fieldType, String string) { } catch (ParseException e) { return null; } - case VARCHAR: + case STRING: default: return string; } } - private static BigDecimal parseDecimal(int precision, int scale, String string) { + /**private static BigDecimal parseDecimal(int precision, int scale, String string) { BigDecimal result = new BigDecimal(string); // If the parsed value has more fractional digits than the specified scale, round ties away // from 0. @@ -148,7 +146,7 @@ private static BigDecimal parseDecimal(int precision, int scale, String string) "Decimal value {} exceeds declared scale ({}). Performing rounding to keep the " + "first {} fractional digits.", result, scale, scale);*/ - result = result.setScale(scale, RoundingMode.HALF_UP); + /*result = result.setScale(scale, RoundingMode.HALF_UP); } // Throws an exception if the parsed value has more digits to the left of the decimal point // than the specified value. @@ -158,7 +156,7 @@ private static BigDecimal parseDecimal(int precision, int scale, String string) result, precision, scale)); } return result; - } + }*/ public static String[] parseLine(String s) throws IOException { diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java new file mode 100644 index 000000000..8fb014cc5 --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/CsvType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +public enum CsvType { + BOOLEAN, + TINYINT, + SMALLINT, + INTEGER, + BIGINT, + FLOAT, + DOUBLE, + DECIMAL, + DATE, + TIME, + TIMESTAMP, + STRING +} \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java old mode 100755 new mode 100644 similarity index 94% rename from wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java rename to wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java index e37b236e8..5631a0136 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaCSVFileSource.java @@ -15,9 +15,7 @@ * limitations under the License. */ -package org.apache.wayang.api.sql.sources.fs; - -import org.apache.calcite.rel.type.RelDataType; +package org.apache.wayang.java.operators; import org.apache.commons.io.IOUtils; @@ -38,6 +36,7 @@ import org.apache.wayang.java.channels.StreamChannel; import org.apache.wayang.java.execution.JavaExecutor; import org.apache.wayang.java.operators.JavaExecutionOperator; +import org.apache.wayang.java.operators.CsvRowConverter; import java.io.BufferedReader; import java.io.IOException; @@ -51,11 +50,11 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -public class JavaCSVTableSource extends UnarySource implements JavaExecutionOperator { +public class JavaCSVFileSource extends UnarySource implements JavaExecutionOperator { private final String sourcePath; - private final List fieldTypes; + private final List fieldTypes; private final char separator; // Default separator // TODO: incorporate fields later for projectable table scans @@ -63,12 +62,12 @@ public class JavaCSVTableSource extends UnarySource implements JavaExecuti /** * Table source with default seperator ';'

- * See {@link #JavaCSVTableSource(String, DataSetType, List, char)} for custom seperator + * See {@link #JavaCSVFileSource(String, DataSetType, List, char)} for custom seperator * @param sourcePath * @param type * @param fieldTypes */ - public JavaCSVTableSource(final String sourcePath, final DataSetType type, final List fieldTypes) { + public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes) { super(type); this.sourcePath = sourcePath; this.fieldTypes = fieldTypes; @@ -83,7 +82,7 @@ public JavaCSVTableSource(final String sourcePath, final DataSetType type, fi * @param fieldTypes * @param separator */ - public JavaCSVTableSource(final String sourcePath, final DataSetType type, final List fieldTypes, + public JavaCSVFileSource(final String sourcePath, final DataSetType type, final List fieldTypes, final char separator) { super(type); this.sourcePath = sourcePath;