129 changes: 105 additions & 24 deletions be/src/io/fs/http_file_reader.cpp
@@ -22,7 +22,11 @@

#include <algorithm>

#include "common/config.h"
#include "common/logging.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/cdc_client_mgr.h"
#include "runtime/exec_env.h"

namespace doris::io {

@@ -83,6 +87,14 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
}
}

// Parse chunk response configuration
auto chunk_iter = _extend_kv.find("http.chunk.response");
if (chunk_iter != _extend_kv.end()) {
std::string value = chunk_iter->second;
std::transform(value.begin(), value.end(), value.begin(), ::tolower);
_enable_chunk_response = (value == "true" || value == "1");
}

_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
}

@@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) {
return Status::OK();
}

// Step 1: HEAD request to get file metadata
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
_client->set_method(HttpMethod::HEAD);
RETURN_IF_ERROR(_client->execute());
// Start the CDC client if enabled
auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == "true") {
LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
ExecEnv* env = ExecEnv::GetInstance();
if (env == nullptr || env->cdc_client_mgr() == nullptr) {
return Status::InternalError("ExecEnv or CdcClientMgr is not initialized");
}

PRequestCdcClientResult result;
Status start_st = env->cdc_client_mgr()->start_cdc_client(&result);
if (!start_st.ok()) {
LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
return start_st;
}

_url = fmt::format(_url, doris::config::cdc_client_port);
_range_supported = false;
LOG(INFO) << "CDC client started successfully for " << _url;
}

// Step 1: HEAD request to get file metadata (skip for chunk response)
if (_enable_chunk_response) {
// Chunked streaming response; skip the HEAD request
_size_known = false;
_range_supported = false;
LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " << _url;
} else {
// Normal mode: execute HEAD request to get file metadata
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
_client->set_method(HttpMethod::HEAD);
RETURN_IF_ERROR(_client->execute());

uint64_t content_length = 0;
RETURN_IF_ERROR(_client->get_content_length(&content_length));
uint64_t content_length = 0;
RETURN_IF_ERROR(_client->get_content_length(&content_length));

_file_size = content_length;
_size_known = true;
_file_size = content_length;
_size_known = true;
}

// Step 2: Check if Range request is disabled by configuration
if (!_enable_range_request) {
// User explicitly disabled Range requests, use non-Range mode directly
if (!_enable_range_request || _enable_chunk_response) {
// Range requests explicitly disabled or chunk response mode enabled; use non-Range mode
_range_supported = false;
LOG(INFO) << "Range requests disabled by configuration for " << _url
<< ", using non-Range mode. File size: " << _file_size << " bytes";

// Check if file size exceeds limit for non-Range mode
if (_file_size > _max_request_size_bytes) {
return Status::InternalError(
"Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} bytes, "
"configured by http.max.request.size.bytes). URL: {}",
_file_size, _max_request_size_bytes, _url);
LOG(INFO) << "Range requests disabled for " << _url << " (config=" << !_enable_range_request
<< ", chunk_response=" << _enable_chunk_response << ")";

// Skip file size check for chunk response (size unknown at this point)
if (_enable_chunk_response) {
LOG(INFO) << "Chunk response mode, file size check skipped for " << _url;
} else {
// Check if file size exceeds limit for non-Range mode (non-chunk only)
if (_file_size > _max_request_size_bytes) {
return Status::InternalError(
"Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} "
"bytes, "
"configured by http.max.request.size.bytes). URL: {}",
_file_size, _max_request_size_bytes, _url);
}
LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
<< " bytes, max allowed: " << _max_request_size_bytes << " bytes";
}

LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
<< " bytes, max allowed: " << _max_request_size_bytes << " bytes";
} else {
// Step 3: Range request is enabled (default), detect Range support
VLOG(1) << "Detecting Range support for URL: " << _url;
@@ -223,9 +269,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read
VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len
<< " with_range=" << _range_supported;

// Prepare and initialize the HTTP client for GET request
// Prepare and initialize the HTTP client for the request
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
_client->set_method(HttpMethod::GET);

// Determine HTTP method from configuration (default: GET)
HttpMethod method = HttpMethod::GET;
auto method_iter = _extend_kv.find("http.method");
if (method_iter != _extend_kv.end()) {
method = to_http_method(method_iter->second.c_str());
if (method == HttpMethod::UNKNOWN) {
LOG(WARNING) << "Invalid http.method value: " << method_iter->second
<< ", falling back to GET";
method = HttpMethod::GET;
}
}
_client->set_method(method);

// Set payload if configured (supports POST, PUT, DELETE, etc.)
auto payload_iter = _extend_kv.find("http.payload");
if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
_client->set_payload(payload_iter->second);
_client->set_content_type("application/json");
VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size();
}

_client->set_header("Expect", "");
_client->set_header("Connection", "close");
@@ -269,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read
long http_status = _client->get_http_status();
VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();

// Check for HTTP error status codes (4xx, 5xx)
if (http_status >= 400) {
std::string error_body;
if (buf.empty()) {
error_body = "(empty response body)";
} else {
// Limit error message to 1024 bytes to avoid excessive logging
size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
error_body = buf.substr(0, max_len);
}

return Status::InternalError("HTTP request failed with status {}: {}.", http_status,
error_body);
}

if (buf.empty()) {
*bytes_read = buffer_offset;
return Status::OK();
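With the error-status check in place, the reader follows three mutually exclusive read strategies. A compressed, illustrative reduction of that decision (the real code also performs the HEAD request, the size cap, and the Range probing shown above):

enum class ReadMode { CHUNK_STREAM, FULL_FILE, RANGED };

// Illustrative reduction of the mode selection after this change.
ReadMode pick_read_mode(bool chunk_response, bool range_enabled, bool range_supported) {
    if (chunk_response) return ReadMode::CHUNK_STREAM; // no HEAD, size unknown until EOF
    if (!range_enabled || !range_supported) {
        return ReadMode::FULL_FILE; // one GET, capped by http.max.request.size.bytes
    }
    return ReadMode::RANGED; // per-read GETs with Range headers
}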
2 changes: 2 additions & 0 deletions be/src/io/fs/http_file_reader.h
@@ -86,6 +86,8 @@ class HttpFileReader final : public FileReader {
// Full file cache for non-Range mode to avoid repeated downloads
std::string _full_file_cache; // Cache complete file content
bool _full_file_cached = false; // Whether full file has been cached

bool _enable_chunk_response = false; // Whether server returns chunk streaming response
};

} // namespace doris::io
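Taken together, the new behavior is driven entirely by string properties on the open request. A hypothetical caller-side property set — the keys are the ones parsed in http_file_reader.cpp, while the plain std::map container is a stand-in for however OpenFileInfo actually carries them:

#include <map>
#include <string>

// Hypothetical configuration for a streaming POST endpoint.
const std::map<std::string, std::string> http_reader_props = {
    {"http.chunk.response", "true"},             // skip HEAD; read until EOF
    {"http.method", "POST"},                     // default is GET; unknown values fall back to GET
    {"http.payload", R"({"offset":"latest"})"},  // sent with Content-Type: application/json
    {"enable_cdc_client", "true"},               // BE starts the CDC client before reading
};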
@@ -19,13 +19,15 @@

public class DataSourceConfigKeys {
public static final String JDBC_URL = "jdbc_url";
public static final String TYPE = "type";
public static final String DRIVER_URL = "driver_url";
public static final String DRIVER_CLASS = "driver_class";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String DATABASE = "database";
public static final String SCHEMA = "schema";
public static final String INCLUDE_TABLES = "include_tables";
public static final String TABLE = "table";
public static final String EXCLUDE_TABLES = "exclude_tables";
// initial,earliest,latest,{binlog,position},\d{13}
public static final String OFFSET = "offset";
@@ -37,7 +37,7 @@ public CompareOffsetRequest(Long jobId,
Map<String, String> sourceProperties,
Map<String, String> offsetFirst,
Map<String, String> offsetSecond) {
super(jobId, sourceType, sourceProperties);
super(jobId.toString(), sourceType, sourceProperties);
this.offsetFirst = offsetFirst;
this.offsetSecond = offsetSecond;
}
@@ -23,16 +23,5 @@
@Data
@EqualsAndHashCode(callSuper = true)
public class FetchRecordRequest extends JobBaseRecordRequest {
private boolean reload = true;
private int fetchSize;

@Override
public boolean isReload() {
return reload;
}

@Override
public int getFetchSize() {
return fetchSize;
}
private String taskId;
}
@@ -35,7 +35,7 @@ public class FetchTableSplitsRequest extends JobBaseConfig {

public FetchTableSplitsRequest(Long jobId, String name,
Map<String, String> sourceProperties, String snapshotTable) {
super(jobId, name, sourceProperties);
super(jobId.toString(), name, sourceProperties);
this.snapshotTable = snapshotTable;
}
}
@@ -27,7 +27,7 @@
@AllArgsConstructor
@NoArgsConstructor
public class JobBaseConfig {
private Long jobId;
private String jobId;
private String dataSource;
private Map<String, String> config;
}
@@ -28,8 +28,4 @@
@EqualsAndHashCode(callSuper = true)
public abstract class JobBaseRecordRequest extends JobBaseConfig {
protected Map<String, Object> meta;

public abstract boolean isReload();

public abstract int getFetchSize();
}
@@ -28,14 +28,4 @@ public class WriteRecordRequest extends JobBaseRecordRequest {
private String token;
private String frontendAddress;
private String taskId;

@Override
public boolean isReload() {
return true;
}

@Override
public int getFetchSize() {
return Integer.MAX_VALUE;
}
}
@@ -19,6 +19,7 @@

import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
import org.apache.doris.nereids.trees.expressions.functions.table.File;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
@@ -77,7 +78,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(ParquetMeta.class, "parquet_meta"),
tableValued(ParquetFileMetadata.class, "parquet_file_metadata"),
tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"),
tableValued(ParquetBloomProbe.class, "parquet_bloom_probe")
tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"),
tableValued(CdcStream.class, "cdc_stream")
);

public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
@@ -170,7 +170,7 @@ private String getToken() throws JobException {
private WriteRecordRequest buildRequestParams() throws JobException {
JdbcOffset offset = (JdbcOffset) runningOffset;
WriteRecordRequest request = new WriteRecordRequest();
request.setJobId(getJobId());
request.setJobId(getJobId() + "");
request.setConfig(sourceProperties);
request.setDataSource(dataSourceType.name());

@@ -185,7 +185,7 @@ public void updateOffset(Offset offset) {
@Override
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/fetchEndOffset")
.setParams(new Gson().toJson(requestParams)).build();
@@ -512,7 +512,7 @@ private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
*/
private void initSourceReader() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/initReader")
.setParams(new Gson().toJson(requestParams)).build();
@@ -559,7 +559,7 @@ public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/close")
.setParams(new Gson().toJson(requestParams)).build();
@@ -210,7 +210,7 @@ private static ConnectContext buildConnectContext() {
return ctx;
}

private static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, String> properties) {
public static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, String> properties) {
JdbcClientConfig config = new JdbcClientConfig();
config.setCatalog(sourceType.name());
config.setUser(properties.get(DataSourceConfigKeys.USER));
@@ -390,8 +390,8 @@ public static List<Column> getColumns(JdbcClient jdbcClient,
* The remoteDB implementation differs for each data source;
* refer to the hierarchical mapping in the JDBC catalog.
*/
private static String getRemoteDbName(DataSourceType sourceType, Map<String, String> properties)
throws JobException {
public static String getRemoteDbName(DataSourceType sourceType, Map<String, String> properties)
throws RuntimeException {
String remoteDb = null;
switch (sourceType) {
case MYSQL:
@@ -403,7 +403,7 @@ private static String getRemoteDbName(DataSourceType sourceType, Map<String, String>
Preconditions.checkArgument(StringUtils.isNotEmpty(remoteDb), "schema is required");
break;
default:
throw new JobException("Unsupported source type " + sourceType);
throw new RuntimeException("Unsupported source type " + sourceType);
}
return remoteDb;
}