diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index f9fab96ffb203e..fe90a29a185909 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -22,7 +22,11 @@ #include +#include "common/config.h" #include "common/logging.h" +#include "gen_cpp/internal_service.pb.h" +#include "runtime/cdc_client_mgr.h" +#include "runtime/exec_env.h" namespace doris::io { @@ -83,6 +87,14 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url) } } + // Parse chunk response configuration + auto chunk_iter = _extend_kv.find("http.chunk.response"); + if (chunk_iter != _extend_kv.end()) { + std::string value = chunk_iter->second; + std::transform(value.begin(), value.end(), value.begin(), ::tolower); + _enable_chunk_response = (value == "true" || value == "1"); + } + _read_buffer = std::make_unique(READ_BUFFER_SIZE); } @@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) { return Status::OK(); } - // Step 1: HEAD request to get file metadata - RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true)); - _client->set_method(HttpMethod::HEAD); - RETURN_IF_ERROR(_client->execute()); + // start CDC client + auto enable_cdc_iter = _extend_kv.find("enable_cdc_client"); + if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == "true") { + LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url; + ExecEnv* env = ExecEnv::GetInstance(); + if (env == nullptr || env->cdc_client_mgr() == nullptr) { + return Status::InternalError("ExecEnv or CdcClientMgr is not initialized"); + } + + PRequestCdcClientResult result; + Status start_st = env->cdc_client_mgr()->start_cdc_client(&result); + if (!start_st.ok()) { + LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string(); + return start_st; + } + + _url = fmt::format(_url, doris::config::cdc_client_port); + _range_supported = false; + LOG(INFO) << "CDC client started successfully for " << _url; + } + + // Step 1: HEAD request to get file metadata (skip for chunk response) + if (_enable_chunk_response) { + // Chunk streaming response, skip HEAD request + _size_known = false; + _range_supported = false; + LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " << _url; + } else { + // Normal mode: execute HEAD request to get file metadata + RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true)); + _client->set_method(HttpMethod::HEAD); + RETURN_IF_ERROR(_client->execute()); - uint64_t content_length = 0; - RETURN_IF_ERROR(_client->get_content_length(&content_length)); + uint64_t content_length = 0; + RETURN_IF_ERROR(_client->get_content_length(&content_length)); - _file_size = content_length; - _size_known = true; + _file_size = content_length; + _size_known = true; + } // Step 2: Check if Range request is disabled by configuration - if (!_enable_range_request) { - // User explicitly disabled Range requests, use non-Range mode directly + if (!_enable_range_request || _enable_chunk_response) { + // User explicitly disabled Range requests or chunk response mode, use non-Range mode _range_supported = false; - LOG(INFO) << "Range requests disabled by configuration for " << _url - << ", using non-Range mode. 
File size: " << _file_size << " bytes"; - - // Check if file size exceeds limit for non-Range mode - if (_file_size > _max_request_size_bytes) { - return Status::InternalError( - "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} bytes, " - "configured by http.max.request.size.bytes). URL: {}", - _file_size, _max_request_size_bytes, _url); + LOG(INFO) << "Range requests disabled for " << _url << " (config=" << !_enable_range_request + << ", chunk_response=" << _enable_chunk_response << ")"; + + // Skip file size check for chunk response (size unknown at this point) + if (_enable_chunk_response) { + LOG(INFO) << "Chunk response mode, file size check skipped for " << _url; + } else { + // Check if file size exceeds limit for non-Range mode (non-chunk only) + if (_file_size > _max_request_size_bytes) { + return Status::InternalError( + "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} " + "bytes, " + "configured by http.max.request.size.bytes). URL: {}", + _file_size, _max_request_size_bytes, _url); + } + LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size + << " bytes, max allowed: " << _max_request_size_bytes << " bytes"; } - - LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size - << " bytes, max allowed: " << _max_request_size_bytes << " bytes"; } else { // Step 3: Range request is enabled (default), detect Range support VLOG(1) << "Detecting Range support for URL: " << _url; @@ -223,9 +269,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len << " with_range=" << _range_supported; - // Prepare and initialize the HTTP client for GET request + // Prepare and initialize the HTTP client for request RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false)); - _client->set_method(HttpMethod::GET); + + // Determine HTTP method from configuration (default: GET) + HttpMethod method = HttpMethod::GET; + auto method_iter = _extend_kv.find("http.method"); + if (method_iter != _extend_kv.end()) { + method = to_http_method(method_iter->second.c_str()); + if (method == HttpMethod::UNKNOWN) { + LOG(WARNING) << "Invalid http.method value: " << method_iter->second + << ", falling back to GET"; + method = HttpMethod::GET; + } + } + _client->set_method(method); + + // Set payload if configured (supports POST, PUT, DELETE, etc.) 
+ auto payload_iter = _extend_kv.find("http.payload"); + if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) { + _client->set_payload(payload_iter->second); + _client->set_content_type("application/json"); + VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size(); + } _client->set_header("Expect", ""); _client->set_header("Connection", "close"); @@ -269,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r long http_status = _client->get_http_status(); VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size(); + // Check for HTTP error status codes (4xx, 5xx) + if (http_status >= 400) { + std::string error_body; + if (buf.empty()) { + error_body = "(empty response body)"; + } else { + // Limit error message to 1024 bytes to avoid excessive logging + size_t max_len = std::min(buf.size(), static_cast(1024)); + error_body = buf.substr(0, max_len); + } + + return Status::InternalError("HTTP request failed with status {}: {}.", http_status, + error_body); + } + if (buf.empty()) { *bytes_read = buffer_offset; return Status::OK(); diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 6f4b68dce8b035..acf0022adfe540 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -86,6 +86,8 @@ class HttpFileReader final : public FileReader { // Full file cache for non-Range mode to avoid repeated downloads std::string _full_file_cache; // Cache complete file content bool _full_file_cached = false; // Whether full file has been cached + + bool _enable_chunk_response = false; // Whether server returns chunk streaming response }; } // namespace doris::io diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java index 9e1918e561431b..a704ab615d6028 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java @@ -19,6 +19,7 @@ public class DataSourceConfigKeys { public static final String JDBC_URL = "jdbc_url"; + public static final String TYPE = "type"; public static final String DRIVER_URL = "driver_url"; public static final String DRIVER_CLASS = "driver_class"; public static final String USER = "user"; @@ -26,6 +27,7 @@ public class DataSourceConfigKeys { public static final String DATABASE = "database"; public static final String SCHEMA = "schema"; public static final String INCLUDE_TABLES = "include_tables"; + public static final String TABLE = "table"; public static final String EXCLUDE_TABLES = "exclude_tables"; // initial,earliest,latest,{binlog,postion},\d{13} public static final String OFFSET = "offset"; diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java index 8449afd96281e1..c0b70832ffc997 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java @@ -37,7 +37,7 @@ public CompareOffsetRequest(Long jobId, Map sourceProperties, Map offsetFirst, Map offsetSecond) { - super(jobId, sourceType, sourceProperties); + super(jobId.toString(), sourceType, sourceProperties); this.offsetFirst = offsetFirst; this.offsetSecond = offsetSecond; } diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java index f11539e68324d1..65ffd36966cd47 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java @@ -23,16 +23,5 @@ @Data @EqualsAndHashCode(callSuper = true) public class FetchRecordRequest extends JobBaseRecordRequest { - private boolean reload = true; - private int fetchSize; - - @Override - public boolean isReload() { - return reload; - } - - @Override - public int getFetchSize() { - return fetchSize; - } + private String taskId; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java index f855e373958687..c6c8a0fb550f09 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java @@ -35,7 +35,7 @@ public class FetchTableSplitsRequest extends JobBaseConfig { public FetchTableSplitsRequest(Long jobId, String name, Map sourceProperties, String snapshotTable) { - super(jobId, name, sourceProperties); + super(jobId.toString(), name, sourceProperties); this.snapshotTable = snapshotTable; } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java index bfdbf6a34558a9..89acfd4a929646 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java @@ -27,7 +27,7 @@ @AllArgsConstructor @NoArgsConstructor public class JobBaseConfig { - private Long jobId; + private String jobId; private String dataSource; private Map config; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java index a9a1be374dbede..282913e2dd2d0e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java @@ -28,8 +28,4 @@ @EqualsAndHashCode(callSuper = true) public abstract class JobBaseRecordRequest extends JobBaseConfig { protected Map meta; - - public abstract boolean isReload(); - - public abstract int getFetchSize(); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java index a75edfcf7fb718..befacd39a94ae0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java @@ -28,14 +28,4 @@ public class WriteRecordRequest extends JobBaseRecordRequest { private String token; private String frontendAddress; private String taskId; - - @Override - public boolean isReload() { - return true; - } - - @Override - public int getFetchSize() { - return Integer.MAX_VALUE; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index a2fb673b12f220..8936d3e6287ea7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; +import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.File; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks; @@ -77,7 +78,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(ParquetMeta.class, "parquet_meta"), tableValued(ParquetFileMetadata.class, "parquet_file_metadata"), tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"), - tableValued(ParquetBloomProbe.class, "parquet_bloom_probe") + tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"), + tableValued(CdcStream.class, "cdc_stream") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 76dc77dfab7836..b8fe765109dddb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -170,7 +170,7 @@ private String getToken() throws JobException { private WriteRecordRequest buildRequestParams() throws JobException { JdbcOffset offset = (JdbcOffset) runningOffset; WriteRecordRequest request = new WriteRecordRequest(); - request.setJobId(getJobId()); + request.setJobId(getJobId() + ""); request.setConfig(sourceProperties); request.setDataSource(dataSourceType.name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 51f3c07aa41bf0..dc288af1b101c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -185,7 +185,7 @@ public void updateOffset(Offset offset) { @Override public void fetchRemoteMeta(Map properties) throws Exception { Backend backend = StreamingJobUtils.selectBackend(); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/fetchEndOffset") .setParams(new Gson().toJson(requestParams)).build(); @@ -512,7 +512,7 @@ private boolean checkNeedSplitChunks(Map sourceProperties) { */ private void initSourceReader() throws JobException { Backend backend = StreamingJobUtils.selectBackend(); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new 
JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/initReader") .setParams(new Gson().toJson(requestParams)).build(); @@ -559,7 +559,7 @@ public void cleanMeta(Long jobId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); Backend backend = StreamingJobUtils.selectBackend(); - JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties); + JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/close") .setParams(new Gson().toJson(requestParams)).build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index 4164dcaa262827..ccfce61e01670a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -210,7 +210,7 @@ private static ConnectContext buildConnectContext() { return ctx; } - private static JdbcClient getJdbcClient(DataSourceType sourceType, Map properties) { + public static JdbcClient getJdbcClient(DataSourceType sourceType, Map properties) { JdbcClientConfig config = new JdbcClientConfig(); config.setCatalog(sourceType.name()); config.setUser(properties.get(DataSourceConfigKeys.USER)); @@ -390,8 +390,8 @@ public static List getColumns(JdbcClient jdbcClient, * The remoteDB implementation differs for each data source; * refer to the hierarchical mapping in the JDBC catalog. 
*/ - private static String getRemoteDbName(DataSourceType sourceType, Map properties) - throws JobException { + public static String getRemoteDbName(DataSourceType sourceType, Map properties) + throws RuntimeException { String remoteDb = null; switch (sourceType) { case MYSQL: @@ -403,7 +403,7 @@ private static String getRemoteDbName(DataSourceType sourceType, Map arguments = getTVFProperties().getMap(); + return new CdcStreamTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build CdcStreamTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitCdcStream(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index ee1ccd76478bd0..7b5e5654585bf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Backends; import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs; +import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream; import org.apache.doris.nereids.trees.expressions.functions.table.File; import org.apache.doris.nereids.trees.expressions.functions.table.Frontends; import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks; @@ -81,6 +82,10 @@ default R visitHttp(Http http, C context) { return visitTableValuedFunction(http, context); } + default R visitCdcStream(CdcStream cdcStream, C context) { + return visitTableValuedFunction(cdcStream, context); + } + default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) { return visitTableValuedFunction(frontendsDisks, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java new file mode 100644 index 00000000000000..896300385dfa35 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
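For context on the `cdc_stream` TVF introduced here: it takes a flat key/value property map, and `CdcStreamTableValuedFunction.validate()` (shown below) requires `jdbc_url`, `type`, `table`, and `offset`. A minimal sketch of assembling and checking such a map; the connection values are illustrative only, and the checks simply mirror the validation in this patch:

```java
import com.google.common.base.Preconditions;

import java.util.HashMap;
import java.util.Map;

public class CdcStreamTvfPropsSketch {
    public static void main(String[] args) {
        // Property keys follow DataSourceConfigKeys; values are made up for illustration.
        Map<String, String> props = new HashMap<>();
        props.put("jdbc_url", "jdbc:mysql://127.0.0.1:3306/demo_db");
        props.put("type", "mysql");
        props.put("table", "demo_tbl");
        props.put("offset", "latest");
        props.put("user", "root");
        props.put("password", "");

        // Same required-key checks as CdcStreamTableValuedFunction.validate().
        Preconditions.checkArgument(props.containsKey("jdbc_url"), "jdbc_url is required");
        Preconditions.checkArgument(props.containsKey("type"), "type is required");
        Preconditions.checkArgument(props.containsKey("table"), "table is required");
        Preconditions.checkArgument(props.containsKey("offset"), "offset is required");
        System.out.println("cdc_stream properties accepted: " + props);
    }
}
```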
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; + private final Map originProps; + + public CdcStreamTableValuedFunction(Map properties) throws AnalysisException { + this.originProps = properties; + processProps(properties); + validate(properties); + } + + private void processProps(Map properties) throws AnalysisException { + Map copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); + } + } + + private void validate(Map properties) { + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.OFFSET), "offset is required"); + } + + private void generateFileStatus() { + this.fileStatuses.clear(); + this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false)); + } + + @Override + public List getTableColumns() throws AnalysisException { + DataSourceType dataSourceType = + DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase()); + JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(dataSourceType, processedParams); + String database = StreamingJobUtils.getRemoteDbName(dataSourceType, processedParams); + String table = 
processedParams.get(DataSourceConfigKeys.TABLE); + boolean tableExist = jdbcClient.isTableExist(database, table); + Preconditions.checkArgument(tableExist, "Table does not exist: " + table); + return jdbcClient.getColumnsFromJdbc(database, table); + } + + @Override + public TFileType getTFileType() { + return TFileType.FILE_HTTP; + } + + @Override + public String getFilePath() { + return URI; + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("CdcStreamTvfBroker", StorageType.HTTP, originProps); + } + + @Override + public String getTableName() { + return "CdcStreamTableValuedFunction"; + } + + @Override + public List getPathPartitionKeys() { + return new ArrayList<>(); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java index ff4056a8b5b3b1..176607f98683a7 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java @@ -37,8 +37,8 @@ public class Env { private static final Logger LOG = LoggerFactory.getLogger(Env.class); private static volatile Env INSTANCE; - private final Map jobContexts; - private final Map jobLocks; + private final Map jobContexts; + private final Map jobLocks; @Setter private int backendHttpPort; private Env() { @@ -79,9 +79,9 @@ private DataSource resolveDataSource(String source) { } private SourceReader getOrCreateReader( - Long jobId, DataSource dataSource, Map config) { - Objects.requireNonNull(jobId, "jobId"); - Objects.requireNonNull(dataSource, "dataSource"); + String jobId, DataSource dataSource, Map config) { + Objects.requireNonNull(jobId, "jobId is null"); + Objects.requireNonNull(dataSource, "dataSource is null"); JobContext context = jobContexts.get(jobId); if (context != null) { return context.getReader(dataSource); @@ -106,7 +106,7 @@ private SourceReader getOrCreateReader( } } - public void close(Long jobId) { + public void close(String jobId) { Lock lock = jobLocks.get(jobId); if (lock != null) { lock.lock(); @@ -123,12 +123,12 @@ public void close(Long jobId) { } private static final class JobContext { - private final long jobId; + private final String jobId; private volatile SourceReader reader; private volatile Map config; private volatile DataSource dataSource; - private JobContext(long jobId, DataSource dataSource, Map config) { + private JobContext(String jobId, DataSource dataSource, Map config) { this.jobId = jobId; this.dataSource = dataSource; this.config = config; diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java index 8b4883b6203183..4c719f0f9bb129 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java @@ -17,10 +17,14 @@ package org.apache.doris.cdcclient.config; +import org.apache.doris.cdcclient.exception.CommonException; +import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.model.rest.RestResponse; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import 
org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; @@ -35,4 +39,17 @@ public Object exceptionHandler(HttpServletRequest request, Exception e) { log.error("Unexpected exception", e); return RestResponse.internalError(e.getMessage()); } + + @ExceptionHandler(StreamException.class) + public Object streamExceptionHandler(StreamException e) { + // Directly throwing an exception allows curl to detect anomalies in the streaming response. + log.error("Exception in streaming response, re-throwing to client", e); + throw e; + } + + @ExceptionHandler(CommonException.class) + public Object commonExceptionHandler(CommonException e) { + log.error("Unexpected common exception", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage()); + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java index b3302e2c78519c..bc9474bd90f41e 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java @@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; @RestController public class ClientController { @@ -71,7 +72,7 @@ public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) { } } - /** Fetch records from source reader */ + /** Fetch records from source reader, for debug */ @RequestMapping(path = "/api/fetchRecords", method = RequestMethod.POST) public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) { try { @@ -82,6 +83,12 @@ public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) { } } + @RequestMapping(path = "/api/fetchRecordStream", method = RequestMethod.POST) + public StreamingResponseBody fetchRecordStream(@RequestBody FetchRecordRequest recordReq) + throws Exception { + return pipelineCoordinator.fetchRecordStream(recordReq); + } + /** Fetch records from source reader and Write records to backend */ @RequestMapping(path = "/api/writeRecords", method = RequestMethod.POST) public Object writeRecord(@RequestBody WriteRecordRequest recordReq) { @@ -126,4 +133,9 @@ public Object close(@RequestBody JobBaseConfig jobConfig) { public Object getFailReason(@PathVariable("taskId") String taskId) { return RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId)); } + + @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.POST) + public Object getTaskIdOffset(@PathVariable String taskId) { + return RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId)); + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java new file mode 100644 index 00000000000000..b27afe56dfe780 --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.exception; + +public class CommonException extends RuntimeException { + public CommonException() { + super(); + } + + public CommonException(String message) { + super(message); + } + + public CommonException(String message, Throwable cause) { + super(message, cause); + } + + public CommonException(Throwable cause) { + super(cause); + } + + protected CommonException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java new file mode 100644 index 00000000000000..73f9b34fbcdbfc --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
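Aside on the streaming endpoint wired up above: `/api/fetchRecordStream` returns a Spring `StreamingResponseBody`, and `GlobalExceptionHandler` deliberately re-throws `StreamException` because the 200 status line has already been sent by the time the body is written, so aborting the chunked stream is the only way for the HTTP client (e.g. the BE reader or curl) to notice a failure. A minimal self-contained sketch of that pattern; the endpoint name and rows are hypothetical, not the actual controller code:

```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import java.io.BufferedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

@RestController
public class StreamSketchController {

    @PostMapping("/api/demoRecordStream")
    public StreamingResponseBody demoRecordStream() {
        // The lambda runs after the response status has been committed, so failures
        // inside it can only be surfaced by breaking the stream.
        List<String> rows = List.of("{\"id\":1}", "{\"id\":2}");
        return outputStream -> {
            BufferedOutputStream bos = new BufferedOutputStream(outputStream);
            try {
                for (String row : rows) {
                    bos.write(row.getBytes(StandardCharsets.UTF_8));
                    bos.write('\n'); // newline-delimited JSON, one record per line
                }
                bos.flush();
            } catch (Exception e) {
                // Re-throwing (like StreamException in GlobalExceptionHandler) aborts the
                // chunked response so the client observes the failure.
                throw new RuntimeException(e);
            }
        };
    }
}
```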
+ +package org.apache.doris.cdcclient.exception; + +public class StreamException extends RuntimeException { + public StreamException() { + super(); + } + + public StreamException(String message) { + super(message); + } + + public StreamException(String message, Throwable cause) { + super(message, cause); + } + + public StreamException(Throwable cause) { + super(cause); + } + + protected StreamException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 3245e00d4ccb74..241fa6ea81cc51 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -18,20 +18,28 @@ package org.apache.doris.cdcclient.service; import org.apache.doris.cdcclient.common.Env; +import org.apache.doris.cdcclient.exception.CommonException; +import org.apache.doris.cdcclient.exception.StreamException; import org.apache.doris.cdcclient.model.response.RecordWithMeta; import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.cdcclient.source.reader.SplitReadResult; +import org.apache.doris.cdcclient.utils.ConfigUtil; +import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.cdc.request.WriteRecordRequest; import org.apache.doris.job.cdc.split.BinlogSplit; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import java.io.BufferedOutputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -42,12 +50,15 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import io.debezium.data.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; /** Pipeline coordinator. 
*/ @Component @@ -55,13 +66,16 @@ public class PipelineCoordinator { private static final Logger LOG = LoggerFactory.getLogger(PipelineCoordinator.class); private static final String SPLIT_ID = "splitId"; // jobId - private final Map batchStreamLoadMap = new ConcurrentHashMap<>(); + private final Map batchStreamLoadMap = new ConcurrentHashMap<>(); // taskId -> writeFailReason private final Map taskErrorMaps = new ConcurrentHashMap<>(); + // taskId, offset + private final Map> taskOffsetCache = new ConcurrentHashMap<>(); private final ThreadPoolExecutor executor; private static final int MAX_CONCURRENT_TASKS = 10; private static final int QUEUE_CAPACITY = 128; private static ObjectMapper objectMapper = new ObjectMapper(); + private final byte[] LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8); public PipelineCoordinator() { this.executor = @@ -81,19 +95,156 @@ public PipelineCoordinator() { new ThreadPoolExecutor.AbortPolicy()); } + /** return data for http_file_reader */ + public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { + SourceReader sourceReader; + SplitReadResult readResult; + try { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + sourceReader = Env.getCurrentEnv().getReader(fetchReq); + readResult = sourceReader.readSplitRecords(fetchReq); + } catch (Exception ex) { + throw new CommonException(ex); + } + + return outputStream -> { + try { + buildRecords(sourceReader, fetchReq, readResult, outputStream); + } catch (Exception ex) { + LOG.error( + "Failed fetch record, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId(), + ex); + throw new StreamException(ex); + } + }; + } + + private void buildRecords( + SourceReader sourceReader, + FetchRecordRequest fetchRecord, + SplitReadResult readResult, + OutputStream rawOutputStream) + throws Exception { + SourceSplit split = readResult.getSplit(); + Map lastMeta = null; + int rowCount = 0; + BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream); + try { + // Serialize records and add them to the response (collect from iterator) + Iterator iterator = readResult.getRecordIterator(); + while (iterator != null && iterator.hasNext()) { + SourceRecord element = iterator.next(); + List serializedRecords = + sourceReader.deserialize(fetchRecord.getConfig(), element); + for (String record : serializedRecords) { + bos.write(record.getBytes(StandardCharsets.UTF_8)); + bos.write(LINE_DELIMITER); + } + rowCount += serializedRecords.size(); + } + // force flush buffer + bos.flush(); + } finally { + // The LSN in the commit is the current offset, which is the offset from the last + // successful write. + // Therefore, even if a subsequent write fails, it will not affect the commit. + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); + + // This must be called after commitSourceOffset; otherwise, + // PG's confirmed lsn will not proceed. 
+ sourceReader.finishSplitRecords(); + } + + LOG.info( + "Fetch records completed, jobId={}, taskId={}, splitId={}, rowCount={}", + fetchRecord.getJobId(), + fetchRecord.getTaskId(), + split.splitId(), + rowCount); + + if (readResult.getSplitState() != null) { + // Set meta information for hw + if (sourceReader.isSnapshotSplit(split)) { + lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, split.splitId()); + } + + // set meta for binlog event + if (sourceReader.isBinlogSplit(split)) { + lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } + } else { + throw new RuntimeException("split state is null"); + } + + if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) { + taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta); + } + + if (!isLong(fetchRecord.getJobId())) { + // TVF requires closing the window after each execution, + // while PG requires dropping the slot. + sourceReader.close(fetchRecord); + } + } + + private boolean isLong(String s) { + if (s == null || s.isEmpty()) return false; + try { + Long.parseLong(s); + return true; + } catch (NumberFormatException e) { + return false; + } + } + + /** + * Generate split meta from request.offset. This only applies to TVF, so initial is not + * supported because initial requires a job to obtain split information. + */ + private Map generateMeta(Map cdcConfig) + throws JsonProcessingException { + Map meta = new HashMap<>(); + String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET); + if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) { + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } else if (ConfigUtil.isJson(offset)) { + Map startOffset = + objectMapper.readValue(offset, new TypeReference<>() {}); + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + meta.put("startingOffset", startOffset); + } else { + throw new RuntimeException("Unsupported offset: " + offset); + } + return meta; + } + + /** pull data from api for test */ public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) throws Exception { SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchRecordRequest); SplitReadResult readResult = sourceReader.readSplitRecords(fetchRecordRequest); return buildRecordResponse(sourceReader, fetchRecordRequest, readResult); } - /** build RecordWithMeta */ + /** build RecordWithMeta for test */ private RecordWithMeta buildRecordResponse( SourceReader sourceReader, FetchRecordRequest fetchRecord, SplitReadResult readResult) throws Exception { RecordWithMeta recordResponse = new RecordWithMeta(); SourceSplit split = readResult.getSplit(); - int count = 0; try { // Serialize records and add them to the response (collect from iterator) Iterator iterator = readResult.getRecordIterator(); @@ -103,14 +254,6 @@ private RecordWithMeta buildRecordResponse( sourceReader.deserialize(fetchRecord.getConfig(), element); if (!CollectionUtils.isEmpty(serializedRecords)) { recordResponse.getRecords().addAll(serializedRecords); - count += serializedRecords.size(); - if (sourceReader.isBinlogSplit(split)) { - // put offset for event - Map lastMeta = - sourceReader.extractBinlogStateOffset(readResult.getSplitState()); - lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); - recordResponse.setMeta(lastMeta); - } } } } finally { @@ -271,7 +414,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception } } - private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String targetDb) { + private DorisBatchStreamLoad getOrCreateBatchStreamLoad(String jobId, String targetDb) { return batchStreamLoadMap.computeIfAbsent( jobId, k -> { @@ -280,7 +423,7 @@ private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String targe }); } - public void closeJobStreamLoad(Long jobId) { + public void closeJobStreamLoad(String jobId) { DorisBatchStreamLoad batchStreamLoad = batchStreamLoadMap.remove(jobId); if (batchStreamLoad != null) { LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId); @@ -298,4 +441,9 @@ public String getTaskFailReason(String taskId) { String taskReason = taskErrorMaps.remove(taskId); return taskReason == null ? "" : taskReason; } + + public Map getOffsetWithTaskId(String taskId) { + Map taskOffset = taskOffsetCache.remove(taskId); + return taskOffset == null ? new HashMap<>() : taskOffset; + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java index 1604b1c030539e..07bc6092d9f1f2 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java @@ -90,10 +90,10 @@ public class DorisBatchStreamLoad implements Serializable { private final Map bufferMapLock = new ConcurrentHashMap<>(); @Setter private String currentTaskId; private String targetDb; - private long jobId; + private String jobId; @Setter private String token; - public DorisBatchStreamLoad(long jobId, String targetDb) { + public DorisBatchStreamLoad(String jobId, String targetDb) { this.hostPort = Env.getCurrentEnv().getBackendHostPort(); this.flushQueue = new LinkedBlockingDeque<>(1); // maxBlockedBytes is two times of FLUSH_MAX_BYTE_SIZE @@ -311,9 +311,9 @@ private ReadWriteLock getLock(String bufferKey) { class LoadAsyncExecutor implements Runnable { private int flushQueueSize; - private long jobId; + private String jobId; - public LoadAsyncExecutor(int flushQueueSize, long jobId) { + public LoadAsyncExecutor(int flushQueueSize, String jobId) { this.flushQueueSize = flushQueueSize; this.jobId = jobId; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 5b404932c3f2d7..6e5380c29487ef 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -95,7 +95,7 @@ public JdbcIncrementalSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { this.serializer.init(config); } @@ -154,39 +154,17 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc if (offsetMeta == null || offsetMeta.isEmpty()) { throw new RuntimeException("miss meta offset"); } - LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta); - - // If there is an active split being consumed, reuse it directly; - // Otherwise, create a new snapshot/stream split based on offset and start the reader. 
- SourceSplitBase split = null; - SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); - if (currentSplitRecords == null) { - Fetcher currentReader = this.getCurrentReader(); - if (baseReq.isReload() || currentReader == null) { - LOG.info( - "No current reader or reload {}, create new split reader for job {}", - baseReq.isReload(), - baseReq.getJobId()); - // build split - Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); - split = splitFlag.f0; - // closeBinlogReader(); - currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); - this.setCurrentSplitRecords(currentSplitRecords); - this.setCurrentSplit(split); - } else if (currentReader instanceof IncrementalSourceStreamFetcher) { - LOG.info("Continue poll records with current binlog reader"); - // only for binlog reader - currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader); - split = this.getCurrentSplit(); - } else { - throw new RuntimeException("Should not happen"); - } - } else { - LOG.info( - "Continue read records with current split records, splitId: {}", - currentSplitRecords.getSplitId()); - } + // Create a new snapshot/stream split based on offset and start the reader. + LOG.info( + "Create new split reader for job {} with offset {}", + baseReq.getJobId(), + offsetMeta); + // build split + Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq); + SourceSplitBase split = splitFlag.f0; + // it's necessary to ensure that the binlog reader is already closed. + this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); + this.currentSplit = split; // build response with iterator SplitReadResult result = new SplitReadResult(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java index d5feeef45a32c8..bf84cef505bf0a 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java @@ -36,7 +36,7 @@ public interface SourceReader { String SPLIT_ID = "splitId"; /** Initialization, called when the program starts */ - void initialize(long jobId, DataSource dataSource, Map config); + void initialize(String jobId, DataSource dataSource, Map config); /** Divide the data to be read. For example: split mysql to chunks */ List getSourceSplits(FetchTableSplitsRequest config); @@ -81,5 +81,5 @@ public interface SourceReader { * Commits the given offset with the source database. Used by some source like Postgres to * indicate how far the source TX log can be discarded. 
*/ - default void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {} + default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {} } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index d20225c0974cbb..025cd5eac67824 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -70,7 +70,6 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -118,7 +117,7 @@ public MySqlSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public void initialize(String jobId, DataSource dataSource, Map config) { this.serializer.init(config); } @@ -170,39 +169,14 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc if (offsetMeta == null || offsetMeta.isEmpty()) { throw new RuntimeException("miss meta offset"); } - LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta); - - // If there is an active split being consumed, reuse it directly; - // Otherwise, create a new snapshot/binlog split based on offset and start the reader. - MySqlSplit split = null; - SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); - if (currentSplitRecords == null) { - DebeziumReader currentReader = this.getCurrentReader(); - if (baseReq.isReload() || currentReader == null) { - LOG.info( - "No current reader or reload {}, create new split reader", - baseReq.isReload()); - // build split - Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq); - split = splitFlag.f0; - // reset binlog reader - // closeBinlogReader(); - currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); - this.setCurrentSplitRecords(currentSplitRecords); - this.setCurrentSplit(split); - } else if (currentReader instanceof BinlogSplitReader) { - LOG.info("Continue poll records with current binlog reader"); - // only for binlog reader - currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader); - split = this.getCurrentSplit(); - } else { - throw new RuntimeException("Should not happen"); - } - } else { - LOG.info( - "Continue read records with current split records, splitId: {}", - currentSplitRecords.getSplitId()); - } + // Create a new snapshot/binlog split based on offset and start the reader. + LOG.info("create new split reader for {} with offset {}", baseReq.getJobId(), offsetMeta); + // build split + Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq); + MySqlSplit split = splitFlag.f0; + // it's necessary to ensure that the binlog reader is already closed. 
+ this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); + this.currentSplit = split; // build response with iterator SplitReadResult result = new SplitReadResult(); @@ -449,6 +423,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j return new SplitRecords(currentSplitId, sourceRecords.iterator()); } + /** Poll data from the current reader, only for binlog reader */ private SplitRecords pollSplitRecordsWithCurrentReader( DebeziumReader currentReader) throws Exception { Iterator dataIt = null; @@ -568,18 +543,16 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str configFactory.includeSchemaChanges(false); - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> databaseName + "." + t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); + // Set table list + String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig); + com.google.common.base.Preconditions.checkArgument( + tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // setting startMode String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { - // do not need set offset when initial - // configFactory.startupOptions(StartupOptions.initial()); + configFactory.startupOptions(StartupOptions.initial()); } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.earliest()); BinlogOffset binlogOffset = @@ -599,7 +572,14 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str } if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY) && offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) { - BinlogOffset binlogOffset = new BinlogOffset(offsetMap); + BinlogOffset binlogOffset = + BinlogOffset.builder() + .setBinlogFilePosition( + offsetMap.get(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY), + Long.parseLong( + offsetMap.get( + BinlogOffset.BINLOG_POSITION_OFFSET_KEY))) + .build(); configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset)); } else { throw new RuntimeException("Incorrect offset " + startupMode); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index c1e062df5a6b34..966f4e58f104d7 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -25,7 +25,6 @@ import org.apache.doris.job.cdc.request.CompareOffsetRequest; import org.apache.doris.job.cdc.request.JobBaseConfig; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; @@ -51,7 +50,6 @@ import org.apache.flink.table.types.DataType; import java.time.Instant; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,7 +81,7 @@ public PostgresSourceReader() { } @Override - public void initialize(long jobId, DataSource dataSource, Map config) { + public 
         PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId);
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
         LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername());
@@ -137,7 +135,8 @@ private PostgresSourceConfig generatePostgresConfig(JobBaseConfig config) {
     }
 
     /** Generate PostgreSQL source config from Map config */
-    private PostgresSourceConfig generatePostgresConfig(Map cdcConfig, Long jobId) {
+    private PostgresSourceConfig generatePostgresConfig(
+            Map cdcConfig, String jobId) {
         PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
 
         // Parse JDBC URL to extract connection info
@@ -167,29 +166,18 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi
         configFactory.includeSchemaChanges(false);
 
         // Set table list
-        String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        if (StringUtils.isNotEmpty(includingTables)) {
-            String[] includingTbls =
-                    Arrays.stream(includingTables.split(","))
-                            .map(t -> schema + "." + t.trim())
-                            .toArray(String[]::new);
-            configFactory.tableList(includingTbls);
-        }
+        String[] tableList = ConfigUtil.getTableList(schema, cdcConfig);
+        Preconditions.checkArgument(tableList.length >= 1, "include_tables or table is required");
+        configFactory.tableList(tableList);
 
         // Set startup options
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.initial());
-        } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
-            configFactory.startupOptions(StartupOptions.earliest());
         } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.latest());
         } else if (ConfigUtil.isJson(startupMode)) {
             throw new RuntimeException("Unsupported json offset " + startupMode);
-        } else if (ConfigUtil.is13Timestamp(startupMode)) {
-            // start from timestamp
-            Long ts = Long.parseLong(startupMode);
-            configFactory.startupOptions(StartupOptions.timestamp(ts));
         } else {
             throw new RuntimeException("Unknown offset " + startupMode);
         }
@@ -212,7 +200,7 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi
         return configFactory.create(0);
     }
 
-    private String getSlotName(Long jobId) {
+    private String getSlotName(String jobId) {
         return "doris_cdc_" + jobId;
     }
 
@@ -338,7 +326,7 @@ protected FetchTask createFetchTaskFromSplit(
      * `CommitFeOffset` fails, Data after the startOffset will not be cleared.
      */
     @Override
-    public void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {
+    public void commitSourceOffset(String jobId, SourceSplit sourceSplit) {
         try {
             if (sourceSplit instanceof StreamSplit) {
                 Offset offsetToCommit = ((StreamSplit) sourceSplit).getStartingOffset();
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 016c9ddf312e0d..50588deee8fc55 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -18,10 +18,12 @@
 package org.apache.doris.cdcclient.utils;
 
 import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
 
 import org.apache.commons.lang3.StringUtils;
 
 import java.time.ZoneId;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
@@ -38,8 +40,8 @@ public class ConfigUtil {
     private static ObjectMapper objectMapper = new ObjectMapper();
     private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);
 
-    public static String getServerId(long jobId) {
-        return String.valueOf(Math.abs(String.valueOf(jobId).hashCode()));
+    public static String getServerId(String jobId) {
+        return String.valueOf(Math.abs(jobId.hashCode()));
     }
 
     public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
@@ -96,6 +98,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
         return ZoneId.systemDefault();
     }
 
+    public static String[] getTableList(String schema, Map cdcConfig) {
+        String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only supports one table");
+            return new String[] {schema + "." + table.trim()};
+        } else {
+            return new String[0];
+        }
+    }
+
     /** Optimized debezium parameters */
     public static Properties getDefaultDebeziumProps() {
         Properties properties = new Properties();
diff --git a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
new file mode 100644
index 00000000000000..fcb9b28f1514d0
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_tvf --
+C1 3
+D1 4
+
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
new file mode 100644
index 00000000000000..6f1190da0176fb
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_mysql", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_cdc_stream_tvf"
+    def mysqlDb = "test_cdc_db"
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+        def offset = ""
+
+        // create the test database, table and data
+        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                `name` varchar(200) NOT NULL,
+                `age` int DEFAULT NULL,
+                PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1);"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2);"""
+
+            def result = sql_return_maparray "show master status"
+            def file = result[0]["File"]
+            def position = result[0]["Position"]
+            offset = """{"file":"${file}","pos":"${position}"}"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3);"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('D1', 4);"""
+        }
+
+        log.info("offset: " + offset)
+        qt_select_tvf """select * from cdc_stream(
+            "type" = "mysql",
+            "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "com.mysql.cj.jdbc.Driver",
+            "user" = "root",
+            "password" = "123456",
+            "database" = "${mysqlDb}",
+            "table" = "${table1}",
+            "offset" = '${offset}'
+            )
+        """
+
+        test {
+            sql """
+                select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = 'initial')
+            """
+            exception "Unsupported offset: initial"
+        }
+    }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
new file mode 100644
index 00000000000000..f9a7befdcca1de
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_postgres", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_normal1"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+        // create the test table and data
+        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                "name" varchar(200),
+                "age" int2,
+                PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('A1', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('B1', 2);"""
+        }
+
+        test {
+            sql """
+                select * from cdc_stream(
+                "type" = "postgres",
+                "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user" = "${pgUser}",
+                "password" = "${pgPassword}",
+                "database" = "${pgDB}",
+                "schema" = "${pgSchema}",
+                "table" = "${table1}",
+                "offset" = 'initial')
+            """
+            exception "Unsupported offset: initial"
+        }
+
+        // Because PostgreSQL consumption requires creating a replication slot first,
+        // we only verify that the query executes successfully.
+        def result = sql """
+            select * from cdc_stream(
+            "type" = "postgres",
+            "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "org.postgresql.Driver",
+            "user" = "${pgUser}",
+            "password" = "${pgPassword}",
+            "database" = "${pgDB}",
+            "schema" = "${pgSchema}",
+            "table" = "${table1}",
+            "offset" = 'latest')
+        """
+        log.info("result: {}", result)
+    }
+}
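
Note (not part of the patch): the table-list resolution that both readers now delegate to ConfigUtil.getTableList is easy to misread in diff form, so here is a minimal, self-contained sketch of its precedence rules as added above. It assumes DataSourceConfigKeys.INCLUDE_TABLES and DataSourceConfigKeys.TABLE correspond to the "include_tables" and "table" properties used in the TVF tests; the class name and string keys below are illustrative only.

// Illustrative sketch: mirrors the precedence of the new ConfigUtil.getTableList.
// "include_tables" wins over "table"; "table" accepts a single name; every entry
// is qualified with the schema/database passed in by the reader.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class TableListSketch {
    static String[] getTableList(String schema, Map<String, String> cdcConfig) {
        String includingTables = cdcConfig.get("include_tables"); // assumed key literal
        String table = cdcConfig.get("table");                    // assumed key literal
        if (includingTables != null && !includingTables.isEmpty()) {
            // "t1, t2" -> {"schema.t1", "schema.t2"}
            return Arrays.stream(includingTables.split(","))
                    .map(t -> schema + "." + t.trim())
                    .toArray(String[]::new);
        } else if (table != null && !table.isEmpty()) {
            if (table.contains(",")) {
                throw new IllegalArgumentException("table only supports one table");
            }
            return new String[] {schema + "." + table.trim()};
        }
        // Callers reject an empty result with "include_tables or table is required".
        return new String[0];
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("table", "user_info_cdc_stream_tvf");
        // Prints: [test_cdc_db.user_info_cdc_stream_tvf]
        System.out.println(Arrays.toString(getTableList("test_cdc_db", cfg)));
    }
}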