11 changes: 6 additions & 5 deletions be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
@@ -27,8 +27,8 @@ static const std::string HADOOP_OPTION_PREFIX = "hadoop.";

IcebergSysTableJniReader::IcebergSysTableJniReader(
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
: JniReader(file_slot_descs, state, profile), _range_params(range_params) {}
RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
: JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {}

Status IcebergSysTableJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
@@ -39,11 +39,12 @@ Status IcebergSysTableJniReader::init_reader(
required_types.emplace_back(JniConnector::get_jni_type(desc->type()));
}
std::map<std::string, std::string> params;
params["serialized_task"] = _range_params.serialized_task;
// "," is not in base64
params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ",");
params["required_fields"] = join(required_fields, ",");
params["required_types"] = join(required_types, "#");
params["time_zone"] = _state->timezone_obj().name();
for (const auto& kv : _range_params.hadoop_props) {
params["time_zone"] = _state->timezone();
for (const auto& kv : _meta_scan_range.hadoop_props) {
params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
}
_jni_connector =
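The "hadoop." prefix above is a naming contract between the native reader and the JNI scanner: the C++ side prefixes every Hadoop property key before handing the params map across JNI, and the Java side filters on that prefix (visible in IcebergSysTableJniScanner further down) and is expected to strip it again before building its Hadoop configuration. A minimal Java sketch of that receiving step; the prefix-stripping part is an assumption, since this diff only shows the filtering:

import java.util.Map;
import java.util.stream.Collectors;

final class HadoopPrefixContractSketch {
    private static final String HADOOP_OPTION_PREFIX = "hadoop.";

    // Recover plain Hadoop property names from the "hadoop."-prefixed JNI params.
    static Map<String, String> extractHadoopProps(Map<String, String> params) {
        return params.entrySet().stream()
                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                .collect(Collectors.toMap(
                        kv -> kv.getKey().substring(HADOOP_OPTION_PREFIX.length()),
                        Map.Entry::getValue));
    }
}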
4 changes: 2 additions & 2 deletions be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
@@ -47,15 +47,15 @@ class IcebergSysTableJniReader : public JniReader {
public:
IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TIcebergMetadataParams& range_params);
const TMetaScanRange& meta_scan_range);

~IcebergSysTableJniReader() override = default;

Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

private:
const TIcebergMetadataParams& _range_params;
const TMetaScanRange& _meta_scan_range;
};

#include "common/compile_check_end.h"
9 changes: 1 addition & 8 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -49,18 +49,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
column_types.emplace_back(JniConnector::get_jni_type(desc->type()));
}
std::map<String, String> params;
params["db_name"] = range.table_format_params.paimon_params.db_name;
params["table_name"] = range.table_format_params.paimon_params.table_name;
params["paimon_split"] = range.table_format_params.paimon_params.paimon_split;
params["paimon_column_names"] = range.table_format_params.paimon_params.paimon_column_names;
params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate;
params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id);
params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id);
params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id);
params["last_update_time"] =
std::to_string(range.table_format_params.paimon_params.last_update_time);
params["required_fields"] = join(column_names, ",");
params["columns_types"] = join(column_types, "#");
params["time_zone"] = _state->timezone();
if (range_params->__isset.serialized_table) {
params["serialized_table"] = range_params->serialized_table;
}
23 changes: 7 additions & 16 deletions be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
@@ -24,12 +24,11 @@ namespace doris::vectorized {
#include "common/compile_check_begin.h"

const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon.";

PaimonSysTableJniReader::PaimonSysTableJniReader(
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TPaimonMetadataParams& range_params)
: JniReader(file_slot_descs, state, profile), _range_params(range_params) {
RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
: JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {
std::vector<std::string> required_fields;
std::vector<std::string> required_types;
for (const auto& desc : _file_slot_descs) {
@@ -38,21 +37,13 @@ PaimonSysTableJniReader::PaimonSysTableJniReader(
}

std::map<std::string, std::string> params;
params["db_name"] = _range_params.db_name;
params["tbl_name"] = _range_params.tbl_name;
params["query_type"] = _range_params.query_type;
params["ctl_id"] = std::to_string(_range_params.ctl_id);
params["db_id"] = std::to_string(_range_params.db_id);
params["tbl_id"] = std::to_string(_range_params.tbl_id);
params["serialized_split"] = _range_params.serialized_split;
params["serialized_table"] = _meta_scan_range.serialized_table;
// "," is not in base64
params["serialized_splits"] = join(_meta_scan_range.serialized_splits, ",");
params["required_fields"] = join(required_fields, ",");
params["required_types"] = join(required_types, "#");

for (const auto& kv : _range_params.paimon_props) {
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
}

for (const auto& kv : _range_params.hadoop_props) {
params["time_zone"] = _state->timezone();
for (const auto& kv : _meta_scan_range.hadoop_props) {
params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
}

5 changes: 2 additions & 3 deletions be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
@@ -46,10 +46,9 @@ class PaimonSysTableJniReader : public JniReader {

public:
static const std::string HADOOP_OPTION_PREFIX;
static const std::string PAIMON_OPTION_PREFIX;
PaimonSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TPaimonMetadataParams& range_params);
const TMetaScanRange& meta_scan_range);

~PaimonSysTableJniReader() override = default;

@@ -58,7 +57,7 @@

private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
const TPaimonMetadataParams& _range_params;
const TMetaScanRange& _meta_scan_range;
};

#include "common/compile_check_end.h"
31 changes: 4 additions & 27 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -67,14 +67,14 @@ Status VMetaScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::open(state));
if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
// TODO: refactor this code
auto reader = IcebergSysTableJniReader::create_unique(
_tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.iceberg_params);
auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
_scan_range.meta_scan_range);
const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
} else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) {
auto reader = PaimonSysTableJniReader::create_unique(
_tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.paimon_params);
auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
_scan_range.meta_scan_range);
const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
@@ -251,9 +251,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
VLOG_CRITICAL << "VMetaScanner::_fetch_metadata";
TFetchSchemaTableDataRequest request;
switch (meta_scan_range.metadata_type) {
case TMetadataType::ICEBERG:
RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::HUDI:
RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request));
break;
@@ -319,26 +316,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
return Status::OK();
}

Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
if (!meta_scan_range.__isset.iceberg_params) {
return Status::InternalError("Can not find TIcebergMetadataParams from meta_scan_range.");
}

// create request
request->__set_cluster_name("");
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status VMetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_hudi_metadata_request";
2 changes: 0 additions & 2 deletions be/src/vec/exec/scan/vmeta_scanner.h
@@ -64,8 +64,6 @@ class VMetaScanner : public VScanner {
private:
Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns);
Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_backends_metadata_request(const TMetaScanRange& meta_scan_range,
@@ -0,0 +1,15 @@
use paimon;
create database if not exists test_paimon_spark;
use test_paimon_spark;

SET TIME ZONE '+08:00';

CREATE TABLE IF NOT EXISTS t_ts_ntz (
id INT,
ts TIMESTAMP,
ts_ntz TIMESTAMP_NTZ
) USING paimon;

INSERT INTO t_ts_ntz VALUES
(1, CAST('2025-08-12 06:00:00+00:00' AS TIMESTAMP), CAST('2025-08-12 06:00:00' AS TIMESTAMP_NTZ)),
(2, CAST('2025-08-12 14:00:00+08:00' AS TIMESTAMP), CAST('2025-08-12 14:00:00' AS TIMESTAMP_NTZ));
@@ -23,6 +23,7 @@
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import com.google.common.base.Preconditions;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterator;
@@ -34,6 +35,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -47,16 +50,21 @@ public class IcebergSysTableJniScanner extends JniScanner {
private static final String HADOOP_OPTION_PREFIX = "hadoop.";
private final ClassLoader classLoader;
private final PreExecutionAuthenticator preExecutionAuthenticator;
private final FileScanTask scanTask;
private final Iterator<FileScanTask> scanTasks;
private final List<NestedField> fields;
private final String timezone;
private CloseableIterator<StructLike> reader;

public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
this.classLoader = this.getClass().getClassLoader();
this.scanTask = SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
List<FileScanTask> scanTasks = Arrays.stream(params.get("serialized_splits").split(","))
.map(SerializationUtil::deserializeFromBase64)
.map(obj -> (FileScanTask) obj)
.collect(Collectors.toList());
Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks should not be empty");
this.scanTasks = scanTasks.iterator();
String[] requiredFields = params.get("required_fields").split(",");
this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields);
this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), requiredFields);
this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
Map<String, String> hadoopOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
@@ -69,28 +77,34 @@ public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {

@Override
public void open() throws IOException {
Thread.currentThread().setContextClassLoader(classLoader);
nextScanTask();
}

private void nextScanTask() throws IOException {
Preconditions.checkArgument(scanTasks.hasNext());
FileScanTask scanTask = scanTasks.next();
try {
Thread.currentThread().setContextClassLoader(classLoader);
preExecutionAuthenticator.execute(() -> {
// execute FileScanTask to get rows
reader = scanTask.asDataTask().rows().iterator();
return null;
});
} catch (Exception e) {
this.close();
String msg = String.format("Failed to open IcebergMetadataJniScanner");
String msg = String.format("Failed to open next scan task: %s", scanTask);
LOG.error(msg, e);
throw new IOException(msg, e);
}
}

@Override
protected int getNext() throws IOException {
if (reader == null) {
return 0;
}
int rows = 0;
while (reader.hasNext() && rows < getBatchSize()) {
while ((reader.hasNext() || scanTasks.hasNext()) && rows < getBatchSize()) {
if (!reader.hasNext()) {
nextScanTask();
}
StructLike row = reader.next();
for (int i = 0; i < fields.size(); i++) {
NestedField field = fields.get(i);
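A self-contained sketch of the split round trip that "serialized_splits" relies on: every FileScanTask is serialized to base64 and the results are joined with ",", which is safe as a delimiter because "," is not part of the base64 alphabet. The decoding half mirrors the constructor above; the encoding half assumes Iceberg's SerializationUtil also offers serializeToBase64 on the producing side, which does not appear in this diff.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.SerializationUtil;

final class SplitCodecSketch {
    // Producing side: serialize each task and join with "," (not a base64 character).
    static String encode(List<FileScanTask> tasks) {
        return tasks.stream()
                .map(SerializationUtil::serializeToBase64)
                .collect(Collectors.joining(","));
    }

    // Scanner side: split on "," and turn each entry back into a FileScanTask.
    static List<FileScanTask> decode(String serializedSplits) {
        return Arrays.stream(serializedSplits.split(","))
                .map(SerializationUtil::deserializeFromBase64)
                .map(obj -> (FileScanTask) obj)
                .collect(Collectors.toList());
    }
}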
@@ -46,15 +46,17 @@ public class PaimonColumnValue implements ColumnValue {
private DataGetters record;
private ColumnType dorisType;
private DataType dataType;
private String timeZone;

public PaimonColumnValue() {
}

public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType) {
public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType, String timeZone) {
this.idx = idx;
this.record = record;
this.dorisType = columnType;
this.dataType = dataType;
this.timeZone = timeZone;
}

public void setIdx(int idx, ColumnType dorisType, DataType dataType) {
@@ -67,6 +69,10 @@ public void setOffsetRow(InternalRow record) {
this.record = record;
}

public void setTimeZone(String timeZone) {
this.timeZone = timeZone;
}

@Override
public boolean canGetStringAsBytes() {
return true;
@@ -136,7 +142,8 @@ public LocalDate getDate() {
public LocalDateTime getDateTime() {
Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision());
if (dataType instanceof LocalZonedTimestampType) {
return LocalDateTime.ofInstant(ts.toInstant(), ZoneId.systemDefault());
return ts.toLocalDateTime().atZone(ZoneId.of("UTC"))
.withZoneSameInstant(ZoneId.of(timeZone)).toLocalDateTime();
} else {
return ts.toLocalDateTime();
}
@@ -157,7 +164,7 @@ public void unpackArray(List<ColumnValue> values) {
InternalArray recordArray = record.getArray(idx);
for (int i = 0; i < recordArray.size(); i++) {
PaimonColumnValue arrayColumnValue = new PaimonColumnValue((DataGetters) recordArray, i,
dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType());
dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType(), timeZone);
values.add(arrayColumnValue);
}
}
@@ -168,13 +175,13 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
InternalArray key = map.keyArray();
for (int i = 0; i < key.size(); i++) {
PaimonColumnValue keyColumnValue = new PaimonColumnValue((DataGetters) key, i,
dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType());
dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType(), timeZone);
keys.add(keyColumnValue);
}
InternalArray value = map.valueArray();
for (int i = 0; i < value.size(); i++) {
PaimonColumnValue valueColumnValue = new PaimonColumnValue((DataGetters) value, i,
dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType());
dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType(), timeZone);
values.add(valueColumnValue);
}
}
@@ -185,7 +192,7 @@ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> value
InternalRow row = record.getRow(idx, structFieldIndex.size());
for (int i : structFieldIndex) {
values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i),
((RowType) dataType).getFields().get(i).type()));
((RowType) dataType).getFields().get(i).type(), timeZone));
}
}
}
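The getDateTime change above treats a TIMESTAMP WITH LOCAL TIME ZONE value as a UTC instant and re-renders it in the session time zone carried by the "time_zone" parameter. A small, runnable sketch of the same conversion; the sample value is illustrative and matches the Spark test data inserted earlier ('2025-08-12 06:00:00+00:00' read under +08:00):

import java.time.LocalDateTime;
import java.time.ZoneId;

final class LocalZonedTimestampSketch {
    // UTC wall-clock value -> wall-clock value in the session time zone.
    static LocalDateTime toSessionTime(LocalDateTime utcValue, String sessionTimeZone) {
        return utcValue.atZone(ZoneId.of("UTC"))
                .withZoneSameInstant(ZoneId.of(sessionTimeZone))
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        // 2025-08-12 06:00:00 UTC rendered under +08:00 prints 2025-08-12T14:00
        System.out.println(toSessionTime(LocalDateTime.of(2025, 8, 12, 6, 0), "+08:00"));
    }
}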
@@ -39,17 +39,14 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;

public class PaimonJniScanner extends JniScanner {
private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
@Deprecated
private static final String PAIMON_OPTION_PREFIX = "paimon.";
@Deprecated
private static final String HADOOP_OPTION_PREFIX = "hadoop.";

private final Map<String, String> params;
@Deprecated
private final Map<String, String> hadoopOptionParams;
private final String paimonSplit;
private final String paimonPredicate;
@@ -76,6 +73,8 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
}
paimonSplit = params.get("paimon_split");
paimonPredicate = params.get("paimon_predicate");
String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
columnValue.setTimeZone(timeZone);
initTableInfo(columnTypes, requiredFields, batchSize);
hadoopOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))