From 2dd79f283c1ac3eb535b3e8bd1566e331025654a Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 23 Jul 2025 03:53:24 +0000 Subject: [PATCH 01/23] Support date_format function --- .../rexnode/functions/RexCallConverterFactory.java | 4 +++- .../runtime/stream/custom/ScalarFunctionsTest.java | 10 ++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java index 8cf2a4e4cf4a..9da1bdadd78e 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java @@ -77,7 +77,9 @@ public class RexCallConverterFactory { Map.entry("CAST", Arrays.asList(() -> new DefaultRexCallConverter("cast"))), Map.entry("CASE", Arrays.asList(() -> new DefaultRexCallConverter("if"))), Map.entry("AND", Arrays.asList(() -> new DefaultRexCallConverter("and"))), - Map.entry("SEARCH", Arrays.asList(() -> new DefaultRexCallConverter("in")))); + Map.entry("SEARCH", Arrays.asList(() -> new DefaultRexCallConverter("in"))), + Map.entry( + "DATE_FORMAT", Arrays.asList(() -> new DefaultRexCallConverter("date_format")))); public static RexCallConverter getConverter(RexCall callNode, RexConversionContext context) { String operatorName = callNode.getOperator().getName(); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index 1bc5b09f1705..ff7e5b285beb 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -169,4 +169,14 @@ void testDecimal() { query = "select b + e as x from tblDecimal where a > 0"; runAndCheck(query, Arrays.asList("+I[2.0]", "+I[5.0]", "+I[7.0]")); } + + @Test + void testDateFormat() { + List rows = + Arrays.asList(Row.of(1, "2024-12-31 12:12:12"), Row.of(2, "2025-02-28 12:12:12")); + createSimpleBoundedValuesTable("dateFormatTbl", "a int, b string", rows); + String query = + "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd') from dateFormatTbl"; + runAndCheck(query, Arrays.asList("+I[1, 2024-12-31]", "+I[2, 2025-02-28]")); + } } From 3ad14c9a05b1534e8bb35287664f2d28d5662023 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Thu, 24 Jul 2025 09:38:27 +0000 Subject: [PATCH 02/23] consider about timezone --- .../functions/ModRexCallConverter.java | 1 - .../runtime/config/VeloxQueryConfig.java | 59 +++++++++++++++++++ .../operators/GlutenSingleInputOperator.java | 6 +- .../operators/GlutenSourceFunction.java | 6 +- .../operators/GlutenVectorSourceFunction.java | 7 ++- .../GlutenVectorTwoInputOperator.java | 6 +- .../FlinkRowToVLVectorConvertor.java | 1 - .../common/GlutenStreamingTestBase.java | 8 +++ .../stream/custom/ScalarFunctionsTest.java | 21 ++++++- 9 files changed, 103 insertions(+), 12 deletions(-) create mode 100644 gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/ModRexCallConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/ModRexCallConverter.java index 9d96a1dd4457..b66235d8d6ab 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/ModRexCallConverter.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/ModRexCallConverter.java @@ -55,7 +55,6 @@ public ValidationResult isSuitable(RexCall callNode, RexConversionContext contex @Override public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) { List params = getParams(callNode, context); - List alignedParams = TypeUtils.promoteTypeForArithmeticExpressions(params); // Use the divisor's type as the result type Type resultType = params.get(1).getReturnType(); return new CallTypedExpr(resultType, params, functionName); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java new file mode 100644 index 000000000000..9c61397f4567 --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.config; + +import io.github.zhztheplayer.velox4j.config.Config; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.api.config.TableConfigOptions; + +import java.util.HashMap; +import java.util.Map; + +public class VeloxQueryConfig { + + public static ConfigOption ADJUST_TIMESTMP_TO_SESSION_TIMEZONE = + ConfigOptions.key("velox.adjust_timestamp_to_session_timezone") + .booleanType() + .defaultValue(false) + .withDescription( + "adjust the timestamp accroding to the given session timezone in the velox backend"); + + private static final String keyVeloxAdjustTimestampToSessionTimeZone = + "adjust_timestamp_to_session_timezone"; + private static final String keyVeloxSessionTimezone = "session_timezone"; + + public static Config getConfig(RuntimeContext context) { + if (!(context instanceof StreamingRuntimeContext)) { + return Config.empty(); + } + Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration(); + Map configMap = new HashMap<>(); + if (config.get(ADJUST_TIMESTMP_TO_SESSION_TIMEZONE)) { + String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); + configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); + configMap.put(keyVeloxSessionTimezone, localTimeZone); + } else { + configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "false"); + } + return Config.create(configMap); + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java index 0bec1bd886d4..5266281794b8 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java @@ -17,10 +17,10 @@ package org.apache.gluten.table.runtime.operators; import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle; @@ -99,7 +99,9 @@ public void open() throws Exception { mockInput.addTarget(glutenPlan); LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput)); LOG.debug("OutTypes: {}", outputTypes.keySet()); - query = new Query(mockInput, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); ExternalStreamConnectorSplit split = diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index ddcd5cae9436..360eb39a848f 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -16,10 +16,10 @@ */ package org.apache.gluten.table.runtime.operators; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ConnectorSplit; import io.github.zhztheplayer.velox4j.data.RowVector; @@ -92,7 +92,9 @@ public void run(SourceContext sourceContext) throws Exception { LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); SerialTask task = session.queryOps().execute(query); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index bd0cb79d5626..aee5d8e14905 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -16,8 +16,9 @@ */ package org.apache.gluten.table.runtime.operators; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; + import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ConnectorSplit; import io.github.zhztheplayer.velox4j.iterator.UpIterator; @@ -91,7 +92,9 @@ public void run(SourceContext sourceContext) throws Exception { LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); SerialTask task = session.queryOps().execute(query); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java index 0982d38b138e..d1e90b81d998 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java @@ -17,10 +17,10 @@ package org.apache.gluten.table.runtime.operators; import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; import io.github.zhztheplayer.velox4j.connector.ExternalStreams; @@ -104,7 +104,9 @@ public void open() throws Exception { rightInputQueue = session.externalStreamOps().newBlockingQueue(); LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan)); LOG.debug("OutTypes: {}", outputTypes.keySet()); - query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); ExternalStreamConnectorSplit leftSplit = diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java index e839806137e8..3abb06c5115d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java @@ -41,7 +41,6 @@ public static RowVector fromRowData( List fieldTypes = rowType.getChildren(); List fieldNames = rowType.getNames(); for (int i = 0; i < rowType.size(); i++) { - Type fieldType = rowType.getChildren().get(i); ArrowVectorWriter writer = ArrowVectorWriter.create(fieldNames.get(i), fieldTypes.get(i), allocator); writer.write(i, row); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index 84aae4740291..d7f26f1c0d6d 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -81,4 +82,11 @@ protected void runAndCheck(String query, List expected) { .collect(Collectors.toList()); assertThat(actual).isEqualTo(expected); } + + protected void runAndCheck(String query, List expected, Map configs) { + for (String key : configs.keySet()) { + tEnv().getConfig().set(key, configs.get(key)); + } + runAndCheck(query, expected); + } } diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index ff7e5b285beb..6bf14d0299d8 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -16,8 +16,10 @@ */ package org.apache.gluten.table.runtime.stream.custom; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; @@ -26,6 +28,7 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.List; +import java.util.Map; class ScalarFunctionsTest extends GlutenStreamingTestBase { @@ -176,7 +179,21 @@ void testDateFormat() { Arrays.asList(Row.of(1, "2024-12-31 12:12:12"), Row.of(2, "2025-02-28 12:12:12")); createSimpleBoundedValuesTable("dateFormatTbl", "a int, b string", rows); String query = - "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd') from dateFormatTbl"; - runAndCheck(query, Arrays.asList("+I[1, 2024-12-31]", "+I[2, 2025-02-28]")); + "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd'), DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); + Map configs = + Map.of( + VeloxQueryConfig.ADJUST_TIMESTMP_TO_SESSION_TIMEZONE.key(), + "true", + TableConfigOptions.LOCAL_TIME_ZONE.key(), + "America/Los_Angeles"); + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]"), + configs); } } From 2573bbc7b181a287bdb9f9fb0259fe7e38811d57 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Thu, 24 Jul 2025 11:25:52 +0000 Subject: [PATCH 03/23] fix ci --- .../apache/gluten/table/runtime/config/VeloxQueryConfig.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 9c61397f4567..7a499df89f33 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -51,8 +51,6 @@ public static Config getConfig(RuntimeContext context) { String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); configMap.put(keyVeloxSessionTimezone, localTimeZone); - } else { - configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "false"); } return Config.create(configMap); } From 57471ad80f12e7c9e5636de702b65eff939e4711 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 30 Jul 2025 02:14:00 +0000 Subject: [PATCH 04/23] fix review --- .../table/runtime/config/VeloxQueryConfig.java | 17 +++-------------- .../gluten/vectorized/ArrowVectorWriter.java | 1 - .../stream/custom/ScalarFunctionsTest.java | 7 +------ 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 7a499df89f33..9d54603f1e86 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -19,8 +19,6 @@ import io.github.zhztheplayer.velox4j.config.Config; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.api.config.TableConfigOptions; @@ -30,13 +28,6 @@ public class VeloxQueryConfig { - public static ConfigOption ADJUST_TIMESTMP_TO_SESSION_TIMEZONE = - ConfigOptions.key("velox.adjust_timestamp_to_session_timezone") - .booleanType() - .defaultValue(false) - .withDescription( - "adjust the timestamp accroding to the given session timezone in the velox backend"); - private static final String keyVeloxAdjustTimestampToSessionTimeZone = "adjust_timestamp_to_session_timezone"; private static final String keyVeloxSessionTimezone = "session_timezone"; @@ -47,11 +38,9 @@ public static Config getConfig(RuntimeContext context) { } Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration(); Map configMap = new HashMap<>(); - if (config.get(ADJUST_TIMESTMP_TO_SESSION_TIMEZONE)) { - String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); - configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); - configMap.put(keyVeloxSessionTimezone, localTimeZone); - } + String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); + configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); + configMap.put(keyVeloxSessionTimezone, localTimeZone); return Config.create(configMap); } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java index d8a50081f866..d96f48ecbe2f 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java @@ -441,7 +441,6 @@ protected void setValue(int index, Integer value) { class StructVectorWriter extends BaseVectorWriter { private final int fieldCount; - private BufferAllocator allocator; private final List fieldWriters; public StructVectorWriter(Type fieldType, BufferAllocator allocator, FieldVector vector) { diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index 6bf14d0299d8..1c1fc4af006b 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -16,7 +16,6 @@ */ package org.apache.gluten.table.runtime.stream.custom; -import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase; import org.apache.flink.table.api.config.TableConfigOptions; @@ -185,11 +184,7 @@ void testDateFormat() { Arrays.asList( "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); Map configs = - Map.of( - VeloxQueryConfig.ADJUST_TIMESTMP_TO_SESSION_TIMEZONE.key(), - "true", - TableConfigOptions.LOCAL_TIME_ZONE.key(), - "America/Los_Angeles"); + Map.of(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); runAndCheck( query, Arrays.asList( From b7e055ae63c393cdf3cbbb984e15497a32750866 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 30 Jul 2025 04:04:18 +0000 Subject: [PATCH 05/23] run and check timezone set error --- .../runtime/config/VeloxQueryConfig.java | 28 ++++++++++++++++++- .../operators/GlutenSingleInputOperator.java | 4 ++- .../common/GlutenStreamingTestBase.java | 17 +++++++++++ .../stream/custom/ScalarFunctionsTest.java | 12 ++++---- 4 files changed, 53 insertions(+), 8 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 9d54603f1e86..37b35e076cc8 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -19,6 +19,8 @@ import io.github.zhztheplayer.velox4j.config.Config; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.api.config.TableConfigOptions; @@ -28,6 +30,12 @@ public class VeloxQueryConfig { + private static final ConfigOption CONFIG_USED_FOR_TEST = + ConfigOptions.key("config.used-for-test") + .booleanType() + .defaultValue(false) + .withDescription("A config to identify whether the config is used for test."); + private static final String keyVeloxAdjustTimestampToSessionTimeZone = "adjust_timestamp_to_session_timezone"; private static final String keyVeloxSessionTimezone = "session_timezone"; @@ -37,10 +45,28 @@ public static Config getConfig(RuntimeContext context) { return Config.empty(); } Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration(); + if (isConfigUsedForTest(config)) { + return getConfigForTest(config); + } Map configMap = new HashMap<>(); + configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); + if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(localTimeZone)) { + configMap.put(keyVeloxSessionTimezone, "UTC"); + } else { + configMap.put(keyVeloxSessionTimezone, localTimeZone); + } + return Config.create(configMap); + } + + private static boolean isConfigUsedForTest(Configuration config) { + return config.get(CONFIG_USED_FOR_TEST); + } + + private static Config getConfigForTest(Configuration config) { + Map configMap = new HashMap<>(); configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); - configMap.put(keyVeloxSessionTimezone, localTimeZone); + configMap.put(keyVeloxSessionTimezone, config.get(TableConfigOptions.LOCAL_TIME_ZONE)); return Config.create(configMap); } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java index 5266281794b8..037a4b4bc261 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java @@ -135,7 +135,9 @@ public void processElement(StreamRecord element) { @Override public void close() throws Exception { inputQueue.close(); - task.close(); + if (task != null) { + task.close(); + } session.close(); memoryManager.close(); allocator.close(); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index d7f26f1c0d6d..705d5649f1fa 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -21,6 +21,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.BeforeAll; @@ -89,4 +90,20 @@ protected void runAndCheck(String query, List expected, Map configs) { + for (String key : configs.keySet()) { + tEnv().getConfig().set(key, configs.get(key)); + } + boolean errMatches = false; + try { + CloseableIterator rows = tEnv().executeSql(query).collect(); + while (rows.hasNext()) { + rows.next(); + } + } catch (Exception e) { + errMatches = true; + } + assertThat(errMatches).isEqualTo(true); + } } diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index 1c1fc4af006b..99dea3a662d6 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -26,6 +26,7 @@ import java.math.BigDecimal; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -179,12 +180,11 @@ void testDateFormat() { createSimpleBoundedValuesTable("dateFormatTbl", "a int, b string", rows); String query = "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd'), DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; - runAndCheck( - query, - Arrays.asList( - "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); - Map configs = - Map.of(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); + Map configs = new HashMap<>(); + configs.put("config.used-for-test", "true"); + configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "default"); + runAndCheckException(query, configs); + configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); runAndCheck( query, Arrays.asList( From 86da02cb49586a81cd703d897e9c59f0bd3cf17e Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 30 Jul 2025 04:08:03 +0000 Subject: [PATCH 06/23] check error --- .../runtime/stream/common/GlutenStreamingTestBase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index 705d5649f1fa..8d170dfc6ea4 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -95,15 +95,15 @@ protected void runAndCheckException(String query, Map configs) { for (String key : configs.keySet()) { tEnv().getConfig().set(key, configs.get(key)); } - boolean errMatches = false; + boolean err = false; try { CloseableIterator rows = tEnv().executeSql(query).collect(); while (rows.hasNext()) { rows.next(); } } catch (Exception e) { - errMatches = true; + err = true; } - assertThat(errMatches).isEqualTo(true); + assertThat(err).isEqualTo(true); } } From 9700c1f8302f2cc6a809160667d31720c91b52c7 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 27 Aug 2025 09:05:41 +0000 Subject: [PATCH 07/23] fix reviews --- .../functions/RexCallConverterFactory.java | 6 ++--- .../runtime/config/VeloxQueryConfig.java | 22 ------------------- .../GlutenVectorTwoInputOperator.java | 5 ----- .../stream/custom/ScalarFunctionsTest.java | 1 - 4 files changed, 3 insertions(+), 31 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java index 143ee104c964..1213c7bc4f95 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java @@ -78,21 +78,21 @@ public class RexCallConverterFactory { Map.entry("CASE", Arrays.asList(() -> new DefaultRexCallConverter("if"))), Map.entry("AND", Arrays.asList(() -> new DefaultRexCallConverter("and"))), Map.entry("SEARCH", Arrays.asList(() -> new DefaultRexCallConverter("in"))), + Map.entry("DATE_FORMAT", Arrays.asList(() -> new DefaultRexCallConverter("date_format"))), Map.entry( - "DATE_FORMAT", Arrays.asList(() -> new DefaultRexCallConverter("date_format"))), ">=", Arrays.asList( () -> new BasicArithmeticOperatorRexCallConverter("greaterthanorequal"), () -> new StringCompareRexCallConverter("greaterthanorequal"), () -> new StringNumberCompareRexCallConverter("greaterthanorequal"), - () -> new TimestampIntervalRexCallConverter("greaterthanorequal")), + () -> new TimestampIntervalRexCallConverter("greaterthanorequal"))), Map.entry( "<=", Arrays.asList( () -> new BasicArithmeticOperatorRexCallConverter("lessthanorequal"), () -> new StringCompareRexCallConverter("lessthanorequal"), () -> new StringNumberCompareRexCallConverter("lessthanorequal"), - () -> new TimestampIntervalRexCallConverter("lessthanorequal"))); + () -> new TimestampIntervalRexCallConverter("lessthanorequal")))); public static RexCallConverter getConverter(RexCall callNode, RexConversionContext context) { String operatorName = callNode.getOperator().getName(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 37b35e076cc8..9ddb4b95e14c 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -19,8 +19,6 @@ import io.github.zhztheplayer.velox4j.config.Config; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.api.config.TableConfigOptions; @@ -30,12 +28,6 @@ public class VeloxQueryConfig { - private static final ConfigOption CONFIG_USED_FOR_TEST = - ConfigOptions.key("config.used-for-test") - .booleanType() - .defaultValue(false) - .withDescription("A config to identify whether the config is used for test."); - private static final String keyVeloxAdjustTimestampToSessionTimeZone = "adjust_timestamp_to_session_timezone"; private static final String keyVeloxSessionTimezone = "session_timezone"; @@ -45,9 +37,6 @@ public static Config getConfig(RuntimeContext context) { return Config.empty(); } Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration(); - if (isConfigUsedForTest(config)) { - return getConfigForTest(config); - } Map configMap = new HashMap<>(); configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); @@ -58,15 +47,4 @@ public static Config getConfig(RuntimeContext context) { } return Config.create(configMap); } - - private static boolean isConfigUsedForTest(Configuration config) { - return config.get(CONFIG_USED_FOR_TEST); - } - - private static Config getConfigForTest(Configuration config) { - Map configMap = new HashMap<>(); - configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); - configMap.put(keyVeloxSessionTimezone, config.get(TableConfigOptions.LOCAL_TIME_ZONE)); - return Config.create(configMap); - } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java index fc470fe8794d..9ab0a325c7bc 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java @@ -18,7 +18,6 @@ import org.apache.gluten.streaming.api.operators.GlutenOperator; import org.apache.gluten.table.runtime.config.VeloxQueryConfig; -import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; @@ -103,14 +102,10 @@ public void open() throws Exception { rightInputQueue = session.externalStreamOps().newBlockingQueue(); LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan)); LOG.debug("OutTypes: {}", outputTypes.keySet()); -<<<<<<< HEAD query = new Query( glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); -======= LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName()); - query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty()); ->>>>>>> main allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); ExternalStreamConnectorSplit leftSplit = diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index aa8909d9e0a0..f3ae3b957938 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -183,7 +183,6 @@ void testDateFormat() { String query = "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd'), DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; Map configs = new HashMap<>(); - configs.put("config.used-for-test", "true"); configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "default"); runAndCheckException(query, configs); configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); From b07357a58cb7b342c70ba68173fee6e4493a10a9 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 1 Sep 2025 03:21:42 +0000 Subject: [PATCH 08/23] test on remove arrow.memory.core from flink.yml --- .github/workflows/flink.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index ed579c0729f3..c165c6255004 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -24,12 +24,12 @@ on: env: MAVEN_OPTS: >- -Xmx2g - --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED - --add-opens=java.base/sun.nio.ch=org.apache.arrow.memory.core,ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED JAVA_TOOL_OPTIONS: >- - --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED - --add-opens=java.base/sun.nio.ch=org.apache.arrow.memory.core,ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED jobs: From 8b95500f4ec47700c3c5726f1efe7b85e3690e32 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 1 Sep 2025 06:12:50 +0000 Subject: [PATCH 09/23] fix ci --- .github/workflows/flink.yml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index c165c6255004..83d5345e9724 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -24,12 +24,12 @@ on: env: MAVEN_OPTS: >- -Xmx2g - --add-opens=java.base/java.nio=ALL-UNNAMED - --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=org.apache.arrow.memory.core,ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED JAVA_TOOL_OPTIONS: >- - --add-opens=java.base/java.nio=ALL-UNNAMED - --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=org.apache.arrow.memory.core,ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED jobs: @@ -41,7 +41,9 @@ jobs: - name: Prepare run: | source /opt/rh/gcc-toolset-11/enable + sudo yum update -y sudo dnf install -y patchelf + sudo yum install tzdata -y git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j && git reset --hard a5e3e9d7f11440f8c4eafeff88ae6945186d02c1 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch From 4bff55bd904bf695d6e3ca2f967e74d3a4419af3 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 10 Sep 2025 02:55:17 +0000 Subject: [PATCH 10/23] remove some useless changes --- .github/workflows/flink.yml | 2 -- .../runtime/operators/GlutenOneInputOperator.java | 4 +--- .../operators/GlutenVectorSourceFunction.java | 14 +++----------- .../operators/GlutenVectorTwoInputOperator.java | 9 +++------ .../stream/common/GlutenStreamingTestBase.java | 2 -- 5 files changed, 7 insertions(+), 24 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 88e7144384b1..0026e6669cbe 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -57,9 +57,7 @@ jobs: - name: Prepare run: | source /opt/rh/gcc-toolset-11/enable - sudo yum update -y sudo dnf install -y patchelf - sudo yum install tzdata -y git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j && git reset --hard ea2ca5755ae91a8703717a85b77f9eb1620899de git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 43c2c3b9b8d2..d8f6f1a0b268 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -135,9 +135,7 @@ public void processElement(StreamRecord element) { @Override public void close() throws Exception { inputQueue.close(); - if (task != null) { - task.close(); - } + task.close(); session.close(); memoryManager.close(); allocator.close(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index 560025075264..d42a92267750 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -100,7 +100,9 @@ public void open(Configuration parameters) throws Exception { if (memoryManager == null) { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); @@ -112,16 +114,6 @@ public void open(Configuration parameters) throws Exception { @Override public void run(SourceContext sourceContext) throws Exception { LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); - memoryManager = MemoryManager.create(AllocationListener.NOOP); - session = Velox4j.newSession(memoryManager); - query = - new Query( - planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); - allocator = new RootAllocator(Long.MAX_VALUE); - - SerialTask task = session.queryOps().execute(query); - task.addSplit(id, split); - task.noMoreSplits(id); while (isRunning) { UpIterator.State state = task.advance(); if (state == UpIterator.State.AVAILABLE) { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java index 23c2c9731d8f..a5c4da5f0ceb 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java @@ -94,16 +94,13 @@ public GlutenVectorTwoInputOperator( private void initGlutenTask() { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty()); - task = session.queryOps().execute(query); - LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan)); - LOG.debug("OutTypes: {}", outputTypes.keySet()); query = new Query( glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); - LOG.debug("RuntimeContext: {}", getRuntimeContext().getClass().getName()); - allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); + LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan)); + LOG.debug("OutTypes: {}", outputTypes.keySet()); + LOG.debug("RuntimeContext: {}", getRuntimeContext().getClass().getName()); } @Override diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index c75db1e04293..a403d7f2b3fc 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -28,7 +28,6 @@ import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.BeforeAll; @@ -41,7 +40,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; From 527eb420b55422d0b21b007e7bec131c3b1f4e37 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 10 Sep 2025 06:45:15 +0000 Subject: [PATCH 11/23] fix ci --- .../operators/GlutenVectorSourceFunction.java | 4 +++- .../stream/common/GlutenStreamingTestBase.java | 17 ----------------- .../stream/custom/ScalarFunctionsTest.java | 2 -- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index d42a92267750..57203742efb6 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -154,7 +154,9 @@ public void initializeState(FunctionInitializationContext context) throws Except if (memoryManager == null) { memoryManager = MemoryManager.create(AllocationListener.NOOP); session = Velox4j.newSession(memoryManager); - query = new Query(planNode, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); allocator = new RootAllocator(Long.MAX_VALUE); task = session.queryOps().execute(query); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index a403d7f2b3fc..c19ef32e0d9a 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -27,7 +27,6 @@ import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.BeforeAll; @@ -165,20 +164,4 @@ protected void runAndCheck(String query, List expected, Map configs) { - for (String key : configs.keySet()) { - tEnv().getConfig().set(key, configs.get(key)); - } - boolean err = false; - try { - CloseableIterator rows = tEnv().executeSql(query).collect(); - while (rows.hasNext()) { - rows.next(); - } - } catch (Exception e) { - err = true; - } - assertThat(err).isEqualTo(true); - } } diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index f41cc2b3a9ab..efe565739c93 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -202,8 +202,6 @@ void testDateFormat() { String query = "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd'), DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; Map configs = new HashMap<>(); - configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "default"); - runAndCheckException(query, configs); configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); runAndCheck( query, From 6e20d641e53f6009e98a5e5a590fecfafab33587 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 12 Sep 2025 03:05:20 +0000 Subject: [PATCH 12/23] fix ut add timestamp_ltz type for datetime format --- .../stream/custom/ScalarFunctionsTest.java | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index efe565739c93..d68b2518745d 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -26,6 +26,9 @@ import org.junit.jupiter.api.Test; import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -196,17 +199,42 @@ void testDecimal() { @Test void testDateFormat() { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); List rows = - Arrays.asList(Row.of(1, "2024-12-31 12:12:12"), Row.of(2, "2025-02-28 12:12:12")); - createSimpleBoundedValuesTable("dateFormatTbl", "a int, b string", rows); + Arrays.asList( + Row.of(1, LocalDateTime.parse("2024-12-31 12:12:12", formatter)), + Row.of(2, LocalDateTime.parse("2025-02-28 12:12:12", formatter))); + createSimpleBoundedValuesTable("dateFormatTbl", "a int, b Timestamp(3)", rows); String query = - "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd'), DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; + "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); + Map configs = new HashMap<>(); configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); + rows = + Arrays.asList( + Row.of( + 1, LocalDateTime.parse("2024-12-31 12:12:12", formatter).toInstant(ZoneOffset.UTC)), + Row.of( + 2, + LocalDateTime.parse("2025-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); + createSimpleBoundedValuesTable("dateFormatTblLTZ", "a int, b Timestamp_LTZ(3)", rows); + String query1 = + "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from dateFormatTblLTZ"; runAndCheck( - query, + query1, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 04:12:12]", "+I[2, 2025-02-28, 2025-02-28 04:12:12]"), + configs); + + configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "Asia/Shanghai"); + runAndCheck( + query1, Arrays.asList( - "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]"), + "+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]"), configs); } } From 1ee5c7671672e6b04e00a9b5f00546fb54ba03d9 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Wed, 17 Sep 2025 09:50:16 +0000 Subject: [PATCH 13/23] fix reviews --- .../runtime/config/VeloxQueryConfig.java | 2 ++ .../gluten/util/LogicalTypeConverter.java | 3 ++ .../common/GlutenStreamingTestBase.java | 8 ----- .../stream/custom/ScalarFunctionsTest.java | 34 +++++++++---------- 4 files changed, 22 insertions(+), 25 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java index 9ddb4b95e14c..788ecb4a68b7 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java @@ -40,6 +40,8 @@ public static Config getConfig(RuntimeContext context) { Map configMap = new HashMap<>(); configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); + // As flink's default timezone value is `default`, it is not a valid timezone id, so we should + // convert it to `UTC` timezone. if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(localTimeZone)) { configMap.put(keyVeloxSessionTimezone, "UTC"); } else { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java index d4e33a17a1a7..efb29c041553 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java @@ -105,6 +105,9 @@ private interface VLTypeConverter { Type valueType = toVLType(mapType.getValueType()); return io.github.zhztheplayer.velox4j.type.MapType.create(keyType, valueType); }), + // Map the flink's `TimestampLTZ` type to velox `Timestamp` type. And the timezone would + // be specified by using flink's table config `LOCAL_TIME_ZONE`, which would be passed to + // velox's `session_timezone` config. // TODO: may need precision Map.entry( LocalZonedTimestampType.class, diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java index c19ef32e0d9a..193d1c8ade88 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java @@ -38,7 +38,6 @@ import java.io.FileReader; import java.util.ArrayList; import java.util.List; -import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -157,11 +156,4 @@ protected void runAndCheck(String query, List expected) { tEnv().executeSql("drop table if exists printT"); } } - - protected void runAndCheck(String query, List expected, Map configs) { - for (String key : configs.keySet()) { - tEnv().getConfig().set(key, configs.get(key)); - } - runAndCheck(query, expected); - } } diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index d68b2518745d..aa9a6cfbd091 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -18,7 +18,6 @@ import org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase; -import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; @@ -27,12 +26,11 @@ import java.math.BigDecimal; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; class ScalarFunctionsTest extends GlutenStreamingTestBase { @@ -204,16 +202,19 @@ void testDateFormat() { Arrays.asList( Row.of(1, LocalDateTime.parse("2024-12-31 12:12:12", formatter)), Row.of(2, LocalDateTime.parse("2025-02-28 12:12:12", formatter))); - createSimpleBoundedValuesTable("dateFormatTbl", "a int, b Timestamp(3)", rows); + createSimpleBoundedValuesTable("timestampTable", "a int, b Timestamp(3)", rows); String query = - "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from dateFormatTbl"; + "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampTable"; + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); + tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); runAndCheck( query, Arrays.asList( "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); - Map configs = new HashMap<>(); - configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); rows = Arrays.asList( Row.of( @@ -221,20 +222,19 @@ void testDateFormat() { Row.of( 2, LocalDateTime.parse("2025-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); - createSimpleBoundedValuesTable("dateFormatTblLTZ", "a int, b Timestamp_LTZ(3)", rows); - String query1 = - "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from dateFormatTblLTZ"; + createSimpleBoundedValuesTable("timestampLtzTable", "a int, b Timestamp_LTZ(3)", rows); + query = + "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampLtzTable"; + tEnv().getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles")); runAndCheck( - query1, + query, Arrays.asList( - "+I[1, 2024-12-31, 2024-12-31 04:12:12]", "+I[2, 2025-02-28, 2025-02-28 04:12:12]"), - configs); + "+I[1, 2024-12-31, 2024-12-31 04:12:12]", "+I[2, 2025-02-28, 2025-02-28 04:12:12]")); - configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "Asia/Shanghai"); + tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); runAndCheck( - query1, + query, Arrays.asList( - "+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]"), - configs); + "+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]")); } } From 16db006b9fe03dace5b2fe66c9734414f28871bc Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 19 Sep 2025 07:52:16 +0000 Subject: [PATCH 14/23] update flink.yml for test --- .github/workflows/flink.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 0026e6669cbe..f4a52a2d935d 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -58,8 +58,8 @@ jobs: run: | source /opt/rh/gcc-toolset-11/enable sudo dnf install -y patchelf - git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard ea2ca5755ae91a8703717a85b77f9eb1620899de + git clone -b test_date_format_tmp https://github.com/KevinyhZou/velox4j.git + cd velox4j && git reset --hard a08a3abe8c8a9cd960f5f16fa8cd7c451fe140f8 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 0bd0d777458d7bef6614a84ce42d1e68a28b25cb Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 19 Sep 2025 10:01:04 +0000 Subject: [PATCH 15/23] for ci testing --- .../gluten/table/runtime/stream/custom/ScalarFunctionsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index aa9a6cfbd091..20cca30e8893 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.List; +@Disabled class ScalarFunctionsTest extends GlutenStreamingTestBase { @Override From f53101e9917de250b0d55d13777e6e0ae0645c20 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 22 Sep 2025 01:44:47 +0000 Subject: [PATCH 16/23] check ci --- .../org/apache/gluten/table/runtime/stream/custom/ScanTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java index 7d097763d6f2..3f9d0189fdfa 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java @@ -21,6 +21,7 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Map; +@Disabled class ScanTest extends GlutenStreamingTestBase { private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class); From ca257e91c0aea775977fd3c391f11ecfc0168e30 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 22 Sep 2025 07:51:52 +0000 Subject: [PATCH 17/23] update flink.yml --- .github/workflows/flink.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index f4a52a2d935d..6f4476f05344 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -58,6 +58,7 @@ jobs: run: | source /opt/rh/gcc-toolset-11/enable sudo dnf install -y patchelf + sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y git clone -b test_date_format_tmp https://github.com/KevinyhZou/velox4j.git cd velox4j && git reset --hard a08a3abe8c8a9cd960f5f16fa8cd7c451fe140f8 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch From 948472f0ae087a0c65cc574672c341c186c84ad0 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 22 Sep 2025 09:03:16 +0000 Subject: [PATCH 18/23] remove disabled label --- .../gluten/table/runtime/stream/custom/ScalarFunctionsTest.java | 2 -- .../org/apache/gluten/table/runtime/stream/custom/ScanTest.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index f33078198624..052c1953c24a 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -21,7 +21,6 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.math.BigDecimal; @@ -32,7 +31,6 @@ import java.util.Arrays; import java.util.List; -@Disabled class ScalarFunctionsTest extends GlutenStreamingTestBase { @Override diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java index 3f9d0189fdfa..7d097763d6f2 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java @@ -21,7 +21,6 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Map; -@Disabled class ScanTest extends GlutenStreamingTestBase { private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class); From 5d0e7e570771c1408ebbe848e48a826ffb65659e Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Mon, 22 Sep 2025 10:16:47 +0000 Subject: [PATCH 19/23] fix reviews --- .../stream/custom/ScalarFunctionsTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index 052c1953c24a..51f1b42e853d 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -21,6 +21,7 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.math.BigDecimal; @@ -271,5 +272,26 @@ void testDateFormat() { query, Arrays.asList( "+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]")); + + rows = + Arrays.asList( + Row.of( + 1, + LocalDateTime.parse("2024-12-31 12:12:12", formatter), + LocalDateTime.parse("2024-12-31 12:12:12", formatter).toInstant(ZoneOffset.UTC)), + Row.of( + 2, + LocalDateTime.parse("2025-02-28 12:12:12", formatter), + LocalDateTime.parse("2024-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); + createSimpleBoundedValuesTable( + "timestampTable0", "a int, b Timestamp(3), c Timestamp_LTZ(3)", rows); + query = + "select a, DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss'), DATE_FORMAT(c, 'yyyy-MM-dd HH:mm:ss') from timestampTable0"; + tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + runAndCheck( + query, + Arrays.asList( + "+I[1, 2024-12-31 12:12:12, 2024-12-31 20:12:12]", + "+I[2, 2025-02-28 12:12:12, 2024-02-28 20:12:12]")); } } From 3638ce2189d3720574147c24a9174d0d7a053bc7 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Thu, 25 Sep 2025 01:55:43 +0000 Subject: [PATCH 20/23] revert flink.yml --- .github/workflows/flink.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 6f4476f05344..5cbaa03a3780 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -59,8 +59,8 @@ jobs: source /opt/rh/gcc-toolset-11/enable sudo dnf install -y patchelf sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y - git clone -b test_date_format_tmp https://github.com/KevinyhZou/velox4j.git - cd velox4j && git reset --hard a08a3abe8c8a9cd960f5f16fa8cd7c451fe140f8 + git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git + cd velox4j && git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 40da04412db0b11f9bbd10aaf8243aee0bbdaaa1 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 26 Sep 2025 02:03:25 +0000 Subject: [PATCH 21/23] fix reviews --- .../runtime/operators/GlutenVectorOneInputOperator.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java index 5319c61b7992..e859e15ca37b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java @@ -17,9 +17,9 @@ package org.apache.gluten.table.runtime.operators; import org.apache.gluten.streaming.api.operators.GlutenOperator; +import org.apache.gluten.table.runtime.config.VeloxQueryConfig; import io.github.zhztheplayer.velox4j.Velox4j; -import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle; @@ -94,7 +94,9 @@ void initGlutenTask() { mockInput.addTarget(glutenPlan); LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput)); LOG.debug("OutTypes: {}", outputTypes.keySet()); - query = new Query(mockInput, Config.empty(), ConnectorConfig.empty()); + query = + new Query( + mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty()); task = session.queryOps().execute(query); } From 17a8ebf05137c22613ffc19defdc3454d3bb9cc1 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 26 Sep 2025 09:49:19 +0000 Subject: [PATCH 22/23] update flink.yml --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 5cbaa03a3780..d6de6118da44 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -60,7 +60,7 @@ jobs: sudo dnf install -y patchelf sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec + cd velox4j && git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From e40a2211531a4af1b1eda98c963f27f00dde532b Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 26 Sep 2025 09:50:47 +0000 Subject: [PATCH 23/23] update flink.md --- gluten-flink/docs/Flink.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md index 6f98d3e14339..92dc6fbc373d 100644 --- a/gluten-flink/docs/Flink.md +++ b/gluten-flink/docs/Flink.md @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow ## fetch velox4j code git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j -git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec +git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true ``` **Get gluten**