From 1765575cd67ead99f5f3c29129c5338b5642e185 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 9 Jan 2026 17:50:01 +0800 Subject: [PATCH 1/8] add cdc stream tvf --- be/src/io/fs/http_file_reader.cpp | 114 ++++++++--- be/src/io/fs/http_file_reader.h | 1 + .../doris/job/cdc/DataSourceConfigKeys.java | 2 + .../job/cdc/request/CompareOffsetRequest.java | 2 +- .../job/cdc/request/FetchRecordRequest.java | 13 +- .../cdc/request/FetchTableSplitsRequest.java | 2 +- .../doris/job/cdc/request/JobBaseConfig.java | 2 +- .../job/cdc/request/JobBaseRecordRequest.java | 4 - .../job/cdc/request/WriteRecordRequest.java | 10 - .../catalog/BuiltinTableValuedFunctions.java | 4 +- .../streaming/StreamingMultiTblTask.java | 2 +- .../offset/jdbc/JdbcSourceOffsetProvider.java | 4 +- .../doris/job/util/StreamingJobUtils.java | 8 +- .../functions/table/CdcStream.java | 55 ++++++ .../visitor/TableValuedFunctionVisitor.java | 5 + .../CdcStreamTableValuedFunction.java | 125 ++++++++++++ .../apache/doris/cdcclient/common/Env.java | 16 +- .../config/GlobalExceptionHandler.java | 7 + .../controller/ClientController.java | 20 +- .../cdcclient/exception/StreamException.java | 44 +++++ .../service/PipelineCoordinator.java | 179 +++++++++++------- .../cdcclient/sink/DorisBatchStreamLoad.java | 8 +- .../reader/JdbcIncrementalSourceReader.java | 13 +- .../cdcclient/source/reader/SourceReader.java | 4 +- .../reader/mysql/MySqlSourceReader.java | 25 ++- .../reader/postgres/PostgresSourceReader.java | 25 +-- .../doris/cdcclient/utils/ConfigUtil.java | 22 ++- 27 files changed, 532 insertions(+), 184 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index fb243179baf557..6f02f2100b6cac 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -22,7 +22,11 @@ #include +#include "common/config.h" #include "common/logging.h" +#include "gen_cpp/internal_service.pb.h" +#include "runtime/cdc_client_mgr.h" +#include "runtime/exec_env.h" namespace doris::io { @@ -83,6 +87,14 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url) } } + // Parse chunk response configuration + auto chunk_iter = _extend_kv.find("http.chunk.response"); + if (chunk_iter != _extend_kv.end()) { + std::string value = chunk_iter->second; + std::transform(value.begin(), value.end(), value.begin(), ::tolower); + _enable_chunk_response = (value == "true" || value == "1"); + } + _read_buffer = std::make_unique(READ_BUFFER_SIZE); } @@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) { return Status::OK(); } - // Step 1: HEAD request to get file metadata - RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true)); - _client->set_method(HttpMethod::HEAD); - RETURN_IF_ERROR(_client->execute()); + // start CDC client + auto enable_cdc_iter = _extend_kv.find("enable_cdc_client"); + if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == "true") { + LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url; + ExecEnv* env = ExecEnv::GetInstance(); + if (env == nullptr || env->cdc_client_mgr() == nullptr) { + return Status::InternalError("ExecEnv or CdcClientMgr is not initialized"); + } 
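Context for the reader options above: the extend key/value pairs consumed by HttpFileReader ("enable_cdc_client", "http.chunk.response", "http.method", "http.payload") are populated on the FE side by the new cdc_stream TVF later in this patch. A minimal sketch of that property set, assuming a plain Map<String, String> container and placeholder values:

import java.util.HashMap;
import java.util.Map;

public class CdcStreamHttpOptions {
    // Keys mirror the ones parsed by HttpFileReader above; values are illustrative only.
    public static Map<String, String> buildBackendConnectProperties(String fetchRecordRequestJson) {
        Map<String, String> props = new HashMap<>();
        props.put("enable_cdc_client", "true");              // BE starts the local CDC client first
        props.put("uri", "http://127.0.0.1:{}/api/fetchRecordStream"); // {} later filled with cdc_client_port
        props.put("http.enable.range.request", "false");     // chunked stream, no Range probing
        props.put("http.chunk.response", "true");            // skip the HEAD request, size unknown
        props.put("http.method", "POST");                    // fetchRecordStream is a POST endpoint
        props.put("http.payload", fetchRecordRequestJson);   // serialized FetchRecordRequest body
        return props;
    }
}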
+ + PRequestCdcClientResult result; + Status start_st = env->cdc_client_mgr()->start_cdc_client(&result); + if (!start_st.ok()) { + LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string(); + return start_st; + } + + _url = fmt::format(_url, doris::config::cdc_client_port); + _range_supported = false; + LOG(INFO) << "CDC client started successfully for " << _url; + } + + // Step 1: HEAD request to get file metadata (skip for chunk response) + if (_enable_chunk_response) { + // Chunk streaming response, skip HEAD request + _size_known = false; + _range_supported = false; + LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " << _url; + } else { + // Normal mode: execute HEAD request to get file metadata + RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true)); + _client->set_method(HttpMethod::HEAD); + RETURN_IF_ERROR(_client->execute()); - uint64_t content_length = 0; - RETURN_IF_ERROR(_client->get_content_length(&content_length)); + uint64_t content_length = 0; + RETURN_IF_ERROR(_client->get_content_length(&content_length)); - _file_size = content_length; - _size_known = true; + _file_size = content_length; + _size_known = true; + } // Step 2: Check if Range request is disabled by configuration - if (!_enable_range_request) { - // User explicitly disabled Range requests, use non-Range mode directly + if (!_enable_range_request || _enable_chunk_response) { + // User explicitly disabled Range requests or chunk response mode, use non-Range mode _range_supported = false; - LOG(INFO) << "Range requests disabled by configuration for " << _url - << ", using non-Range mode. File size: " << _file_size << " bytes"; - - // Check if file size exceeds limit for non-Range mode - if (_file_size > _max_request_size_bytes) { - return Status::InternalError( - "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} bytes, " - "configured by http.max.request.size.bytes). URL: {}", - _file_size, _max_request_size_bytes, _url); + LOG(INFO) << "Range requests disabled for " << _url << " (config=" << !_enable_range_request + << ", chunk_response=" << _enable_chunk_response << ")"; + + // Skip file size check for chunk response (size unknown at this point) + if (_enable_chunk_response) { + LOG(INFO) << "Chunk response mode, file size check skipped for " << _url; + } else { + // Check if file size exceeds limit for non-Range mode (non-chunk only) + if (_file_size > _max_request_size_bytes) { + return Status::InternalError( + "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} " + "bytes, " + "configured by http.max.request.size.bytes). 
URL: {}", + _file_size, _max_request_size_bytes, _url); + } + LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size + << " bytes, max allowed: " << _max_request_size_bytes << " bytes"; } - - LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size - << " bytes, max allowed: " << _max_request_size_bytes << " bytes"; } else { // Step 3: Range request is enabled (default), detect Range support VLOG(1) << "Detecting Range support for URL: " << _url; @@ -207,9 +253,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len << " with_range=" << _range_supported; - // Prepare and initialize the HTTP client for GET request + // Prepare and initialize the HTTP client for request RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false)); - _client->set_method(HttpMethod::GET); + + // Determine HTTP method from configuration (default: GET) + HttpMethod method = HttpMethod::GET; + auto method_iter = _extend_kv.find("http.method"); + if (method_iter != _extend_kv.end()) { + method = to_http_method(method_iter->second.c_str()); + if (method == HttpMethod::UNKNOWN) { + LOG(WARNING) << "Invalid http.method value: " << method_iter->second + << ", falling back to GET"; + method = HttpMethod::GET; + } + } + _client->set_method(method); + + // Set payload if configured (supports POST, PUT, DELETE, etc.) + auto payload_iter = _extend_kv.find("http.payload"); + if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) { + _client->set_payload(payload_iter->second); + _client->set_content_type("application/json"); + VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size(); + } _client->set_header("Expect", ""); _client->set_header("Connection", "close"); diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 607eedf3d1a50b..72a19fe79b0c4b 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -82,6 +82,7 @@ class HttpFileReader final : public FileReader { // Configuration for non-Range request handling bool _enable_range_request = true; // Whether Range request is required size_t _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE; // Max size for non-Range downloads + bool _enable_chunk_response = false; // Whether server returns chunk streaming response }; } // namespace doris::io diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java index 9e1918e561431b..a704ab615d6028 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java @@ -19,6 +19,7 @@ public class DataSourceConfigKeys { public static final String JDBC_URL = "jdbc_url"; + public static final String TYPE = "type"; public static final String DRIVER_URL = "driver_url"; public static final String DRIVER_CLASS = "driver_class"; public static final String USER = "user"; @@ -26,6 +27,7 @@ public class DataSourceConfigKeys { public static final String DATABASE = "database"; public static final String SCHEMA = "schema"; public static final String INCLUDE_TABLES = "include_tables"; + public static final String TABLE = "table"; public static final String EXCLUDE_TABLES = "exclude_tables"; // initial,earliest,latest,{binlog,postion},\d{13} public static final String 
OFFSET = "offset"; diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java index 8449afd96281e1..c0b70832ffc997 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java @@ -37,7 +37,7 @@ public CompareOffsetRequest(Long jobId, Map sourceProperties, Map offsetFirst, Map offsetSecond) { - super(jobId, sourceType, sourceProperties); + super(jobId.toString(), sourceType, sourceProperties); this.offsetFirst = offsetFirst; this.offsetSecond = offsetSecond; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java index f11539e68324d1..65ffd36966cd47 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java @@ -23,16 +23,5 @@ @Data @EqualsAndHashCode(callSuper = true) public class FetchRecordRequest extends JobBaseRecordRequest { - private boolean reload = true; - private int fetchSize; - - @Override - public boolean isReload() { - return reload; - } - - @Override - public int getFetchSize() { - return fetchSize; - } + private String taskId; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java index f855e373958687..c6c8a0fb550f09 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java @@ -35,7 +35,7 @@ public class FetchTableSplitsRequest extends JobBaseConfig { public FetchTableSplitsRequest(Long jobId, String name, Map sourceProperties, String snapshotTable) { - super(jobId, name, sourceProperties); + super(jobId.toString(), name, sourceProperties); this.snapshotTable = snapshotTable; } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java index bfdbf6a34558a9..89acfd4a929646 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java @@ -27,7 +27,7 @@ @AllArgsConstructor @NoArgsConstructor public class JobBaseConfig { - private Long jobId; + private String jobId; private String dataSource; private Map config; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java index a9a1be374dbede..282913e2dd2d0e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java @@ -28,8 +28,4 @@ @EqualsAndHashCode(callSuper = true) public abstract class JobBaseRecordRequest extends JobBaseConfig { protected Map meta; - - public abstract boolean isReload(); - - public abstract int getFetchSize(); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java index a75edfcf7fb718..befacd39a94ae0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java @@ -28,14 +28,4 @@ public class WriteRecordRequest extends JobBaseRecordRequest { private String token; private String frontendAddress; private String taskId; - - @Override - public boolean isReload() { - return true; - } - - @Override - public int getFetchSize() { - return Integer.MAX_VALUE; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index a2fb673b12f220..c51274fc03153c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.ParquetMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; +import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; @@ -77,7 +78,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(ParquetMeta.class, "parquet_meta"), tableValued(ParquetFileMetadata.class, "parquet_file_metadata"), tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"), - tableValued(ParquetBloomProbe.class, "parquet_bloom_probe") + tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"), + tableValued(CdcStream.class, "cdc_stream") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 50bb0fd2acd6f1..fe3890fa58df01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -167,7 +167,7 @@ private String getToken() throws JobException { private WriteRecordRequest buildRequestParams() throws JobException { JdbcOffset offset = (JdbcOffset) runningOffset; WriteRecordRequest request = new WriteRecordRequest(); - request.setJobId(getJobId()); + request.setJobId(getJobId() + ""); request.setConfig(sourceProperties); request.setDataSource(dataSourceType.name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 2c898b04a07c37..6ce1de4bf95b52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -184,7 +184,7 @@ public void updateOffset(Offset offset) { @Override public void fetchRemoteMeta(Map properties) throws Exception { Backend backend = 
StreamingJobUtils.selectBackend(jobId); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/fetchEndOffset") .setParams(new Gson().toJson(requestParams)).build(); @@ -494,7 +494,7 @@ public void cleanMeta(Long jobId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); Backend backend = StreamingJobUtils.selectBackend(jobId); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/close") .setParams(new Gson().toJson(requestParams)).build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index 0281503448cd7d..0d4ed6d2ce565d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -202,7 +202,7 @@ private static ConnectContext buildConnectContext() { return ctx; } - private static JdbcClient getJdbcClient(DataSourceType sourceType, Map properties) { + public static JdbcClient getJdbcClient(DataSourceType sourceType, Map properties) { JdbcClientConfig config = new JdbcClientConfig(); config.setCatalog(sourceType.name()); config.setUser(properties.get(DataSourceConfigKeys.USER)); @@ -346,8 +346,8 @@ private static List getColumns(JdbcClient jdbcClient, * The remoteDB implementation differs for each data source; * refer to the hierarchical mapping in the JDBC catalog. 
*/ - private static String getRemoteDbName(DataSourceType sourceType, Map properties) - throws JobException { + public static String getRemoteDbName(DataSourceType sourceType, Map properties) + throws RuntimeException { String remoteDb = null; switch (sourceType) { case MYSQL: @@ -359,7 +359,7 @@ private static String getRemoteDbName(DataSourceType sourceType, Map arguments = getTVFProperties().getMap(); + return new CdcStreamTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build CdcStreamTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitPostgresCdc(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index ee1ccd76478bd0..63b9349cf1c01f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.ParquetMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; +import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; @@ -81,6 +82,10 @@ default R visitHttp(Http http, C context) { return visitTableValuedFunction(http, context); } + default R visitPostgresCdc(CdcStream cdcStream, C context) { + return visitTableValuedFunction(cdcStream, context); + } + default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) { return visitTableValuedFunction(frontendsDisks, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java new file mode 100644 index 00000000000000..dbc3e930aee268 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
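A hypothetical end-to-end invocation of the new TVF, issued through the FE's MySQL protocol endpoint; the property names follow DataSourceConfigKeys, while host, port and credentials are placeholders (a sketch, not a tested query):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CdcStreamTvfUsage {
    public static void main(String[] args) throws Exception {
        // Hypothetical query against a Doris FE (host, port 9030 and credentials are placeholders).
        // "type", "jdbc_url" and "table" are required by the TVF's validate(); the ad-hoc TVF
        // path currently expects "offset" = "latest".
        String tvfQuery = "SELECT * FROM cdc_stream("
                + "\"type\" = \"mysql\", "
                + "\"jdbc_url\" = \"jdbc:mysql://127.0.0.1:3306/db1\", "
                + "\"user\" = \"root\", "
                + "\"password\" = \"\", "
                + "\"database\" = \"db1\", "
                + "\"table\" = \"t1\", "
                + "\"offset\" = \"latest\")";
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/db1", "root", "");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(tvfQuery)) {
            while (rs.next()) {
                System.out.println(rs.getString(1)); // rows arrive as JSON lines from the CDC client
            }
        }
    }
}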
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; + private final Map originProps; + public CdcStreamTableValuedFunction(Map properties) throws AnalysisException { + this.originProps = properties; + processProps(properties); + validate(properties); + } + + private void processProps(Map properties) throws AnalysisException { + Map copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); + } + } + + private void validate(Map properties) { + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required"); + } + + private void generateFileStatus() { + this.fileStatuses.clear(); + this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false)); + } + + @Override + public List getTableColumns() throws AnalysisException { + DataSourceType dataSourceType = DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase()); + JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(dataSourceType, processedParams); + String database = StreamingJobUtils.getRemoteDbName(dataSourceType, processedParams); + String table = processedParams.get(DataSourceConfigKeys.TABLE); + boolean tableExist = jdbcClient.isTableExist(database, table); + 
Preconditions.checkArgument(tableExist, "Table does not exist: " + table); + return jdbcClient.getColumnsFromJdbc(database, table); + } + + @Override + public TFileType getTFileType() { + return TFileType.FILE_HTTP; + } + + @Override + public String getFilePath() { + return URI; + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("CdcStreamTvfBroker", StorageType.HTTP, originProps); + } + + @Override + public String getTableName() { + return "CdcStreamTableValuedFunction"; + } + + @Override + public List getPathPartitionKeys() { + return new ArrayList<>(); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java index ff4056a8b5b3b1..176607f98683a7 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java @@ -37,8 +37,8 @@ public class Env { private static final Logger LOG = LoggerFactory.getLogger(Env.class); private static volatile Env INSTANCE; - private final Map jobContexts; - private final Map jobLocks; + private final Map jobContexts; + private final Map jobLocks; @Setter private int backendHttpPort; private Env() { @@ -79,9 +79,9 @@ private DataSource resolveDataSource(String source) { } private SourceReader getOrCreateReader( - Long jobId, DataSource dataSource, Map config) { - Objects.requireNonNull(jobId, "jobId"); - Objects.requireNonNull(dataSource, "dataSource"); + String jobId, DataSource dataSource, Map config) { + Objects.requireNonNull(jobId, "jobId is null"); + Objects.requireNonNull(dataSource, "dataSource is null"); JobContext context = jobContexts.get(jobId); if (context != null) { return context.getReader(dataSource); @@ -106,7 +106,7 @@ private SourceReader getOrCreateReader( } } - public void close(Long jobId) { + public void close(String jobId) { Lock lock = jobLocks.get(jobId); if (lock != null) { lock.lock(); @@ -123,12 +123,12 @@ public void close(Long jobId) { } private static final class JobContext { - private final long jobId; + private final String jobId; private volatile SourceReader reader; private volatile Map config; private volatile DataSource dataSource; - private JobContext(long jobId, DataSource dataSource, Map config) { + private JobContext(String jobId, DataSource dataSource, Map config) { this.jobId = jobId; this.dataSource = dataSource; this.config = config; diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java index 8b4883b6203183..ecde5c67779c4c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java @@ -17,6 +17,7 @@ package org.apache.doris.cdcclient.config; +import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.model.rest.RestResponse; import jakarta.servlet.http.HttpServletRequest; @@ -35,4 +36,10 @@ public Object exceptionHandler(HttpServletRequest request, Exception e) { log.error("Unexpected exception", e); return RestResponse.internalError(e.getMessage()); } + + @ExceptionHandler(StreamException.class) + public Object streamExceptionHandler(StreamException e) { + log.error("Exception in streaming response, re-throwing to 
client", e); + throw e; + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java index 2f444260559003..9df36325469805 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java @@ -32,10 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; @RestController public class ClientController { @@ -56,15 +58,15 @@ public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) { } } - /** Fetch records from source reader */ - @RequestMapping(path = "/api/fetchRecords", method = RequestMethod.POST) - public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) { - try { - return RestResponse.success(pipelineCoordinator.fetchRecords(recordReq)); - } catch (Exception ex) { - LOG.error("Failed fetch record, jobId={}", recordReq.getJobId(), ex); - return RestResponse.internalError(ex.getMessage()); - } + @RequestMapping(path = "/api/fetchRecordStream", method = RequestMethod.POST) + public StreamingResponseBody fetchRecordStream(@RequestBody FetchRecordRequest recordReq) + throws Exception { + return pipelineCoordinator.fetchRecordStream(recordReq); + } + + @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.POST) + public Object getTaskIdOffset(@PathVariable String taskId) { + return RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId)); } /** Fetch records from source reader and Write records to backend */ diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java new file mode 100644 index 00000000000000..73f9b34fbcdbfc --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cdcclient.exception; + +public class StreamException extends RuntimeException { + public StreamException() { + super(); + } + + public StreamException(String message) { + super(message); + } + + public StreamException(String message, Throwable cause) { + super(message, cause); + } + + public StreamException(Throwable cause) { + super(cause); + } + + protected StreamException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 591c4790e6ca42..0299b3da894133 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -18,20 +18,29 @@ package org.apache.doris.cdcclient.service; import org.apache.doris.cdcclient.common.Env; -import org.apache.doris.cdcclient.model.response.RecordWithMeta; +import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.cdcclient.source.reader.SplitReadResult; +import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.cdc.request.WriteRecordRequest; import org.apache.doris.job.cdc.split.BinlogSplit; -import org.apache.doris.job.cdc.split.SnapshotSplit; - +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import io.debezium.data.Envelope; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; +import java.io.BufferedOutputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -42,24 +51,20 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import io.debezium.data.Envelope; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - /** Pipeline coordinator. 
*/ @Component public class PipelineCoordinator { private static final Logger LOG = LoggerFactory.getLogger(PipelineCoordinator.class); private static final String SPLIT_ID = "splitId"; // jobId - private final Map batchStreamLoadMap = new ConcurrentHashMap<>(); + private final Map batchStreamLoadMap = new ConcurrentHashMap<>(); + // taskId, offset + private final Map> taskOffsetCache = new ConcurrentHashMap<>(); private final ThreadPoolExecutor executor; private static final int MAX_CONCURRENT_TASKS = 10; private static final int QUEUE_CAPACITY = 128; private static ObjectMapper objectMapper = new ObjectMapper(); + private final byte[] LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8); public PipelineCoordinator() { this.executor = @@ -79,19 +84,54 @@ public PipelineCoordinator() { new ThreadPoolExecutor.AbortPolicy()); } - public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) throws Exception { - SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchRecordRequest); - SplitReadResult readResult = sourceReader.readSplitRecords(fetchRecordRequest); - return buildRecordResponse(sourceReader, fetchRecordRequest, readResult); + public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchReq); + SplitReadResult readResult = sourceReader.readSplitRecords(fetchReq); + return outputStream -> { + try { + buildRecords(sourceReader, fetchReq, readResult, outputStream); + } catch (Exception ex) { + LOG.error( + "Failed fetch record, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId(), + ex); + throw new StreamException(ex); + } + }; } - /** build RecordWithMeta */ - private RecordWithMeta buildRecordResponse( - SourceReader sourceReader, FetchRecordRequest fetchRecord, SplitReadResult readResult) + private Map generateMeta(Map cdcConfig) { + Map meta = new HashMap<>(); + String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET); + if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)) { + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } else { + throw new RuntimeException("Unsupported offset:" + offset); + } + return meta; + } + + private void buildRecords( + SourceReader sourceReader, + FetchRecordRequest fetchRecord, + SplitReadResult readResult, + OutputStream rawOutputStream) throws Exception { - RecordWithMeta recordResponse = new RecordWithMeta(); SourceSplit split = readResult.getSplit(); - int count = 0; + Map lastMeta = null; + int rowCount = 0; + BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream); try { // Serialize records and add them to the response (collect from iterator) Iterator iterator = readResult.getRecordIterator(); @@ -99,60 +139,65 @@ private RecordWithMeta buildRecordResponse( SourceRecord element = iterator.next(); List serializedRecords = sourceReader.deserialize(fetchRecord.getConfig(), element); - if (!CollectionUtils.isEmpty(serializedRecords)) { - recordResponse.getRecords().addAll(serializedRecords); - count += serializedRecords.size(); - if (sourceReader.isBinlogSplit(split)) { - // put offset for event - Map lastMeta = - 
sourceReader.extractBinlogStateOffset(readResult.getSplitState()); - lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - recordResponse.setMeta(lastMeta); - } - if (count >= fetchRecord.getFetchSize()) { - return recordResponse; - } + for (String record : serializedRecords) { + bos.write(record.getBytes(StandardCharsets.UTF_8)); + bos.write(LINE_DELIMITER); } + rowCount += serializedRecords.size(); } + // force flush buffer + bos.flush(); } finally { sourceReader.finishSplitRecords(); } - if (readResult.getSplitState() != null) { - // Set meta information for hw + LOG.info( + "Fetch records completed, jobId={}, taskId={}, splitId={}, rowCount={}", + fetchRecord.getJobId(), + fetchRecord.getTaskId(), + split.splitId(), + rowCount); + if (rowCount > 0) { if (sourceReader.isSnapshotSplit(split)) { - Map offsetRes = - sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); - offsetRes.put(SPLIT_ID, split.splitId()); - recordResponse.setMeta(offsetRes); - return recordResponse; - } - // set meta for binlog event - if (sourceReader.isBinlogSplit(split)) { - Map offsetRes = - sourceReader.extractBinlogStateOffset(readResult.getSplitState()); - offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + // Set meta information for hw + lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, split.splitId()); + } else if (sourceReader.isBinlogSplit(split)) { + // set meta for binlog event + lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); } - } - - // no data in this split, set meta info - if (CollectionUtils.isEmpty(recordResponse.getRecords())) { + } else { + // no data in this split, set meta info if (sourceReader.isBinlogSplit(split)) { - Map offsetRes = - sourceReader.extractBinlogOffset(readResult.getSplit()); - offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - recordResponse.setMeta(offsetRes); + lastMeta = sourceReader.extractBinlogOffset(readResult.getSplit()); + lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); } else { - SnapshotSplit snapshotSplit = - objectMapper.convertValue(fetchRecord.getMeta(), SnapshotSplit.class); - Map meta = new HashMap<>(); - meta.put(SPLIT_ID, snapshotSplit.getSplitId()); // chunk no data - recordResponse.setMeta(meta); + throw new RuntimeException("Chunk has no data," + split.splitId()); } } + + if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) { + taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta); + } + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); - return recordResponse; + if (!isLong(fetchRecord.getJobId())) { + // TVF requires closing the window after each execution, while PG requires dropping the + // slot. 
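For clarity on the branch above: streaming jobs pass their numeric job id, while the ad-hoc cdc_stream TVF generates a dash-free UUID (see generateParams in CdcStreamTableValuedFunction), so isLong() below is false and the reader is torn down after each fetch. A small sketch of the two request shapes, assuming the usual Map<String, String> source properties and illustrative values:

import java.util.Map;
import java.util.UUID;

import org.apache.doris.job.cdc.request.FetchRecordRequest;

public class FetchRequestShapes {

    // Streaming-job path: numeric job id, so isLong(jobId) is true and the reader stays
    // registered in Env for the job's next task.
    static FetchRecordRequest jobRequest(long jobId, Map<String, String> sourceProps) {
        FetchRecordRequest req = new FetchRecordRequest();
        req.setJobId(Long.toString(jobId));
        req.setDataSource("MYSQL");
        req.setConfig(sourceProps);
        return req;
    }

    // Ad-hoc TVF path: dash-free UUID, so isLong(jobId) is false and the coordinator calls
    // sourceReader.close() after the stream (dropping e.g. the Postgres replication slot).
    static FetchRecordRequest tvfRequest(Map<String, String> sourceProps) {
        FetchRecordRequest req = new FetchRecordRequest();
        req.setJobId(UUID.randomUUID().toString().replace("-", ""));
        req.setDataSource("MYSQL");
        req.setConfig(sourceProps);
        return req;
    }
}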
+ sourceReader.close(fetchRecord); + } + } + + private boolean isLong(String s) { + if (s == null || s.isEmpty()) return false; + try { + Long.parseLong(s); + return true; + } catch (NumberFormatException e) { + return false; + } } public CompletableFuture writeRecordsAsync(WriteRecordRequest writeRecordRequest) { @@ -255,7 +300,9 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception batchStreamLoad.commitOffset(offsetRes, scannedRows, scannedBytes); return; } else { - throw new RuntimeException("should not happen"); + // chunk no data + throw new RuntimeException( + "Chunk has no data," + readResult.getSplit().splitId()); } } @@ -278,7 +325,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } } - private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String targetDb) { + private DorisBatchStreamLoad getOrCreateBatchStreamLoad(String jobId, String targetDb) { return batchStreamLoadMap.computeIfAbsent( jobId, k -> { @@ -287,7 +334,7 @@ private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String targe }); } - public void closeJobStreamLoad(Long jobId) { + public void closeJobStreamLoad(String jobId) { DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId); if (batchStreamLoad != null) { LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId); @@ -300,4 +347,8 @@ private String extractTable(SourceRecord record) { Struct value = (Struct) record.value(); return value.getStruct(Envelope.FieldName.SOURCE).getString("table"); } + + public Map getOffsetWithTaskId(String taskId) { + return taskOffsetCache.get(taskId); + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java index bf6a4102801059..8e8ae13bb8d013 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java @@ -88,10 +88,10 @@ public class DorisBatchStreamLoad implements Serializable { private final Map bufferMapLock = new ConcurrentHashMap<>(); @Setter private String currentTaskId; private String targetDb; - private long jobId; + private String jobId; @Setter private String token; - public DorisBatchStreamLoad(long jobId, String targetDb) { + public DorisBatchStreamLoad(String jobId, String targetDb) { this.hostPort = Env.getCurrentEnv().getBackendHostPort(); this.flushQueue = new LinkedBlockingDeque<>(1); // maxBlockedBytes is two times of FLUSH_MAX_BYTE_SIZE @@ -299,9 +299,9 @@ private ReadWriteLock getLock(String bufferKey) { class LoadAsyncExecutor implements Runnable { private int flushQueueSize; - private long jobId; + private String jobId; - public LoadAsyncExecutor(int flushQueueSize, long jobId) { + public LoadAsyncExecutor(int flushQueueSize, String jobId) { this.flushQueueSize = flushQueueSize; this.jobId = jobId; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index f9e11f6b029aa3..49bbba848217d1 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ 
-97,7 +97,7 @@ public JdbcIncrementalSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { this.serializer.init(config); } @@ -164,14 +164,12 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); if (currentSplitRecords == null) { Fetcher currentReader = this.getCurrentReader(); - if (currentReader == null || baseReq.isReload()) { - LOG.info( - "No current reader or reload {}, create new split reader", - baseReq.isReload()); + if (currentReader == null) { + LOG.info("No current reader , create new split reader"); // build split Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); split = splitFlag.f0; - closeBinlogReader(); + // closeBinlogReader(); currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); this.setCurrentSplitRecords(currentSplitRecords); this.setCurrentSplit(split); @@ -615,6 +613,9 @@ public boolean isSnapshotSplit(SourceSplit split) { @Override public void finishSplitRecords() { this.setCurrentSplitRecords(null); + // Close after each read, the binlog client will occupy the connection. + closeBinlogReader(); + this.setCurrentReader(null); } private Map getTableSchemas(JobBaseConfig config) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java index d5feeef45a32c8..bf84cef505bf0a 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java @@ -36,7 +36,7 @@ public interface SourceReader { String SPLIT_ID = "splitId"; /** Initialization, called when the program starts */ - void initialize(long jobId, DataSource dataSource, Map config); + void initialize(String jobId, DataSource dataSource, Map config); /** Divide the data to be read. For example: split mysql to chunks */ List getSourceSplits(FetchTableSplitsRequest config); @@ -81,5 +81,5 @@ public interface SourceReader { * Commits the given offset with the source database. Used by some source like Postgres to * indicate how far the source TX log can be discarded. 
*/ - default void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {} + default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {} } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 27fbf3be88b363..a22ff3be4de863 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -70,7 +70,6 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -120,7 +119,7 @@ public MySqlSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { this.serializer.init(config); } @@ -180,15 +179,13 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); if (currentSplitRecords == null) { DebeziumReader currentReader = this.getCurrentReader(); - if (currentReader == null || baseReq.isReload()) { - LOG.info( - "No current reader or reload {}, create new split reader", - baseReq.isReload()); + if (currentReader == null) { + LOG.info("No current reader, create new split reader"); // build split Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq); split = splitFlag.f0; // reset binlog reader - closeBinlogReader(); + // closeBinlogReader(); currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); this.setCurrentSplitRecords(currentSplitRecords); this.setCurrentSplit(split); @@ -598,12 +595,11 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str configFactory.includeSchemaChanges(false); - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> databaseName + "." + t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); + // Set table list + String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig); + com.google.common.base.Preconditions.checkArgument( + tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // setting startMode String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); @@ -718,6 +714,9 @@ public boolean isSnapshotSplit(SourceSplit split) { @Override public void finishSplitRecords() { this.setCurrentSplitRecords(null); + // Close after each read, the binlog client will occupy the connection. 
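The table list resolution in generateMySqlConfig above (and in PostgresSourceReader below) comes from the new ConfigUtil.getTableList helper added later in this patch, which accepts either the job-style include_tables list or the TVF-style single table key. A short illustration with placeholder schema and table names:

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.cdcclient.utils.ConfigUtil;

public class TableListExample {
    public static void main(String[] args) {
        // Streaming-job style: comma separated include_tables, each entry trimmed
        // and prefixed with the database/schema name.
        Map<String, String> jobProps = new HashMap<>();
        jobProps.put("include_tables", "orders, customers");
        System.out.println(String.join(",", ConfigUtil.getTableList("db1", jobProps)));
        // -> db1.orders,db1.customers

        // TVF style: a single "table"; a comma separated value here fails the
        // precondition ("table only support one table").
        Map<String, String> tvfProps = new HashMap<>();
        tvfProps.put("table", "orders");
        System.out.println(String.join(",", ConfigUtil.getTableList("db1", tvfProps)));
        // -> db1.orders
    }
}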
+ closeBinlogReader(); + this.setCurrentReader(null); } @Override diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 3e94700f2b464f..ea5eaf355b756d 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -25,7 +25,6 @@ import org.apache.doris.job.cdc.request.CompareOffsetRequest; import org.apache.doris.job.cdc.request.JobBaseConfig; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; @@ -50,7 +49,6 @@ import org.apache.flink.table.types.DataType; import java.time.Instant; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,7 +80,7 @@ public PostgresSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId); PostgresDialect dialect = new PostgresDialect(sourceConfig); LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername()); @@ -133,7 +131,8 @@ private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) { } /** Generate PostgreSQL source config from Map config */ - private PostgresSourceConfig generatePostgresConfig(Map cdcConfig, Long jobId) { + private PostgresSourceConfig generatePostgresConfig( + Map cdcConfig, String jobId) { PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory(); // Parse JDBC URL to extract connection info @@ -163,19 +162,15 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi configFactory.includeSchemaChanges(false); // Set table list - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - if (StringUtils.isNotEmpty(includingTables)) { - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> schema + "." 
+ t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); - } + String[] tableList = ConfigUtil.getTableList(schema, cdcConfig); + Preconditions.checkArgument(tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // Set startup options String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { - configFactory.startupOptions(StartupOptions.initial()); + // do not need set offset when initial + // configFactory.startupOptions(StartupOptions.initial()); } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.earliest()); } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) { @@ -208,7 +203,7 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi return configFactory.create(0); } - private String getSlotName(Long jobId) { + private String getSlotName(String jobId) { return "doris_cdc_" + jobId; } @@ -340,7 +335,7 @@ protected FetchTask createFetchTaskFromSplit( * `CommitFeOffset` fails, Data after the startOffset will not be cleared. */ @Override - public void commitSourceOffset(Long jobId, SourceSplit sourceSplit) { + public void commitSourceOffset(String jobId, SourceSplit sourceSplit) { try { if (sourceSplit instanceof StreamSplit) { Offset offsetToCommit = ((StreamSplit) sourceSplit).getStartingOffset(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java index b8503adf7b5302..305b08a12b9d38 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java @@ -17,9 +17,12 @@ package org.apache.doris.cdcclient.utils; +import org.apache.doris.job.cdc.DataSourceConfigKeys; + import org.apache.commons.lang3.StringUtils; import java.time.ZoneId; +import java.util.Arrays; import java.util.Map; import com.fasterxml.jackson.core.JsonProcessingException; @@ -35,8 +38,8 @@ public class ConfigUtil { private static ObjectMapper objectMapper = new ObjectMapper(); private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class); - public static String getServerId(long jobId) { - return String.valueOf(Math.abs(String.valueOf(jobId).hashCode())); + public static String getServerId(String jobId) { + return String.valueOf(Math.abs(jobId.hashCode())); } public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) { @@ -93,6 +96,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro return ZoneId.systemDefault(); } + public static String[] getTableList(String schema, Map cdcConfig) { + String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); + String table = cdcConfig.get(DataSourceConfigKeys.TABLE); + if (StringUtils.isNotEmpty(includingTables)) { + return Arrays.stream(includingTables.split(",")) + .map(t -> schema + "." + t.trim()) + .toArray(String[]::new); + } else if (StringUtils.isNotEmpty(table)) { + Preconditions.checkArgument(!table.contains(","), "table only support one table"); + return new String[] {schema + "." 
+ table.trim()}; + } else { + return new String[0]; + } + } + public static boolean is13Timestamp(String s) { return s != null && s.matches("\\d{13}"); } From 6daeb9f00080299a6183b537b072875c0ac0b2a7 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 20 Jan 2026 17:53:39 +0800 Subject: [PATCH 2/8] fix cdc tvf --- .../offset/jdbc/JdbcSourceOffsetProvider.java | 6 +- .../controller/ClientController.java | 11 ++ .../service/PipelineCoordinator.java | 152 ++++++++++++++++-- .../reader/JdbcIncrementalSourceReader.java | 46 ++---- .../reader/mysql/MySqlSourceReader.java | 56 ++----- .../reader/postgres/PostgresSourceReader.java | 22 +-- .../doris/cdcclient/utils/ConfigUtil.java | 13 +- 7 files changed, 196 insertions(+), 110 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 51f3c07aa41bf0..dc288af1b101c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -185,7 +185,7 @@ public void updateOffset(Offset offset) { @Override public void fetchRemoteMeta(Map properties) throws Exception { Backend backend = StreamingJobUtils.selectBackend(); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/fetchEndOffset") .setParams(new Gson().toJson(requestParams)).build(); @@ -512,7 +512,7 @@ private boolean checkNeedSplitChunks(Map sourceProperties) { */ private void initSourceReader() throws JobException { Backend backend = StreamingJobUtils.selectBackend(); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/initReader") .setParams(new Gson().toJson(requestParams)).build(); @@ -559,7 +559,7 @@ public void cleanMeta(Long jobId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); Backend backend = StreamingJobUtils.selectBackend(); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/close") .setParams(new Gson().toJson(requestParams)).build(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java index a4c4f74eb7e43f..bb2e56cca261dd 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java @@ -72,6 +72,17 @@ public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) { } } + /** Fetch records from source reader, for debug */ + @RequestMapping(path = 
"/api/fetchRecords", method = RequestMethod.POST) + public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) { + try { + return RestResponse.success(pipelineCoordinator.fetchRecords(recordReq)); + } catch (Exception ex) { + LOG.error("Failed fetch record, jobId={}", recordReq.getJobId(), ex); + return RestResponse.internalError(ex.getMessage()); + } + } + @RequestMapping(path = "/api/fetchRecordStream", method = RequestMethod.POST) public StreamingResponseBody fetchRecordStream(@RequestBody FetchRecordRequest recordReq) throws Exception { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 3245e00d4ccb74..20e5fcb8cc5344 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -18,20 +18,26 @@ package org.apache.doris.cdcclient.service; import org.apache.doris.cdcclient.common.Env; +import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.model.response.RecordWithMeta; import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.cdcclient.source.reader.SplitReadResult; +import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.cdc.request.WriteRecordRequest; import org.apache.doris.job.cdc.split.BinlogSplit; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import java.io.BufferedOutputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -48,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; /** Pipeline coordinator. 
*/ @Component @@ -55,13 +62,16 @@ public class PipelineCoordinator { private static final Logger LOG = LoggerFactory.getLogger(PipelineCoordinator.class); private static final String SPLIT_ID = "splitId"; // jobId - private final Map batchStreamLoadMap = new ConcurrentHashMap<>(); + private final Map batchStreamLoadMap = new ConcurrentHashMap<>(); // taskId -> writeFailReason private final Map taskErrorMaps = new ConcurrentHashMap<>(); + // taskId, offset + private final Map> taskOffsetCache = new ConcurrentHashMap<>(); private final ThreadPoolExecutor executor; private static final int MAX_CONCURRENT_TASKS = 10; private static final int QUEUE_CAPACITY = 128; private static ObjectMapper objectMapper = new ObjectMapper(); + private final byte[] LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8); public PipelineCoordinator() { this.executor = @@ -81,19 +91,139 @@ public PipelineCoordinator() { new ThreadPoolExecutor.AbortPolicy()); } + /** return data for http_file_reader */ + public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchReq); + SplitReadResult readResult = sourceReader.readSplitRecords(fetchReq); + return outputStream -> { + try { + buildRecords(sourceReader, fetchReq, readResult, outputStream); + } catch (Exception ex) { + LOG.error( + "Failed fetch record, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId(), + ex); + throw new StreamException(ex); + } + }; + } + + private void buildRecords( + SourceReader sourceReader, + FetchRecordRequest fetchRecord, + SplitReadResult readResult, + OutputStream rawOutputStream) + throws Exception { + SourceSplit split = readResult.getSplit(); + Map lastMeta = null; + int rowCount = 0; + BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream); + try { + // Serialize records and add them to the response (collect from iterator) + Iterator iterator = readResult.getRecordIterator(); + while (iterator != null && iterator.hasNext()) { + SourceRecord element = iterator.next(); + List serializedRecords = + sourceReader.deserialize(fetchRecord.getConfig(), element); + for (String record : serializedRecords) { + bos.write(record.getBytes(StandardCharsets.UTF_8)); + bos.write(LINE_DELIMITER); + } + rowCount += serializedRecords.size(); + } + // force flush buffer + bos.flush(); + } finally { + // The LSN in the commit is the current offset, which is the offset from the last + // successful write. + // Therefore, even if a subsequent write fails, it will not affect the commit. + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); + + // This must be called after commitSourceOffset; otherwise, + // PG's confirmed lsn will not proceed. 
+ sourceReader.finishSplitRecords(); + } + + LOG.info( + "Fetch records completed, jobId={}, taskId={}, splitId={}, rowCount={}", + fetchRecord.getJobId(), + fetchRecord.getTaskId(), + split.splitId(), + rowCount); + + if (readResult.getSplitState() != null) { + // Set meta information for the high watermark (hw) of the snapshot split + if (sourceReader.isSnapshotSplit(split)) { + lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, split.splitId()); + } + + // Set meta for the binlog event + if (sourceReader.isBinlogSplit(split)) { + lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } + } else { + throw new RuntimeException("split state is null"); + } + + if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) { + taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta); + } + + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); + if (!isLong(fetchRecord.getJobId())) { + // TVF requires closing the window after each execution, + // while PG requires dropping the slot. + sourceReader.close(fetchRecord); + } + } + + private boolean isLong(String s) { + if (s == null || s.isEmpty()) return false; + try { + Long.parseLong(s); + return true; + } catch (NumberFormatException e) { + return false; + } + } + + private Map generateMeta(Map cdcConfig) { + Map meta = new HashMap<>(); + String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET); + if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)) { + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } else { + throw new RuntimeException("Unsupported offset:" + offset); + } + return meta; + } + + /** pull data from api for test */ public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) throws Exception { SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchRecordRequest); SplitReadResult readResult = sourceReader.readSplitRecords(fetchRecordRequest); return buildRecordResponse(sourceReader, fetchRecordRequest, readResult); } - /** build RecordWithMeta */ + /** build RecordWithMeta for test */ private RecordWithMeta buildRecordResponse( SourceReader sourceReader, FetchRecordRequest fetchRecord, SplitReadResult readResult) throws Exception { RecordWithMeta recordResponse = new RecordWithMeta(); SourceSplit split = readResult.getSplit(); - int count = 0; try { // Serialize records and add them to the response (collect from iterator) Iterator iterator = readResult.getRecordIterator(); @@ -103,14 +233,6 @@ private RecordWithMeta buildRecordResponse( sourceReader.deserialize(fetchRecord.getConfig(), element); if (!CollectionUtils.isEmpty(serializedRecords)) { recordResponse.getRecords().addAll(serializedRecords); - count += serializedRecords.size(); - if (sourceReader.isBinlogSplit(split)) { - // put offset for event - Map lastMeta = - sourceReader.extractBinlogStateOffset(readResult.getSplitState()); - lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - recordResponse.setMeta(lastMeta); - } } } } finally { @@ -271,7 +393,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } } - private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String targetDb) { + private DorisBatchStreamLoad getOrCreateBatchStreamLoad(String jobId, String targetDb) { return batchStreamLoadMap.computeIfAbsent( jobId, k -> { @@ -280,7 +402,7 @@ private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String targe }); } - public void closeJobStreamLoad(Long jobId) { + 
public void closeJobStreamLoad(String jobId) { DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId); if (batchStreamLoad != null) { LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId); @@ -298,4 +420,8 @@ public String getTaskFailReason(String taskId) { String taskReason = taskErrorMaps.remove(taskId); return taskReason == null ? "" : taskReason; } + + public Map getOffsetWithTaskId(String taskId) { + return taskOffsetCache.get(taskId); + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 5b404932c3f2d7..6e5380c29487ef 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -95,7 +95,7 @@ public JdbcIncrementalSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { this.serializer.init(config); } @@ -154,39 +154,17 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc if (offsetMeta == null || offsetMeta.isEmpty()) { throw new RuntimeException("miss meta offset"); } - LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta); - - // If there is an active split being consumed, reuse it directly; - // Otherwise, create a new snapshot/stream split based on offset and start the reader. - SourceSplitBase split = null; - SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); - if (currentSplitRecords == null) { - Fetcher currentReader = this.getCurrentReader(); - if (baseReq.isReload() || currentReader == null) { - LOG.info( - "No current reader or reload {}, create new split reader for job {}", - baseReq.isReload(), - baseReq.getJobId()); - // build split - Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); - split = splitFlag.f0; - // closeBinlogReader(); - currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); - this.setCurrentSplitRecords(currentSplitRecords); - this.setCurrentSplit(split); - } else if (currentReader instanceof IncrementalSourceStreamFetcher) { - LOG.info("Continue poll records with current binlog reader"); - // only for binlog reader - currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader); - split = this.getCurrentSplit(); - } else { - throw new RuntimeException("Should not happen"); - } - } else { - LOG.info( - "Continue read records with current split records, splitId: {}", - currentSplitRecords.getSplitId()); - } + // Create a new snapshot/stream split based on offset and start the reader. + LOG.info( + "Create new split reader for job {} with offset {}", + baseReq.getJobId(), + offsetMeta); + // build split + Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); + SourceSplitBase split = splitFlag.f0; + // it's necessary to ensure that the binlog reader is already closed. 
+ this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); + this.currentSplit = split; // build response with iterator SplitReadResult result = new SplitReadResult(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index d20225c0974cbb..d2bab90c242039 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -70,7 +70,6 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -118,7 +117,7 @@ public MySqlSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { this.serializer.init(config); } @@ -170,39 +169,14 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc if (offsetMeta == null || offsetMeta.isEmpty()) { throw new RuntimeException("miss meta offset"); } - LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta); - - // If there is an active split being consumed, reuse it directly; - // Otherwise, create a new snapshot/binlog split based on offset and start the reader. - MySqlSplit split = null; - SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); - if (currentSplitRecords == null) { - DebeziumReader currentReader = this.getCurrentReader(); - if (baseReq.isReload() || currentReader == null) { - LOG.info( - "No current reader or reload {}, create new split reader", - baseReq.isReload()); - // build split - Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq); - split = splitFlag.f0; - // reset binlog reader - // closeBinlogReader(); - currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); - this.setCurrentSplitRecords(currentSplitRecords); - this.setCurrentSplit(split); - } else if (currentReader instanceof BinlogSplitReader) { - LOG.info("Continue poll records with current binlog reader"); - // only for binlog reader - currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader); - split = this.getCurrentSplit(); - } else { - throw new RuntimeException("Should not happen"); - } - } else { - LOG.info( - "Continue read records with current split records, splitId: {}", - currentSplitRecords.getSplitId()); - } + // Create a new snapshot/binlog split based on offset and start the reader. + LOG.info("create new split reader for {} with offset {}", baseReq.getJobId(), offsetMeta); + // build split + Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq); + MySqlSplit split = splitFlag.f0; + // it's necessary to ensure that the binlog reader is already closed. 
+ this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); + this.currentSplit = split; // build response with iterator SplitReadResult result = new SplitReadResult(); @@ -449,6 +423,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j return new SplitRecords(currentSplitId, sourceRecords.iterator()); } + /** Poll data from the current reader, only for binlog reader */ private SplitRecords pollSplitRecordsWithCurrentReader( DebeziumReader currentReader) throws Exception { Iterator dataIt = null; @@ -568,12 +543,11 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str configFactory.includeSchemaChanges(false); - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> databaseName + "." + t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); + // Set table list + String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig); + com.google.common.base.Preconditions.checkArgument( + tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // setting startMode String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index c1e062df5a6b34..d1f3cc26d0818c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -25,7 +25,6 @@ import org.apache.doris.job.cdc.request.CompareOffsetRequest; import org.apache.doris.job.cdc.request.JobBaseConfig; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; @@ -51,7 +50,6 @@ import org.apache.flink.table.types.DataType; import java.time.Instant; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,7 +81,7 @@ public PostgresSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId); PostgresDialect dialect = new PostgresDialect(sourceConfig); LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername()); @@ -137,7 +135,8 @@ private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) { } /** Generate PostgreSQL source config from Map config */ - private PostgresSourceConfig generatePostgresConfig(Map cdcConfig, Long jobId) { + private PostgresSourceConfig generatePostgresConfig( + Map cdcConfig, String jobId) { PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory(); // Parse JDBC URL to extract connection info @@ -167,14 +166,9 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi configFactory.includeSchemaChanges(false); // Set table list - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - if (StringUtils.isNotEmpty(includingTables)) { - String[] includingTbls = - 
Arrays.stream(includingTables.split(",")) - .map(t -> schema + "." + t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); - } + String[] tableList = ConfigUtil.getTableList(schema, cdcConfig); + Preconditions.checkArgument(tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // Set startup options String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); @@ -212,7 +206,7 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi return configFactory.create(0); } - private String getSlotName(Long jobId) { + private String getSlotName(String jobId) { return "doris_cdc_" + jobId; } @@ -338,7 +332,7 @@ protected FetchTask createFetchTaskFromSplit( * `CommitFeOffset` fails, Data after the startOffset will not be cleared. */ @Override - public void commitSourceOffset(Long jobId, SourceSplit sourceSplit) { + public void commitSourceOffset(String jobId, SourceSplit sourceSplit) { try { if (sourceSplit instanceof StreamSplit) { Offset offsetToCommit = ((StreamSplit) sourceSplit).getStartingOffset(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java index f1e2585fa0e5ae..e6e0a9f039bc11 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java @@ -19,19 +19,22 @@ import org.apache.doris.cdcclient.common.Constants; import org.apache.doris.job.cdc.DataSourceConfigKeys; + +import org.apache.commons.lang3.StringUtils; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.mysql.cj.conf.ConnectionUrl; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.ZoneId; -import java.util.Arrays; -import java.util.Map; -import java.util.Properties; public class ConfigUtil { private static ObjectMapper objectMapper = new ObjectMapper(); From e2c178d26b5ff1422d1a8402a74d2dbf01c2285d Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 21 Jan 2026 17:16:28 +0800 Subject: [PATCH 3/8] fix tvf --- be/src/io/fs/http_file_reader.cpp | 15 ++++ .../catalog/BuiltinTableValuedFunctions.java | 2 +- .../functions/table/CdcStream.java | 4 + .../visitor/TableValuedFunctionVisitor.java | 2 +- .../CdcStreamTableValuedFunction.java | 6 +- .../config/GlobalExceptionHandler.java | 10 +++ .../cdcclient/exception/CommonException.java | 44 ++++++++++ .../service/PipelineCoordinator.java | 45 +++++++--- .../reader/mysql/MySqlSourceReader.java | 12 ++- .../reader/postgres/PostgresSourceReader.java | 6 -- .../cdc/tvf/test_cdc_stream_tvf.out | 5 ++ .../cdc/tvf/test_cdc_stream_tvf.groovy | 82 +++++++++++++++++++ 12 files changed, 208 insertions(+), 25 deletions(-) create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java create mode 100644 regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy diff --git a/be/src/io/fs/http_file_reader.cpp 
b/be/src/io/fs/http_file_reader.cpp index 2f8f24169857ee..220718f4e259e9 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -335,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r long http_status = _client->get_http_status(); VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size(); + // Check for HTTP error status codes (4xx, 5xx) + if (http_status >= 400) { + std::string error_body; + if (buf.empty()) { + error_body = "(empty response body)"; + } else { + // Limit error message to 1024 bytes to avoid excessive logging + size_t max_len = std::min(buf.size(), static_cast(1024)); + error_body = buf.substr(0, max_len); + } + + return Status::InternalError( + "HTTP request failed with status {}: {}.",http_status, error_body); + } + if (buf.empty()) { *bytes_read = buffer_offset; return Status::OK(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index c51274fc03153c..8936d3e6287ea7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; +import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.File; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks; @@ -39,7 +40,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.ParquetMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; -import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java index 8afaa64b15d9d6..32937c780f5b9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java @@ -24,8 +24,12 @@ import org.apache.doris.nereids.types.coercion.AnyDataType; import org.apache.doris.tablefunction.CdcStreamTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; + import java.util.Map; +/** + * CdcStream TVF. 
+ */ public class CdcStream extends TableValuedFunction { public CdcStream(Properties tvfProperties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 63b9349cf1c01f..620d8d53248604 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; +import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.File; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks; @@ -36,7 +37,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.ParquetMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; -import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.Query; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index dbc3e930aee268..abc4ca41db736c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -28,8 +28,10 @@ import org.apache.doris.job.util.StreamingJobUtils; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileType; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -41,6 +43,7 @@ public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunctio private static final ObjectMapper objectMapper = new ObjectMapper(); private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; private final Map originProps; + public CdcStreamTableValuedFunction(Map properties) throws AnalysisException { this.originProps = properties; processProps(properties); @@ -89,7 +92,8 @@ private void generateFileStatus() { @Override public List getTableColumns() throws AnalysisException { - DataSourceType dataSourceType = DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase()); + DataSourceType dataSourceType = + DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase()); JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(dataSourceType, processedParams); String database = StreamingJobUtils.getRemoteDbName(dataSourceType, processedParams); String table = processedParams.get(DataSourceConfigKeys.TABLE); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java index ecde5c67779c4c..4c719f0f9bb129 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java @@ -17,11 +17,14 @@ package org.apache.doris.cdcclient.config; +import org.apache.doris.cdcclient.exception.CommonException; import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.model.rest.RestResponse; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; @@ -39,7 +42,14 @@ public Object exceptionHandler(HttpServletRequest request, Exception e) { @ExceptionHandler(StreamException.class) public Object streamExceptionHandler(StreamException e) { + // Directly throwing an exception allows curl to detect anomalies in the streaming response. log.error("Exception in streaming response, re-throwing to client", e); throw e; } + + @ExceptionHandler(CommonException.class) + public Object commonExceptionHandler(CommonException e) { + log.error("Unexpected common exception", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage()); + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java new file mode 100644 index 00000000000000..b27afe56dfe780 --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cdcclient.exception; + +public class CommonException extends RuntimeException { + public CommonException() { + super(); + } + + public CommonException(String message) { + super(message); + } + + public CommonException(String message, Throwable cause) { + super(message, cause); + } + + public CommonException(Throwable cause) { + super(cause); + } + + protected CommonException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 20e5fcb8cc5344..9390a07ad2ef5b 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -18,11 +18,13 @@ package org.apache.doris.cdcclient.service; import org.apache.doris.cdcclient.common.Env; +import org.apache.doris.cdcclient.exception.CommonException; import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.model.response.RecordWithMeta; import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.cdcclient.source.reader.SplitReadResult; +import org.apache.doris.cdcclient.utils.ConfigUtil; import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.cdc.request.WriteRecordRequest; @@ -48,6 +50,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import io.debezium.data.Envelope; @@ -93,18 +97,25 @@ public PipelineCoordinator() { /** return data for http_file_reader */ public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { - if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { - LOG.info( - "Generate initial meta for fetch record request, jobId={}, taskId={}", - fetchReq.getJobId(), - fetchReq.getTaskId()); - // means the request did not originate from the job, only tvf - Map meta = generateMeta(fetchReq.getConfig()); - fetchReq.setMeta(meta); + SourceReader sourceReader; + SplitReadResult readResult; + try { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + sourceReader = Env.getCurrentEnv().getReader(fetchReq); + readResult = sourceReader.readSplitRecords(fetchReq); + } catch (Exception ex) { + throw new CommonException(ex); } - SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchReq); - SplitReadResult readResult = sourceReader.readSplitRecords(fetchReq); return outputStream -> { try { buildRecords(sourceReader, fetchReq, readResult, outputStream); @@ -200,13 +211,21 @@ private boolean isLong(String s) { } } - private Map generateMeta(Map cdcConfig) { + 
/** Generate split meta from request.offset */ + private Map generateMeta(Map cdcConfig) + throws JsonProcessingException { Map meta = new HashMap<>(); String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET); - if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)) { + if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) { + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } else if (ConfigUtil.isJson(offset)) { + Map startOffset = + objectMapper.readValue(offset, new TypeReference<>() {}); meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + meta.put("startingOffset", startOffset); } else { - throw new RuntimeException("Unsupported offset:" + offset); + throw new RuntimeException("Unsupported offset: " + offset); } return meta; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index d2bab90c242039..025cd5eac67824 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -552,8 +552,7 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str // setting startMode String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { - // do not need set offset when initial - // configFactory.startupOptions(StartupOptions.initial()); + configFactory.startupOptions(StartupOptions.initial()); } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.earliest()); BinlogOffset binlogOffset = @@ -573,7 +572,14 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str } if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY) && offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) { - BinlogOffset binlogOffset = new BinlogOffset(offsetMap); + BinlogOffset binlogOffset = + BinlogOffset.builder() + .setBinlogFilePosition( + offsetMap.get(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY), + Long.parseLong( + offsetMap.get( + BinlogOffset.BINLOG_POSITION_OFFSET_KEY))) + .build(); configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset)); } else { throw new RuntimeException("Incorrect offset " + startupMode); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index d1f3cc26d0818c..966f4e58f104d7 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -174,16 +174,10 @@ private PostgresSourceConfig generatePostgresConfig( String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.initial()); - } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) { - configFactory.startupOptions(StartupOptions.earliest()); } else if 
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.latest()); } else if (ConfigUtil.isJson(startupMode)) { throw new RuntimeException("Unsupported json offset " + startupMode); - } else if (ConfigUtil.is13Timestamp(startupMode)) { - // start from timestamp - Long ts = Long.parseLong(startupMode); - configFactory.startupOptions(StartupOptions.timestamp(ts)); } else { throw new RuntimeException("Unknown offset " + startupMode); } diff --git a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.out b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.out new file mode 100644 index 00000000000000..fcb9b28f1514d0 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_tvf -- +C1 3 +D1 4 + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy new file mode 100644 index 00000000000000..01fbb66ca32662 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_cdc_stream_tvf", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_cdc_stream_tvf" + def mysqlDb = "test_cdc_db" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + def offset = "" + + // create test + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `name` varchar(200) NOT NULL, + `age` int DEFAULT NULL, + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1);""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2);""" + + def result = sql_return_maparray "show master status" + def file = result[0]["File"] + def position = result[0]["Position"] + offset = """{"file":"${file}","pos":"${position}"}""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3);""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('D1', 4);""" + } + + log.info("offset: " + offset) + qt_select_tvf """select * from cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${table1}", + "offset" = '${offset}' + ) + """ + + test { + sql """ + select * from cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${table1}", + "offset" = 'initial') + """ + exception "Unsupported offset: initial" + } + } +} From b7c3c9bc6cb08b3e2757206269c33995fd8400cb Mon Sep 17 00:00:00 2001 From: wudi Date: Wed, 21 Jan 2026 17:44:38 +0800 Subject: [PATCH 4/8] fix --- ..._tvf.out => test_cdc_stream_tvf_mysql.out} | 0 ...roovy => test_cdc_stream_tvf_mysql.groovy} | 2 +- .../tvf/test_cdc_stream_tvf_postgres.groovy | 81 +++++++++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) rename regression-test/data/job_p0/streaming_job/cdc/tvf/{test_cdc_stream_tvf.out => test_cdc_stream_tvf_mysql.out} (100%) rename regression-test/suites/job_p0/streaming_job/cdc/tvf/{test_cdc_stream_tvf.groovy => test_cdc_stream_tvf_mysql.groovy} (97%) create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy diff --git a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.out b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out similarity index 100% rename from regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.out rename to regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy similarity index 97% rename from regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy rename to regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy index 01fbb66ca32662..6f1190da0176fb 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_cdc_stream_tvf", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { +suite("test_cdc_stream_tvf_mysql", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { def currentDb = (sql "select database()")[0][0] def table1 = "user_info_cdc_stream_tvf" def mysqlDb = "test_cdc_db" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy new file mode 100644 index 00000000000000..2d383390834b98 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_cdc_stream_tvf_postgres", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_normal1" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "postgres" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // create test + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "name" varchar(200), + "age" int2, + PRIMARY KEY ("name") + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('A1', 1);""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('B1', 2);""" + } + + test { + sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "offset" = 'initial') + """ + exception "Unsupported offset: initial" + } + + // Here, because PG consumption requires creating a slot first, + // we only verify whether the execution can be successful. 
+ def result = sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "offset" = 'latest') + """ + log.info("result:", result) + } +} From d84fcddbe4aa9142e912da69ab6bafe20075ca2e Mon Sep 17 00:00:00 2001 From: JNSimba Date: Wed, 21 Jan 2026 17:56:22 +0800 Subject: [PATCH 5/8] clang format --- be/src/io/fs/http_file_reader.cpp | 6 +++--- be/src/io/fs/http_file_reader.h | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index 220718f4e259e9..fe90a29a185909 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -345,9 +345,9 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r size_t max_len = std::min(buf.size(), static_cast(1024)); error_body = buf.substr(0, max_len); } - - return Status::InternalError( - "HTTP request failed with status {}: {}.",http_status, error_body); + + return Status::InternalError("HTTP request failed with status {}: {}.", http_status, + error_body); } if (buf.empty()) { diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 08c97eb5b19d97..9c83370dea47bc 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -82,10 +82,10 @@ class HttpFileReader final : public FileReader { // Configuration for non-Range request handling bool _enable_range_request = true; // Whether Range request is required size_t _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE; // Max size for non-Range downloads - + // Full file cache for non-Range mode to avoid repeated downloads - std::string _full_file_cache; // Cache complete file content - bool _full_file_cached = false; // Whether full file has been cached + std::string _full_file_cache; // Cache complete file content + bool _full_file_cached = false; // Whether full file has been cached bool _enable_chunk_response = false; // Whether server returns chunk streaming response }; From 6a53b0c7a666d1341c219b822b915ca3435709d4 Mon Sep 17 00:00:00 2001 From: wudi Date: Thu, 22 Jan 2026 10:06:02 +0800 Subject: [PATCH 6/8] fix --- be/src/io/fs/http_file_reader.h | 7 ++++--- .../trees/expressions/functions/table/CdcStream.java | 2 +- .../expressions/visitor/TableValuedFunctionVisitor.java | 2 +- .../doris/cdcclient/service/PipelineCoordinator.java | 1 - .../java/org/apache/doris/cdcclient/utils/ConfigUtil.java | 2 +- .../cdc/tvf/test_cdc_stream_tvf_postgres.groovy | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 9c83370dea47bc..b4cf31b56d1d44 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -82,10 +82,11 @@ class HttpFileReader final : public FileReader { // Configuration for non-Range request handling bool _enable_range_request = true; // Whether Range request is required size_t _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE; // Max size for non-Range downloads - + // Full file cache for non-Range mode to avoid repeated downloads - std::string _full_file_cache; // Cache complete file content - bool _full_file_cached = false; // Whether full file has been cached + std::string _full_file_cache; // Cache complete file 
content + bool _full_file_cached = false; // Whether full file has been cached + bool _enable_chunk_response = false; // Whether server returns chunk streaming response }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java index 32937c780f5b9f..3a6cdaf61da965 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java @@ -54,6 +54,6 @@ protected TableValuedFunctionIf toCatalogFunction() { @Override public R accept(ExpressionVisitor visitor, C context) { - return visitor.visitPostgresCdc(this, context); + return visitor.visitCdcStream(this, context); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 620d8d53248604..7b5e5654585bf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -82,7 +82,7 @@ default R visitHttp(Http http, C context) { return visitTableValuedFunction(http, context); } - default R visitPostgresCdc(CdcStream cdcStream, C context) { + default R visitCdcStream(CdcStream cdcStream, C context) { return visitTableValuedFunction(cdcStream, context); } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 9390a07ad2ef5b..8e9ed62e099eeb 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -193,7 +193,6 @@ private void buildRecords( taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta); } - sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); if (!isLong(fetchRecord.getJobId())) { // TVF requires closing the window after each execution, // while PG requires dropping the slot. diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java index e6e0a9f039bc11..50588deee8fc55 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java @@ -106,7 +106,7 @@ public static String[] getTableList(String schema, Map cdcConfig .map(t -> schema + "." + t.trim()) .toArray(String[]::new); } else if (StringUtils.isNotEmpty(table)) { - Preconditions.checkArgument(!table.contains(","), "table only support one table"); + Preconditions.checkArgument(!table.contains(","), "table only supports one table"); return new String[] {schema + "." 
+ table.trim()}; } else { return new String[0]; diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy index 2d383390834b98..f9a7befdcca1de 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy @@ -21,7 +21,7 @@ suite("test_cdc_stream_tvf_postgres", "p0,external,pg,external_docker,external_d def pgDB = "postgres" def pgSchema = "cdc_test" def pgUser = "postgres" - def pgPassword = "postgres" + def pgPassword = "123456" String enabled = context.config.otherConfigs.get("enableJdbcTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { From b56d1d11f9b7fee728fc06d90ddd84506f443adf Mon Sep 17 00:00:00 2001 From: wudi Date: Thu, 22 Jan 2026 10:56:56 +0800 Subject: [PATCH 7/8] fix --- be/src/io/fs/http_file_reader.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index b4cf31b56d1d44..acf0022adfe540 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -82,11 +82,11 @@ class HttpFileReader final : public FileReader { // Configuration for non-Range request handling bool _enable_range_request = true; // Whether Range request is required size_t _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE; // Max size for non-Range downloads - + // Full file cache for non-Range mode to avoid repeated downloads std::string _full_file_cache; // Cache complete file content bool _full_file_cached = false; // Whether full file has been cached - + bool _enable_chunk_response = false; // Whether server returns chunk streaming response }; From e02811eaefda6a3a1631ee0683594c271015510b Mon Sep 17 00:00:00 2001 From: wudi Date: Thu, 22 Jan 2026 14:35:42 +0800 Subject: [PATCH 8/8] fix --- .../tablefunction/CdcStreamTableValuedFunction.java | 3 ++- .../doris/cdcclient/controller/ClientController.java | 10 +++++----- .../doris/cdcclient/service/PipelineCoordinator.java | 8 ++++++-- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index abc4ca41db736c..896300385dfa35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -41,7 +41,7 @@ public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { private static final ObjectMapper objectMapper = new ObjectMapper(); - private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; + private static final String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; private final Map originProps; public CdcStreamTableValuedFunction(Map properties) throws AnalysisException { @@ -83,6 +83,7 @@ private void validate(Map properties) { Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required"); Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required"); Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.OFFSET), 
"offset is required"); } private void generateFileStatus() { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java index bb2e56cca261dd..bc9474bd90f41e 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java @@ -89,11 +89,6 @@ public StreamingResponseBody fetchRecordStream(@RequestBody FetchRecordRequest r return pipelineCoordinator.fetchRecordStream(recordReq); } - @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.POST) - public Object getTaskIdOffset(@PathVariable String taskId) { - return RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId)); - } - /** Fetch records from source reader and Write records to backend */ @RequestMapping(path = "/api/writeRecords", method = RequestMethod.POST) public Object writeRecord(@RequestBody WriteRecordRequest recordReq) { @@ -138,4 +133,9 @@ public Object close(@RequestBody JobBaseConfig jobConfig) { public Object getFailReason(@PathVariable("taskId") String taskId) { return RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId)); } + + @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.POST) + public Object getTaskIdOffset(@PathVariable String taskId) { + return RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId)); + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 8e9ed62e099eeb..241fa6ea81cc51 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -210,7 +210,10 @@ private boolean isLong(String s) { } } - /** Generate split meta from request.offset */ + /** + * Generate split meta from request.offset. This only applies to TVF, so initial is not + * supported because initial requires a job to obtain split information. + */ private Map generateMeta(Map cdcConfig) throws JsonProcessingException { Map meta = new HashMap<>(); @@ -440,6 +443,7 @@ public String getTaskFailReason(String taskId) { } public Map getOffsetWithTaskId(String taskId) { - return taskOffsetCache.get(taskId); + Map taskOffset = taskOffsetCache.remove(taskId); + return taskOffset == null ? new HashMap<>() : taskOffset; } }