From 857095c0bfa7946a67f0024b0f2ec1e0bcabc28e Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 14 Jan 2026 10:16:43 +0800 Subject: [PATCH] [Fix](StreamingJob) fix table creation issues when creating a streaming job (#59828) ### What problem does this PR solve? Related PR: https://github.com/apache/doris/pull/58898 1. When the target table is created, the length of each varchar column read from the source is multiplied by 3; if the result exceeds the maximum varchar length, the column is created as string instead. 2. Primary key columns are placed first, in the order of the primary key definition, followed by the remaining columns in their original order. 3. A column with an unsupported type now results in an error that names the table and the column. --- .../doris/httpv2/rest/StreamingJobAction.java | 3 + .../doris/job/util/StreamingJobUtils.java | 27 +- .../doris/job/util/StreamingJobUtilsTest.java | 240 ++++++++++++++++++ .../cdc/test_streaming_mysql_job_all_type.out | 2 +- .../cdc/test_streaming_mysql_job.groovy | 3 +- 5 files changed, 272 insertions(+), 3 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java index 53610142f12bc2..573e0a17f16d90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java @@ -28,6 +28,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import lombok.ToString; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.web.bind.annotation.RequestBody; @@ -67,6 +68,7 @@ private Object updateOffset(CommitOffsetRequest offsetRequest) { StreamingInsertJob streamingJob = (StreamingInsertJob) job; try { + LOG.info("Committing offset with {}", offsetRequest.toString()); streamingJob.commitOffset(offsetRequest); return ResponseEntityBuilder.ok("Offset committed successfully"); } catch (Exception e) { @@ -79,6 +81,7 @@ private Object updateOffset(CommitOffsetRequest offsetRequest) { 
@Getter @Setter @NoArgsConstructor + @ToString public static class CommitOffsetRequest { public long jobId; public long taskId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index 4625417b67de36..bac12ae3eba6b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -59,6 +59,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -333,18 +334,42 @@ public static List generateCreateTableCmds(String targetDb, return createtblCmds; } - private static List getColumns(JdbcClient jdbcClient, + public static List getColumns(JdbcClient jdbcClient, String database, String table, List primaryKeys) { List columns = jdbcClient.getColumnsFromJdbc(database, table); columns.forEach(col -> { + Preconditions.checkArgument(!col.getType().isUnsupported(), + "Unsupported column type, table:[%s], column:[%s]", table, col.getName()); + if (col.getType().isVarchar()) { + // The length of varchar needs to be multiplied by 3. + int len = col.getType().getLength() * 3; + if (len > ScalarType.MAX_VARCHAR_LENGTH) { + col.setType(ScalarType.createStringType()); + } else { + col.setType(ScalarType.createVarcharType(len)); + } + } + // string can not to be key if (primaryKeys.contains(col.getName()) && col.getDataType() == PrimitiveType.STRING) { col.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH)); } }); + + // sort columns for primary keys + columns.sort( + Comparator + .comparing((Column col) -> !primaryKeys.contains(col.getName())) + .thenComparing( + col -> primaryKeys.contains(col.getName()) + ? 
primaryKeys.indexOf(col.getName()) + : Integer.MAX_VALUE + ) + ); + return columns; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java new file mode 100644 index 00000000000000..3e090d4d3fe168 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.job.util; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.datasource.jdbc.client.JdbcClient; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class StreamingJobUtilsTest { + + @Mock + private JdbcClient jdbcClient; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testGetColumnsWithPrimaryKeySorting() throws Exception { + // Prepare test data + String database = "test_db"; + String table = "test_table"; + List primaryKeys = Arrays.asList("id", "name"); + + // Create mock columns in random order + List mockColumns = new ArrayList<>(); + mockColumns.add(new Column("age", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("id", ScalarType.createType(PrimitiveType.BIGINT))); + mockColumns.add(new Column("email", ScalarType.createVarcharType(100))); + mockColumns.add(new Column("name", ScalarType.createVarcharType(50))); + mockColumns.add(new Column("address", ScalarType.createVarcharType(200))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + List result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + + // Verify primary keys are at the front in correct order + Assert.assertEquals(5, result.size()); + Assert.assertEquals("id", result.get(0).getName()); + Assert.assertEquals("name", result.get(1).getName()); + // Verify varchar primary key columns have their length multiplied by 3 + Column nameColumn = result.get(1); + Assert.assertEquals(150, nameColumn.getType().getLength()); // 50 * 3 + // 
Verify non-primary key columns follow + Assert.assertEquals("age", result.get(2).getName()); + Assert.assertEquals("email", result.get(3).getName()); + Assert.assertEquals("address", result.get(4).getName()); + // Verify non-primary key varchar columns also have their length multiplied by 3 + Column emailColumn = result.get(3); + Assert.assertEquals(300, emailColumn.getType().getLength()); // 100 * 3 + Column addressColumn = result.get(4); + Assert.assertEquals(600, addressColumn.getType().getLength()); // 200 * 3 + } + + @Test + public void testGetColumnsWithVarcharTypeConversion() throws Exception { + String database = "test_db"; + String table = "test_table"; + List primaryKeys = Arrays.asList("id"); + + List mockColumns = new ArrayList<>(); + mockColumns.add(new Column("id", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("short_name", ScalarType.createVarcharType(50))); + mockColumns.add(new Column("long_name", ScalarType.createVarcharType(20000))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + List result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + + // Verify varchar length multiplication by 3 + Column shortName = result.stream() + .filter(col -> col.getName().equals("short_name")) + .findFirst() + .orElse(null); + Assert.assertNotNull(shortName); + Assert.assertEquals(150, shortName.getType().getLength()); // 50 * 3 + + // Verify long varchar becomes STRING type + Column longName = result.stream() + .filter(col -> col.getName().equals("long_name")) + .findFirst() + .orElse(null); + Assert.assertNotNull(longName); + Assert.assertTrue(longName.getType().isStringType()); + } + + @Test + public void testGetColumnsWithStringTypeAsPrimaryKey() throws Exception { + String database = "test_db"; + String table = "test_table"; + List primaryKeys = Arrays.asList("id"); + + List mockColumns = new ArrayList<>(); + 
mockColumns.add(new Column("id", ScalarType.createStringType())); + mockColumns.add(new Column("name", ScalarType.createVarcharType(50))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + List result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + + // Verify string type primary key is converted to varchar + Column idColumn = result.stream() + .filter(col -> col.getName().equals("id")) + .findFirst() + .orElse(null); + Assert.assertNotNull(idColumn); + Assert.assertTrue(idColumn.getType().isVarchar()); + Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH, idColumn.getType().getLength()); + } + + @Test + public void testGetColumnsWithEmptyPrimaryKeys() throws Exception { + String database = "test_db"; + String table = "test_table"; + List primaryKeys = new ArrayList<>(); + + List mockColumns = new ArrayList<>(); + mockColumns.add(new Column("col1", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("col2", ScalarType.createVarcharType(100))); + mockColumns.add(new Column("col3", ScalarType.createType(PrimitiveType.BIGINT))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + List result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + + // Verify columns maintain original order when no primary keys + Assert.assertEquals(3, result.size()); + Assert.assertEquals("col1", result.get(0).getName()); + Assert.assertEquals("col2", result.get(1).getName()); + Assert.assertEquals("col3", result.get(2).getName()); + } + + @Test + public void testGetColumnsWithMultiplePrimaryKeys() throws Exception { + String database = "test_db"; + String table = "test_table"; + List primaryKeys = Arrays.asList("pk3", "pk1", "pk2"); + + List mockColumns = new ArrayList<>(); + mockColumns.add(new Column("data1", 
ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("pk1", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("data2", ScalarType.createVarcharType(100))); + mockColumns.add(new Column("pk2", ScalarType.createType(PrimitiveType.BIGINT))); + mockColumns.add(new Column("pk3", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("data3", ScalarType.createVarcharType(50))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + List result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + + // Verify primary keys are sorted in the order defined in primaryKeys list + Assert.assertEquals(6, result.size()); + Assert.assertEquals("pk3", result.get(0).getName()); + Assert.assertEquals("pk1", result.get(1).getName()); + Assert.assertEquals("pk2", result.get(2).getName()); + // Verify non-primary keys follow + Assert.assertEquals("data1", result.get(3).getName()); + Assert.assertEquals("data2", result.get(4).getName()); + Assert.assertEquals("data3", result.get(5).getName()); + } + + @Test + public void testGetColumnsWithUnsupportedColumnType() throws Exception { + String database = "test_db"; + String table = "test_table"; + List primaryKeys = Arrays.asList("id"); + + List mockColumns = new ArrayList<>(); + mockColumns.add(new Column("id", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("unsupported_col", new ScalarType(PrimitiveType.UNSUPPORTED))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + // This should throw IllegalArgumentException due to unsupported column type + try { + StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + Assert.fail("Expected IllegalArgumentException to be thrown"); + } catch (IllegalArgumentException e) { + // Verify the exception message contains 
expected information + String message = e.getMessage(); + Assert.assertTrue(message.contains("Unsupported column type")); + Assert.assertTrue(message.contains("test_table")); + Assert.assertTrue(message.contains("unsupported_col")); + } + } + + @Test + public void testGetColumnsWithVarcharPrimaryKeyLengthMultiplication() throws Exception { + String database = "test_db"; + String table = "test_table"; + List primaryKeys = Arrays.asList("pk_varchar", "pk_int"); + + List mockColumns = new ArrayList<>(); + mockColumns.add(new Column("pk_int", ScalarType.createType(PrimitiveType.INT))); + mockColumns.add(new Column("pk_varchar", ScalarType.createVarcharType(100))); + mockColumns.add(new Column("normal_varchar", ScalarType.createVarcharType(50))); + + Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns); + List result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys); + + // Verify varchar primary key column has length multiplied by 3 + Column pkVarcharColumn = result.stream() + .filter(col -> col.getName().equals("pk_varchar")) + .findFirst() + .orElse(null); + Assert.assertNotNull(pkVarcharColumn); + Assert.assertEquals(300, pkVarcharColumn.getType().getLength()); // 100 * 3 + + // Verify normal varchar column also has length multiplied by 3 + Column normalVarcharColumn = result.stream() + .filter(col -> col.getName().equals("normal_varchar")) + .findFirst() + .orElse(null); + Assert.assertNotNull(normalVarcharColumn); + Assert.assertEquals(150, normalVarcharColumn.getType().getLength()); // 50 * 3 + } +} diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out index 691380f5b33ac8..ca7379dbcf5182 100644 --- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out +++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out @@ -35,7 +35,7 @@ timestamp1 datetime Yes false \N NONE timestamp2 datetime(3) Yes false \N NONE timestamp3 datetime(6) Yes false \N NONE char char(5) Yes false \N NONE -varchar varchar(10) Yes false \N NONE +varchar varchar(30) Yes false \N NONE text text Yes false \N NONE blob text Yes false \N NONE json text Yes false \N NONE diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy index a6bc1d174315e3..2da9437ab4b4ec 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy @@ -58,6 +58,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table3}""" sql """CREATE TABLE ${mysqlDb}.${table1} ( `name` varchar(200) NOT NULL, `age` int DEFAULT NULL, @@ -109,7 +110,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do // check table schema correct def showTbl1 = sql """show create table ${currentDb}.${table1}""" def createTalInfo = showTbl1[0][1]; - assert createTalInfo.contains("`name` varchar(200)"); + assert createTalInfo.contains("`name` varchar(600)"); assert createTalInfo.contains("`age` int"); assert createTalInfo.contains("UNIQUE KEY(`name`)"); assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS AUTO");