diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index c202d31d3da7..d6de6118da44 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -58,8 +58,9 @@ jobs: run: | source /opt/rh/gcc-toolset-11/enable sudo dnf install -y patchelf + sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec + cd velox4j && git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md index 6f98d3e14339..92dc6fbc373d 100644 --- a/gluten-flink/docs/Flink.md +++ b/gluten-flink/docs/Flink.md @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow ## fetch velox4j code git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j -git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec +git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true ``` **Get gluten** diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java index 3d68f333dc95..b49fafaecac5 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java @@ -80,6 +80,7 @@ public class RexCallConverterFactory { Map.entry("AND", Arrays.asList(() -> new DefaultRexCallConverter("and"))), Map.entry("SPLIT_INDEX", Arrays.asList(() -> new SplitIndexRexCallConverter())), Map.entry("SEARCH", Arrays.asList(() -> new DefaultRexCallConverter("in"))), + Map.entry("DATE_FORMAT", Arrays.asList(() -> new DefaultRexCallConverter("date_format"))), Map.entry( ">=", Arrays.asList( diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java new file mode 100644 index 000000000000..788ecb4a68b7 --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.config; + +import io.github.zhztheplayer.velox4j.config.Config; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.api.config.TableConfigOptions; + +import java.util.HashMap; +import java.util.Map; + +public class VeloxQueryConfig { + + private static final String keyVeloxAdjustTimestampToSessionTimeZone = + "adjust_timestamp_to_session_timezone"; + private static final String keyVeloxSessionTimezone = "session_timezone"; + + public static Config getConfig(RuntimeContext context) { + if (!(context instanceof StreamingRuntimeContext)) { + return Config.empty(); + } + Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration(); + Map configMap = new HashMap<>(); + configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); + String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); + // As flink's default timezone value is `default`, it is not a valid timezone id, so we should + // convert it to `UTC` timezone. + if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(localTimeZone)) { + configMap.put(keyVeloxSessionTimezone, "UTC"); + } else { + configMap.put(keyVeloxSessionTimezone, localTimeZone); + } + return Config.create(configMap); + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index b5c73b2b68a1..d8f6f1a0b268 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -17,10 +17,10 @@ package org.apache.gluten.table.runtime.operators; import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle; @@ -99,7 +99,9 @@ public void open() throws Exception { mockInput.addTarget(glutenPlan); LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput)); LOG.debug("OutTypes: {}", outputTypes.keySet()); - query = new Query(mockInput, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); ExternalStreamConnectorSplit split = diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index ddcd5cae9436..360eb39a848f 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -16,10 +16,10 @@ */ package org.apache.gluten.table.runtime.operators; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ConnectorSplit; import io.github.zhztheplayer.velox4j.data.RowVector; @@ -92,7 +92,9 @@ public void run(SourceContext sourceContext) throws Exception { LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); SerialTask task = session.queryOps().execute(query); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java index 5319c61b7992..e859e15ca37b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java @@ -17,9 +17,9 @@ package org.apache.gluten.table.runtime.operators; import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle; @@ -94,7 +94,9 @@ void initGlutenTask() { mockInput.addTarget(glutenPlan); LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput)); LOG.debug("OutTypes: {}", outputTypes.keySet()); - query = new Query(mockInput, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); task = session.queryOps().execute(query); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index a4d6745163f3..399211ec1b1d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -16,8 +16,9 @@ */ package org.apache.gluten.table.runtime.operators; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; + import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ConnectorSplit; import io.github.zhztheplayer.velox4j.iterator.UpIterator; @@ -98,7 +99,9 @@ public void open(Configuration parameters) throws Exception { if (memoryManager == null) { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); @@ -150,7 +153,9 @@ public void initializeState(FunctionInitializationContext context) throws Except LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java index 04ed555a3b59..a5c4da5f0ceb 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java @@ -17,9 +17,9 @@ package org.apache.gluten.table.runtime.operators; import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; import io.github.zhztheplayer.velox4j.connector.ExternalStreams; @@ -94,11 +94,13 @@ public GlutenVectorTwoInputOperator( private void initGlutenTask() { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); task = session.queryOps().execute(query); LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan)); LOG.debug("OutTypes: {}", outputTypes.keySet()); - LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName()); + LOG.debug("RuntimeContext: {}", getRuntimeContext().getClass().getName()); } @Override diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java index e8cbc00eb5a6..8d684fab52e3 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java @@ -107,6 +107,9 @@ private interface VLTypeConverter { Type valueType = toVLType(mapType.getValueType()); return io.github.zhztheplayer.velox4j.type.MapType.create(keyType, valueType); }), + // Map the flink's `TimestampLTZ` type to velox `Timestamp` type. And the timezone would + // be specified by using flink's table config `LOCAL_TIME_ZONE`, which would be passed to + // velox's `session_timezone` config. // TODO: may need precision Map.entry( LocalZonedTimestampType.class, diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java index d8a50081f866..d96f48ecbe2f 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java @@ -441,7 +441,6 @@ protected void setValue(int index, Integer value) { class StructVectorWriter extends BaseVectorWriter { private final int fieldCount; - private BufferAllocator allocator; private final List fieldWriters; public StructVectorWriter(Type fieldType, BufferAllocator allocator, FieldVector vector) { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java index e839806137e8..3abb06c5115d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java @@ -41,7 +41,6 @@ public static RowVector fromRowData( List fieldTypes = rowType.getChildren(); List fieldNames = rowType.getNames(); for (int i = 0; i < rowType.size(); i++) { - Type fieldType = rowType.getChildren().get(i); ArrowVectorWriter writer = ArrowVectorWriter.create(fieldNames.get(i), fieldTypes.get(i), allocator); writer.write(i, row); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index ff99550b81b7..51f1b42e853d 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -25,6 +25,10 @@ import org.junit.jupiter.api.Test; import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; @@ -226,4 +230,68 @@ void testDecimal() { query = "select b + e as x from tblDecimal where a > 0"; runAndCheck(query, Arrays.asList("+I[2.0]", "+I[5.0]", "+I[7.0]")); } + + @Test + void testDateFormat() { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + List rows = + Arrays.asList( + Row.of(1, LocalDateTime.parse("2024-12-31 12:12:12", formatter)), + Row.of(2, LocalDateTime.parse("2025-02-28 12:12:12", formatter))); + createSimpleBoundedValuesTable("timestampTable", "a int, b Timestamp(3)", rows); + String query = + "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampTable"; + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); + tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); + + rows = + Arrays.asList( + Row.of( + 1, LocalDateTime.parse("2024-12-31 12:12:12", formatter).toInstant(ZoneOffset.UTC)), + Row.of( + 2, + LocalDateTime.parse("2025-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); + createSimpleBoundedValuesTable("timestampLtzTable", "a int, b Timestamp_LTZ(3)", rows); + query = + "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampLtzTable"; + tEnv().getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles")); + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 04:12:12]", "+I[2, 2025-02-28, 2025-02-28 04:12:12]")); + + tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]")); + + rows = + Arrays.asList( + Row.of( + 1, + LocalDateTime.parse("2024-12-31 12:12:12", formatter), + LocalDateTime.parse("2024-12-31 12:12:12", formatter).toInstant(ZoneOffset.UTC)), + Row.of( + 2, + LocalDateTime.parse("2025-02-28 12:12:12", formatter), + LocalDateTime.parse("2024-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); + createSimpleBoundedValuesTable( + "timestampTable0", "a int, b Timestamp(3), c Timestamp_LTZ(3)", rows); + query = + "select a, DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss'), DATE_FORMAT(c, 'yyyy-MM-dd HH:mm:ss') from timestampTable0"; + tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31 12:12:12, 2024-12-31 20:12:12]", + "+I[2, 2025-02-28 12:12:12, 2024-02-28 20:12:12]")); + } }