Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2dd79f2
Support date_format function
KevinyhZou Jul 23, 2025
3ad14c9
consider about timezone
KevinyhZou Jul 24, 2025
2573bbc
fix ci
KevinyhZou Jul 24, 2025
57471ad
fix review
KevinyhZou Jul 30, 2025
c05c6b1
Merge branch 'main' into support_dateformat
KevinyhZou Jul 30, 2025
b7e055a
run and check timezone set error
KevinyhZou Jul 30, 2025
86da02c
check error
KevinyhZou Jul 30, 2025
f99bf27
Merge branch 'main' into support_dateformat
KevinyhZou Aug 27, 2025
9700c1f
fix reviews
KevinyhZou Aug 27, 2025
ccc347c
Merge branch 'main' into support_dateformat
KevinyhZou Sep 1, 2025
b07357a
test on remove arrow.memory.core from flink.yml
KevinyhZou Sep 1, 2025
8b95500
fix ci
KevinyhZou Sep 1, 2025
af21e3a
Merge branch 'main' into support_dateformat
KevinyhZou Sep 10, 2025
4bff55b
remove some useless changes
KevinyhZou Sep 10, 2025
527eb42
fix ci
KevinyhZou Sep 10, 2025
6e20d64
fix ut add timestamp_ltz type for datetime format
KevinyhZou Sep 12, 2025
1ee5c76
fix reviews
KevinyhZou Sep 17, 2025
16db006
update flink.yml for test
KevinyhZou Sep 19, 2025
0bd0d77
for ci testing
KevinyhZou Sep 19, 2025
d97c6f2
Merge branch 'main' into support_dateformat
KevinyhZou Sep 19, 2025
f53101e
check ci
KevinyhZou Sep 22, 2025
f19774c
Merge branch 'main' into support_dateformat
KevinyhZou Sep 22, 2025
ca257e9
update flink.yml
KevinyhZou Sep 22, 2025
948472f
remove disabled label
KevinyhZou Sep 22, 2025
5d0e7e5
fix reviews
KevinyhZou Sep 22, 2025
3638ce2
revert flink.yml
KevinyhZou Sep 25, 2025
40da044
fix reviews
KevinyhZou Sep 26, 2025
17a8ebf
update flink.yml
KevinyhZou Sep 26, 2025
e40a221
update flink.md
KevinyhZou Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ jobs:
run: |
source /opt/rh/gcc-toolset-11/enable
sudo dnf install -y patchelf
sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec
cd velox4j && git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
2 changes: 1 addition & 1 deletion gluten-flink/docs/Flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec
git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class RexCallConverterFactory {
Map.entry("AND", Arrays.asList(() -> new DefaultRexCallConverter("and"))),
Map.entry("SPLIT_INDEX", Arrays.asList(() -> new SplitIndexRexCallConverter())),
Map.entry("SEARCH", Arrays.asList(() -> new DefaultRexCallConverter("in"))),
Map.entry("DATE_FORMAT", Arrays.asList(() -> new DefaultRexCallConverter("date_format"))),
Map.entry(
">=",
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.table.runtime.config;

import io.github.zhztheplayer.velox4j.config.Config;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.api.config.TableConfigOptions;

import java.util.HashMap;
import java.util.Map;

public class VeloxQueryConfig {

private static final String keyVeloxAdjustTimestampToSessionTimeZone =
"adjust_timestamp_to_session_timezone";
private static final String keyVeloxSessionTimezone = "session_timezone";

public static Config getConfig(RuntimeContext context) {
if (!(context instanceof StreamingRuntimeContext)) {
return Config.empty();
}
Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration();
Map<String, String> configMap = new HashMap<>();
configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true");
String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE);
// As flink's default timezone value is `default`, it is not a valid timezone id, so we should
// convert it to `UTC` timezone.
if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(localTimeZone)) {
configMap.put(keyVeloxSessionTimezone, "UTC");
Comment thread
philo-he marked this conversation as resolved.
} else {
configMap.put(keyVeloxSessionTimezone, localTimeZone);
}
return Config.create(configMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package org.apache.gluten.table.runtime.operators;

import org.apache.gluten.streaming.api.operators.GlutenOperator;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.Config;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
Expand Down Expand Up @@ -99,7 +99,9 @@ public void open() throws Exception {
mockInput.addTarget(glutenPlan);
LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput));
LOG.debug("OutTypes: {}", outputTypes.keySet());
query = new Query(mockInput, Config.empty(), ConnectorConfig.empty());
query =
new Query(
mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);
task = session.queryOps().execute(query);
ExternalStreamConnectorSplit split =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.gluten.table.runtime.operators;

import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.Config;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
import io.github.zhztheplayer.velox4j.data.RowVector;
Expand Down Expand Up @@ -92,7 +92,9 @@ public void run(SourceContext<RowData> sourceContext) throws Exception {
LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
query =
new Query(
planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);

SerialTask task = session.queryOps().execute(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.gluten.table.runtime.operators;

import org.apache.gluten.streaming.api.operators.GlutenOperator;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.Config;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
Expand Down Expand Up @@ -94,7 +94,9 @@ void initGlutenTask() {
mockInput.addTarget(glutenPlan);
LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput));
LOG.debug("OutTypes: {}", outputTypes.keySet());
query = new Query(mockInput, Config.empty(), ConnectorConfig.empty());
query =
new Query(
mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
task = session.queryOps().execute(query);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/
package org.apache.gluten.table.runtime.operators;

import org.apache.gluten.table.runtime.config.VeloxQueryConfig;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.Config;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
import io.github.zhztheplayer.velox4j.iterator.UpIterator;
Expand Down Expand Up @@ -98,7 +99,9 @@ public void open(Configuration parameters) throws Exception {
if (memoryManager == null) {
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
query =
new Query(
planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);

task = session.queryOps().execute(query);
Expand Down Expand Up @@ -150,7 +153,9 @@ public void initializeState(FunctionInitializationContext context) throws Except
LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
query =
new Query(
planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);

task = session.queryOps().execute(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.gluten.table.runtime.operators;

import org.apache.gluten.streaming.api.operators.GlutenOperator;
import org.apache.gluten.table.runtime.config.VeloxQueryConfig;

import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.config.Config;
import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
Expand Down Expand Up @@ -94,11 +94,13 @@ public GlutenVectorTwoInputOperator(
private void initGlutenTask() {
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
query =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems missing the GlutenVectorOneInputOperator

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

new Query(
glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), ConnectorConfig.empty());
task = session.queryOps().execute(query);
LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
LOG.debug("OutTypes: {}", outputTypes.keySet());
LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName());
LOG.debug("RuntimeContext: {}", getRuntimeContext().getClass().getName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ private interface VLTypeConverter {
Type valueType = toVLType(mapType.getValueType());
return io.github.zhztheplayer.velox4j.type.MapType.create(keyType, valueType);
}),
// Map the flink's `TimestampLTZ` type to velox `Timestamp` type. And the timezone would
// be specified by using flink's table config `LOCAL_TIME_ZONE`, which would be passed to
// velox's `session_timezone` config.
// TODO: may need precision
Map.entry(
LocalZonedTimestampType.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ protected void setValue(int index, Integer value) {

class StructVectorWriter extends BaseVectorWriter<StructVector, RowData> {
private final int fieldCount;
private BufferAllocator allocator;
private final List<ArrowVectorWriter> fieldWriters;

public StructVectorWriter(Type fieldType, BufferAllocator allocator, FieldVector vector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public static RowVector fromRowData(
List<Type> fieldTypes = rowType.getChildren();
List<String> fieldNames = rowType.getNames();
for (int i = 0; i < rowType.size(); i++) {
Type fieldType = rowType.getChildren().get(i);
ArrowVectorWriter writer =
ArrowVectorWriter.create(fieldNames.get(i), fieldTypes.get(i), allocator);
writer.write(i, row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -226,4 +230,68 @@ void testDecimal() {
query = "select b + e as x from tblDecimal where a > 0";
runAndCheck(query, Arrays.asList("+I[2.0]", "+I[5.0]", "+I[7.0]"));
}

@Test
void testDateFormat() {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
List<Row> rows =
Arrays.asList(
Row.of(1, LocalDateTime.parse("2024-12-31 12:12:12", formatter)),
Row.of(2, LocalDateTime.parse("2025-02-28 12:12:12", formatter)));
createSimpleBoundedValuesTable("timestampTable", "a int, b Timestamp(3)", rows);
String query =
"select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampTable";
runAndCheck(
query,
Arrays.asList(
"+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]"));
tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
runAndCheck(
query,
Arrays.asList(
"+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]"));
Comment thread
philo-he marked this conversation as resolved.

rows =
Arrays.asList(
Row.of(
1, LocalDateTime.parse("2024-12-31 12:12:12", formatter).toInstant(ZoneOffset.UTC)),
Row.of(
2,
LocalDateTime.parse("2025-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC)));
createSimpleBoundedValuesTable("timestampLtzTable", "a int, b Timestamp_LTZ(3)", rows);
query =
"select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampLtzTable";
tEnv().getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
runAndCheck(
query,
Arrays.asList(
"+I[1, 2024-12-31, 2024-12-31 04:12:12]", "+I[2, 2025-02-28, 2025-02-28 04:12:12]"));

tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
runAndCheck(
query,
Arrays.asList(
"+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]"));

rows =
Arrays.asList(
Row.of(
1,
LocalDateTime.parse("2024-12-31 12:12:12", formatter),
LocalDateTime.parse("2024-12-31 12:12:12", formatter).toInstant(ZoneOffset.UTC)),
Row.of(
2,
LocalDateTime.parse("2025-02-28 12:12:12", formatter),
LocalDateTime.parse("2024-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC)));
createSimpleBoundedValuesTable(
"timestampTable0", "a int, b Timestamp(3), c Timestamp_LTZ(3)", rows);
query =
"select a, DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss'), DATE_FORMAT(c, 'yyyy-MM-dd HH:mm:ss') from timestampTable0";
tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
runAndCheck(
query,
Arrays.asList(
"+I[1, 2024-12-31 12:12:12, 2024-12-31 20:12:12]",
"+I[2, 2025-02-28 12:12:12, 2024-02-28 20:12:12]"));
}
}