Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
37c8553
add cdc client read data
JNSimba Nov 24, 2025
cd4e1e9
add a part of stream load sink
JNSimba Nov 25, 2025
4b56052
fix write split chunk api
JNSimba Nov 26, 2025
03fa875
add be request cdc client
JNSimba Nov 28, 2025
aa85742
add fe rpc interface and test
JNSimba Nov 28, 2025
c12ca09
fix cdc client manager compile
JNSimba Nov 28, 2025
3f90ae6
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Nov 28, 2025
56ac92c
add streaming job split chunks and streaming multi task
JNSimba Dec 2, 2025
38507ae
add fe create streamt multi ask
JNSimba Dec 4, 2025
3c2d358
fix be forward request
JNSimba Dec 5, 2025
124d2ae
add multi table sync and checkstyle
JNSimba Dec 5, 2025
06c8ae7
add build script
JNSimba Dec 6, 2025
8f8f676
update check style
JNSimba Dec 6, 2025
59a5cf2
fix be fork
JNSimba Dec 8, 2025
2b783cd
fix offset bug
JNSimba Dec 8, 2025
6b2443f
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 8, 2025
79a71ed
fix create table and offset consumer bug
JNSimba Dec 9, 2025
4f3a1e5
add case for mysql sync
JNSimba Dec 10, 2025
8d9b14b
Merge branch 'master' into mysql-cdc
JNSimba Dec 10, 2025
f63e984
code style
JNSimba Dec 10, 2025
f4e1a9e
fix multi table bug
JNSimba Dec 10, 2025
0a27f8a
fix case
JNSimba Dec 11, 2025
7748211
fix
JNSimba Dec 11, 2025
b26a7b2
fix
JNSimba Dec 11, 2025
91c3bd2
fix
JNSimba Dec 11, 2025
3f8813d
extend to entity fe-common
JNSimba Dec 11, 2025
cfff8f9
extend fe model to common
JNSimba Dec 11, 2025
948aaf8
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 11, 2025
2dd280b
rename cdc clientmgr
JNSimba Dec 15, 2025
84a87f3
add create alter case
JNSimba Dec 15, 2025
4c30029
fix restart case
JNSimba Dec 15, 2025
3576a29
add priv and dup case
JNSimba Dec 16, 2025
2007303
add exclude tbls
JNSimba Dec 16, 2025
96405ff
fix delete job meta tbl when drop job
JNSimba Dec 17, 2025
0aff3ec
Merge branch 'master' into mysql-cdc
JNSimba Dec 17, 2025
59ae5d7
fix params
JNSimba Dec 17, 2025
4713bbc
be format
JNSimba Dec 17, 2025
f9cc4ff
add all type case
JNSimba Dec 18, 2025
4bdb638
fix
JNSimba Dec 18, 2025
9210541
fix
JNSimba Dec 18, 2025
d0c9124
fix be ut
JNSimba Dec 18, 2025
f8103f7
fix get split more concurrency
JNSimba Dec 19, 2025
985a736
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 19, 2025
3b2ff9a
fix
JNSimba Dec 19, 2025
585a0cb
fix be coverage
JNSimba Dec 20, 2025
e3d2aca
fix ut
JNSimba Dec 20, 2025
ba9ea3a
fix ut
JNSimba Dec 20, 2025
185aa0f
fix ut
JNSimba Dec 20, 2025
32e4834
fix coverage
JNSimba Dec 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "8050");

// Port of the local CDC client HTTP server; the BE passes it to the client as
// --server.port and sends its requests to 127.0.0.1:<cdc_client_port>.
DEFINE_Int32(cdc_client_port, "9096");

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down Expand Up @@ -629,6 +631,9 @@ DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
// The buffer size to store stream table function schema info
DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB

// Timeout, in milliseconds, of an HTTP request sent from the BE to the CDC client
DEFINE_mInt32(request_cdc_client_timeout_ms, "60000");

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
DEFINE_mInt32(olap_table_sink_send_interval_microseconds, "1000");
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// Port the CDC client listens on; the BE connects to it on 127.0.0.1 to scan OLTP CDC data
DECLARE_Int32(cdc_client_port);

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down Expand Up @@ -667,6 +670,9 @@ DECLARE_mBool(enable_stream_load_commit_txn_on_be);
// The buffer size to store stream table function schema info
DECLARE_Int64(stream_tvf_buffer_size);

