diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index ef690c15b684d2..fa73454f4b4850 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -35,7 +35,8 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
 
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
@@ -65,6 +66,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
         params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
     }
+    if (range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (auto& kv : range.table_format_params.paimon_params.hadoop_conf) {
+            params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+        }
+    }
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
                                                     params, column_names);
 }
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6b6a6907270657..6ecf6cd1f153e1 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -51,6 +51,7 @@ class PaimonJniReader : public JniReader {
 public:
     static const std::string PAIMON_OPTION_PREFIX;
+    static const std::string HADOOP_OPTION_PREFIX;
 
     PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                     RuntimeProfile* profile, const TFileRangeDesc& range);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 719a7ea0b9d9e9..4ef40d9fa1a6be 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -42,9 +42,12 @@ public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
-    private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+    private static final String PAIMON_OPTION_PREFIX = "paimon.";
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+
     private final Map<String, String> params;
     private final Map<String, String> paimonOptionParams;
+    private final Map<String, String> hadoopOptionParams;
     private final String dbName;
     private final String tblName;
     private final String paimonSplit;
@@ -87,6 +90,10 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        hadoopOptionParams = params.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
     }
@@ -207,7 +214,8 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException {
     }
 
     private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName);
+        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+                paimonOptionParams, hadoopOptionParams, dbName, tblName);
         TableExt tableExt = PaimonTableCache.getTable(key);
         if (tableExt.getCreateTime() < lastUpdateTime) {
             LOG.warn("invalidate cache table:{}, localTime:{}, remoteTime:{}", key, tableExt.getCreateTime(),
@@ -223,3 +231,4 @@ private void initTable() {
     }
 
 }
+
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
index f57ffeb5592156..12aac1533920b4 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -21,6 +21,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -54,7 +55,7 @@ public TableExt load(PaimonTableCacheKey key) {
     private static TableExt loadTable(PaimonTableCacheKey key) {
         try {
             LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams());
+            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
             Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
             return new TableExt(table, System.currentTimeMillis());
         } catch (Catalog.TableNotExistException e) {
@@ -63,10 +64,14 @@ private static TableExt loadTable(PaimonTableCacheKey key) {
         }
     }
 
-    private static Catalog createCatalog(Map<String, String> paimonOptionParams) {
+    private static Catalog createCatalog(
+            Map<String, String> paimonOptionParams,
+            Map<String, String> hadoopOptionParams) {
         Options options = new Options();
         paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options);
+        Configuration hadoopConf = new Configuration();
+        hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
+        CatalogContext context = CatalogContext.create(options, hadoopConf);
         return CatalogFactory.createCatalog(context);
     }
 
@@ -108,15 +113,19 @@ public static class PaimonTableCacheKey {
 
         // not in key
         private Map<String, String> paimonOptionParams;
+        private Map<String, String> hadoopOptionParams;
         private String dbName;
         private String tblName;
 
         public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams, String dbName, String tblName) {
+                Map<String, String> paimonOptionParams,
+                Map<String, String> hadoopOptionParams,
+                String dbName, String tblName) {
             this.ctlId = ctlId;
             this.dbId = dbId;
             this.tblId = tblId;
             this.paimonOptionParams = paimonOptionParams;
+            this.hadoopOptionParams = hadoopOptionParams;
             this.dbName = dbName;
             this.tblName = tblName;
         }
@@ -137,6 +146,10 @@ public Map<String, String> getPaimonOptionParams() {
             return paimonOptionParams;
         }
 
+        public Map<String, String> getHadoopOptionParams() {
+            return hadoopOptionParams;
+        }
+
         public String getDbName() {
             return dbName;
         }
@@ -171,6 +184,7 @@ public String toString() {
                     + ", dbId=" + dbId
                     + ", tblId=" + tblId
                     + ", paimonOptionParams=" + paimonOptionParams
+                    + ", hadoopOptionParams=" + hadoopOptionParams
                     + ", dbName='" + dbName + '\''
                     + ", tblName='" + tblName + '\''
                     + '}';
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 8ed8af15d86d02..661d41bd7c052e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -167,6 +167,8 @@ public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
         fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
+        // The hadoop conf should be the same as PaimonExternalCatalog.createCatalog()#getConfiguration()
+        fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
             DeletionFile deletionFile = optDeletionFile.get();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index fc1a6e6baf57a2..a060f5efab44c5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -327,6 +327,7 @@ struct TPaimonFileDesc {
     10: optional i64 last_update_time
     11: optional string file_format
     12: optional TPaimonDeletionFileDesc deletion_file;
+    13: optional map<string, string> hadoop_conf
 }
 
 struct TMaxComputeFileDesc {