// Timeout, in milliseconds, of an HTTP request sent from the BE to the CDC client
DECLARE_mInt32(request_cdc_client_timeout_ms);

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
DECLARE_mInt32(olap_table_sink_send_interval_microseconds);
Expand Down
269 changes: 269 additions & 0 deletions be/src/runtime/cdc_client_mgr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_client.h"

namespace doris {

namespace {
// SIGCHLD handler: reaps every child process that has already exited so that
// none of them is left behind as a zombie.
//
// A signal handler may interrupt arbitrary code, and waitpid() overwrites errno
// (e.g. with ECHILD once there is nothing left to reap). errno is therefore
// saved on entry and restored on exit so the interrupted code still sees its
// own value.
void handle_sigchld(int /*sig_no*/) {
    const int saved_errno = errno;
    int status = 0;
    // WNOHANG: never block inside the handler; stop as soon as nothing is reapable.
    while (waitpid(-1, &status, WNOHANG) > 0) {
    }
    errno = saved_errno;
}

// Check CDC client health
#ifndef BE_TEST
// Queries the CDC client's /actuator/health endpoint on 127.0.0.1.
//
// retry_times and sleep_time are handed unchanged to HttpClient::execute_with_retry.
// health_response receives the body returned by the endpoint.
// Returns OK only if the HTTP request succeeded and the body contains "UP";
// otherwise an InternalError.
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
    std::string health_url = "http://127.0.0.1:";
    health_url += std::to_string(doris::config::cdc_client_port);
    health_url += "/actuator/health";

    // Single probe attempt; the retry helper decides how often it is invoked.
    const auto probe = [&health_url, &health_response](HttpClient* client) -> Status {
        RETURN_IF_ERROR(client->init(health_url));
        client->set_timeout_ms(5000);
        return client->execute(&health_response);
    };

    const Status probe_status = HttpClient::execute_with_retry(retry_times, sleep_time, probe);
    if (!probe_status.ok()) {
        return Status::InternalError("CDC client health check failed");
    }

    // The endpoint answered; it only counts as healthy when it reports "UP".
    if (health_response.find("UP") == std::string::npos) {
        return Status::InternalError(fmt::format("CDC client unhealthy: {}", health_response));
    }
    return Status::OK();
}
#endif

} // anonymous namespace

// Construction does no work of its own; no CDC client process is forked here.
CdcClientMgr::CdcClientMgr() = default;

// Destruction delegates to stop(), which terminates the tracked child process (if any).
CdcClientMgr::~CdcClientMgr() {
stop();
}

// Terminates the tracked CDC client process, if there is one.
//
// Sends SIGTERM first, waits 200 ms, and falls back to SIGKILL (followed by a
// blocking waitpid) when the process is still present. The stored PID is reset
// to 0 whenever it was positive on entry.
void CdcClientMgr::stop() {
    const pid_t child = _child_pid.load();
    // kill(pid, 0) delivers no signal; it only reports whether the pid can be signalled.
    const auto still_alive = [child]() { return kill(child, 0) == 0; };

    if (child > 0) {
        if (still_alive()) {
            LOG(INFO) << "Stopping CDC client process, pid=" << child;
            // Ask politely first so the client can shut down gracefully.
            kill(child, SIGTERM);
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            if (still_alive()) {
                // Grace period elapsed: force termination and reap the child.
                LOG(INFO) << "Force killing CDC client process, pid=" << child;
                kill(child, SIGKILL);
                int exit_status = 0;
                waitpid(child, &exit_status, 0);
            }
        }
        _child_pid.store(0);
    }

    LOG(INFO) << "CdcClientMgr is stopped";
}

// Makes sure a healthy CDC client (Java) process is running, forking one if needed.
//
// The whole function runs under _start_mutex, so concurrent callers cannot
// fork two clients at the same time.
//
// @param result  RPC result object; whenever this function builds an error
//                status it also copies it into result->mutable_status().
// @return OK when a healthy client is already running or was started
//         successfully; InternalError when a required environment variable or
//         the jar is missing, fork() fails, or the health check fails.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    Status st = Status::OK();
    pid_t exist_pid = _child_pid.load();
    if (exist_pid > 0) {
#ifdef BE_TEST
        // In test mode, directly return OK if PID exists
        LOG(INFO) << "cdc client already started (BE_TEST mode), pid=" << exist_pid;
        return Status::OK();
#else
        // Check if process is still alive
        if (kill(exist_pid, 0) == 0) {
            // Process exists, verify it's actually our CDC client by health check
            std::string check_response;
            auto check_st = check_cdc_client_health(1, 0, check_response);
            if (check_st.ok()) {
                // Process exists and responding, CDC client is running
                return Status::OK();
            } else {
                // Process exists but CDC client not responding
                // Either it's a different process (PID reused) or CDC client is unhealthy
                // Reset PID and return error
                _child_pid.store(0);
                st = Status::InternalError(fmt::format("CDC client {} unresponsive", exist_pid));
                st.to_protobuf(result->mutable_status());
                return st;
            }
        } else {
            // Process is dead, reset PID and continue to start
            _child_pid.store(0);
        }
#endif
    }

    // getenv() returns nullptr for an unset variable, and constructing a
    // std::string from nullptr is undefined behavior, so validate before use.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        st = Status::InternalError("Can not find DORIS_HOME");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    const char* log_dir = getenv("LOG_DIR");
    if (log_dir == nullptr) {
        st = Status::InternalError("Can not find LOG_DIR");
        st.to_protobuf(result->mutable_status());
        return st;
    }

    // All argv strings are built before fork() so the child only has to exec.
    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // check cdc jar exists
    struct stat buffer;
    if (stat(cdc_jar_path.c_str(), &buffer) != 0) {
        st = Status::InternalError("Can not find cdc-client.jar.");
        st.to_protobuf(result->mutable_status());
        return st;
    }

    // Ready to start cdc client
    LOG(INFO) << "Ready to start cdc client";
    const auto* java_home = getenv("JAVA_HOME");
    if (!java_home) {
        st = Status::InternalError("Can not find JAVA_HOME");
        st.to_protobuf(result->mutable_status());
        return st;
    }
    std::string path(java_home);
    std::string java_bin = path + "/bin/java";

    // Capture SIGCHLD to prevent the child process from becoming a zombie.
    // The struct is zero-initialized and sa_mask explicitly emptied: an
    // uninitialized mask would block an arbitrary set of signals while the
    // handler runs. SA_RESTART keeps system calls interrupted by the handler
    // from failing with EINTR; SA_NOCLDSTOP skips notifications for children
    // that are merely stopped, since only exits need reaping.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART | SA_NOCLDSTOP;
    act.sa_handler = handle_sigchld;
    if (sigaction(SIGCHLD, &act, nullptr) != 0) {
        LOG(WARNING) << "Failed to install SIGCHLD handler, errno=" << errno;
    }
    LOG(INFO) << "Start to fork cdc client process with " << path;
#ifdef BE_TEST
    // Tests never fork; a fake PID marks the client as started.
    _child_pid.store(99999);
    st = Status::OK();
    return st;
#else
    pid_t pid = fork();
    if (pid < 0) {
        // Fork failed
        st = Status::InternalError("Fork cdc client failed.");
        st.to_protobuf(result->mutable_status());
        return st;
    } else if (pid == 0) {
        // Child process
        // When the parent process is killed, the child process also needs to exit
#ifndef __APPLE__
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        // java -Dlog.path=xx -jar cdc-client.jar --server.port=9096 --backend.http.port=8040
        execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
               cdc_jar_port.c_str(), backend_http_port.c_str(), static_cast<char*>(nullptr));
        // If execlp returns, it means it failed
        perror("Cdc client child process error");
        // _exit() instead of exit(): the child is a copy of the parent, so
        // exit() would run the parent's atexit handlers and static destructors
        // and flush its duplicated stdio buffers a second time.
        _exit(1);
    } else {
        // Parent process: save PID and wait for startup
        _child_pid.store(pid);

        // Waiting for cdc to start, failed after more than 3 * 10 seconds
        std::string health_response;
        Status status = check_cdc_client_health(3, 10, health_response);
        if (!status.ok()) {
            // Reset PID if startup failed
            // NOTE(review): the forked process is not terminated here; confirm
            // whether it should be killed before its PID is dropped.
            _child_pid.store(0);
            st = Status::InternalError("Start cdc client failed.");
            st.to_protobuf(result->mutable_status());
        } else {
            LOG(INFO) << "Start cdc client success, pid=" << pid
                      << ", status=" << status.to_string() << ", response=" << health_response;
        }
    }
#endif //BE_TEST
    return st;
}

void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
PRequestCdcClientResult* result,
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);

// Start CDC client if not started
Status start_st = start_cdc_client(result);
if (!start_st.ok()) {
LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
start_st.to_protobuf(result->mutable_status());
return;
}

std::string cdc_response;
Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
result->set_response(cdc_response);
st.to_protobuf(result->mutable_status());
}

// Sends a JSON POST request to the CDC client on 127.0.0.1.
//
// @param api          Path appended verbatim after "http://127.0.0.1:<cdc_client_port>".
// @param params_body  Request payload; it is attached only when non-empty.
// @param response     Receives the body returned by the CDC client.
// @return Result of HttpClient::execute_with_retry(3, 1, ...) around the request.
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string target_url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);

    // One attempt of the request; the retry helper may invoke it several times.
    const auto post_once = [&](HttpClient* client) -> Status {
        RETURN_IF_ERROR(client->init(target_url));
        client->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        const bool has_payload = !params_body.empty();
        if (has_payload) {
            client->set_payload(params_body);
        }
        client->set_content_type("application/json");
        client->set_method(POST);
        return client->execute(response);
    };

    return HttpClient::execute_with_retry(3, 1, post_once);
}

} // namespace doris
63 changes: 63 additions & 0 deletions be/src/runtime/cdc_client_mgr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/internal_service.pb.h>

#include <atomic>
#include <mutex>
#include <string>

#include "common/status.h"

namespace google::protobuf {
class Closure;
class RpcController;
} // namespace google::protobuf

namespace doris {

// Owns the local CDC client child process: starts it on demand, forwards
// requests to it over HTTP and terminates it on stop()/destruction.
//
// start_cdc_client() is serialized by _start_mutex; the child PID is kept in
// an atomic so it can be read without holding that mutex.
class CdcClientMgr {
public:
    CdcClientMgr();
    // Calls stop().
    ~CdcClientMgr();

    // The object tracks one child process; copying it would give two owners of
    // the same PID. The mutex/atomic members already make it non-copyable,
    // this states the intent explicitly.
    CdcClientMgr(const CdcClientMgr&) = delete;
    CdcClientMgr& operator=(const CdcClientMgr&) = delete;

    // Terminates the tracked CDC client process (SIGTERM, then SIGKILL if it
    // is still present) and resets the stored PID.
    void stop();

    // Request CDC client to handle a request: starts the client if necessary,
    // forwards request->api()/request->params() to it and fills `result` with
    // the response body and status. `done` is run when the call returns.
    void request_cdc_client_impl(const PRequestCdcClientRequest* request,
                                 PRequestCdcClientResult* result, google::protobuf::Closure* done);

    // POSTs `params_body` (as application/json) to `api` on the local CDC
    // client and writes the reply body to `response`.
    Status send_request_to_cdc_client(const std::string& api, const std::string& params_body,
                                      std::string* response);

    // Ensures a healthy CDC client process is running, forking one if needed.
    // Error statuses built here are also copied into `result`.
    Status start_cdc_client(PRequestCdcClientResult* result);

#ifdef BE_TEST
    // For testing only: get current child PID
    pid_t get_child_pid() const { return _child_pid.load(); }
    // For testing only: set child PID directly
    void set_child_pid_for_test(pid_t pid) { _child_pid.store(pid); }
#endif

private:
    // Serializes start_cdc_client().
    std::mutex _start_mutex;
    // PID of the CDC client child process; 0 means no process is tracked.
    std::atomic<pid_t> _child_pid {0};
};

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class HeartbeatFlags;
class FrontendServiceClient;
class FileMetaCache;
class GroupCommitMgr;
class CdcClientMgr;
class TabletSchemaCache;
class TabletColumnObjectPool;
class UserFunctionCache;
Expand Down Expand Up @@ -277,6 +278,7 @@ class ExecEnv {
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; }
GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }
CdcClientMgr* cdc_client_mgr() { return _cdc_client_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }

Expand Down Expand Up @@ -509,6 +511,7 @@ class ExecEnv {
// ip:brpc_port -> frontend_info
std::map<TNetworkAddress, FrontendInfo> _frontends;
GroupCommitMgr* _group_commit_mgr = nullptr;
CdcClientMgr* _cdc_client_mgr = nullptr;

// Maybe we should use unique_ptr, but it need complete type, which means we need
// to include many headers, and for some cpp file that do not need class like TabletSchemaCache,
Expand Down
Loading
Loading