be/src/vec/exec/format/table/paimon_jni_reader.cpp (7 additions, 1 deletion)
@@ -35,7 +35,8 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
 
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
@@ -65,6 +66,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
         params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
     }
+    if (range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (auto& kv : range.table_format_params.paimon_params.hadoop_conf) {
+            params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+        }
+    }
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
                                                     params, column_names);
 }
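On the BE side, both maps cross JNI as a single flat `Map<String, String>`: Paimon catalog options are namespaced with `paimon.` and Hadoop properties with `hadoop.`, and the Java scanner splits them apart again by prefix. A minimal, self-contained sketch of that round trip (`PrefixDemo`, `flatten`, and `extract` are illustrative names, not part of the Doris codebase):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PrefixDemo {
    // Flatten one config map into the shared params map under a namespace prefix.
    static Map<String, String> flatten(String prefix, Map<String, String> conf) {
        Map<String, String> out = new HashMap<>();
        conf.forEach((k, v) -> out.put(prefix + k, v)); // e.g. "hadoop." + "fs.defaultFS"
        return out;
    }

    // Recover one namespace from the shared params map, stripping the prefix.
    static Map<String, String> extract(String prefix, Map<String, String> params) {
        return params.entrySet().stream()
                .filter(kv -> kv.getKey().startsWith(prefix))
                .collect(Collectors.toMap(
                        kv -> kv.getKey().substring(prefix.length()),
                        Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.putAll(flatten("paimon.", Map.of("warehouse", "hdfs:///paimon")));
        params.putAll(flatten("hadoop.", Map.of("fs.defaultFS", "hdfs://nn:8020")));
        System.out.println(extract("hadoop.", params)); // {fs.defaultFS=hdfs://nn:8020}
    }
}
```

The prefixes keep the two option namespaces from colliding inside the one string map the JNI bridge supports.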
be/src/vec/exec/format/table/paimon_jni_reader.h (1 addition)
@@ -51,6 +51,7 @@ class PaimonJniReader : public JniReader {
 
 public:
     static const std::string PAIMON_OPTION_PREFIX;
+    static const std::string HADOOP_OPTION_PREFIX;
     PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                     RuntimeProfile* profile, const TFileRangeDesc& range);
 
org/apache/doris/paimon/PaimonJniScanner.java
@@ -42,9 +42,12 @@
 
 public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
-    private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+    private static final String PAIMON_OPTION_PREFIX = "paimon.";
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
 
     private final Map<String, String> params;
+    private final Map<String, String> paimonOptionParams;
+    private final Map<String, String> hadoopOptionParams;
     private final String dbName;
     private final String tblName;
     private final String paimonSplit;
@@ -87,6 +90,10 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        hadoopOptionParams = params.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
     }
 
     @Override
@@ -207,7 +214,8 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException {
     }
 
     private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, dbName, tblName);
+        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+                paimonOptionParams, hadoopOptionParams, dbName, tblName);
         TableExt tableExt = PaimonTableCache.getTable(key);
         if (tableExt.getCreateTime() < lastUpdateTime) {
             LOG.warn("invalidate cache table:{}, localTime:{}, remoteTime:{}", key, tableExt.getCreateTime(),
@@ -223,3 +231,4 @@ private void initTable() {
     }
 
 }
+
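`initTable()` compares the cached table's load time against the `lastUpdateTime` shipped from the FE and refreshes the entry when the FE has seen a newer table version. A minimal sketch of that invalidate-then-reload pattern (the real code uses a Guava `LoadingCache`, per the imports below; `CacheDemo` and `Entry` are illustrative names):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheDemo {
    static final class Entry {
        final Object table;    // stand-in for the loaded Paimon Table
        final long createTime; // when this entry was loaded

        Entry(Object table, long createTime) {
            this.table = table;
            this.createTime = createTime;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    // If the FE reports a newer table version, drop the stale entry and reload.
    Object getUpToDate(String key, long lastUpdateTime) {
        Entry e = cache.get(key);
        if (e == null || e.createTime < lastUpdateTime) {
            cache.remove(key);
            e = new Entry(load(key), System.currentTimeMillis());
            cache.put(key, e);
        }
        return e.table;
    }

    private Object load(String key) {
        return new Object(); // stand-in for catalog.getTable(...)
    }
}
```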
org/apache/doris/paimon/PaimonTableCache.java
@@ -21,6 +21,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -54,7 +55,7 @@ public TableExt load(PaimonTableCacheKey key) {
     private static TableExt loadTable(PaimonTableCacheKey key) {
         try {
             LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams());
+            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
             Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
             return new TableExt(table, System.currentTimeMillis());
         } catch (Catalog.TableNotExistException e) {
@@ -63,10 +64,14 @@ private static TableExt loadTable(PaimonTableCacheKey key) {
         }
     }
 
-    private static Catalog createCatalog(Map<String, String> paimonOptionParams) {
+    private static Catalog createCatalog(
+            Map<String, String> paimonOptionParams,
+            Map<String, String> hadoopOptionParams) {
         Options options = new Options();
         paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options);
+        Configuration hadoopConf = new Configuration();
+        hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
+        CatalogContext context = CatalogContext.create(options, hadoopConf);
         return CatalogFactory.createCatalog(context);
     }
 
@@ -108,15 +113,19 @@ public static class PaimonTableCacheKey {
 
         // not in key
         private Map<String, String> paimonOptionParams;
+        private Map<String, String> hadoopOptionParams;
         private String dbName;
         private String tblName;
 
         public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams, String dbName, String tblName) {
+                Map<String, String> paimonOptionParams,
+                Map<String, String> hadoopOptionParams,
+                String dbName, String tblName) {
             this.ctlId = ctlId;
             this.dbId = dbId;
             this.tblId = tblId;
             this.paimonOptionParams = paimonOptionParams;
+            this.hadoopOptionParams = hadoopOptionParams;
             this.dbName = dbName;
             this.tblName = tblName;
         }
@@ -137,6 +146,10 @@ public Map<String, String> getPaimonOptionParams() {
             return paimonOptionParams;
         }
 
+        public Map<String, String> getHadoopOptionParams() {
+            return hadoopOptionParams;
+        }
+
         public String getDbName() {
             return dbName;
         }
@@ -171,6 +184,7 @@ public String toString() {
                     + ", dbId=" + dbId
                     + ", tblId=" + tblId
                     + ", paimonOptionParams=" + paimonOptionParams
+                    + ", hadoopOptionParams=" + hadoopOptionParams
                     + ", dbName='" + dbName + '\''
                     + ", tblName='" + tblName + '\''
                     + '}';
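With the Hadoop options threaded through the cache key, `createCatalog()` can hand Paimon a populated `Configuration` instead of letting it fall back to defaults; `CatalogContext.create(Options, Configuration)` is the Paimon API the diff relies on. A standalone sketch of the same construction (`CatalogDemo` and all option values are illustrative, not Doris code):

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class CatalogDemo {
    // Build a Paimon catalog with an explicit Hadoop Configuration,
    // mirroring the createCatalog() change above.
    public static Catalog open(Map<String, String> paimonOpts, Map<String, String> hadoopOpts) {
        Options options = new Options();
        paimonOpts.forEach(options::set);
        Configuration hadoopConf = new Configuration();
        hadoopOpts.forEach(hadoopConf::set);
        CatalogContext context = CatalogContext.create(options, hadoopConf);
        return CatalogFactory.createCatalog(context);
    }

    public static void main(String[] args) {
        Catalog catalog = open(
                Map.of("warehouse", "hdfs://nn:8020/user/paimon"), // hypothetical path
                Map.of("fs.defaultFS", "hdfs://nn:8020"));         // hypothetical value
    }
}
```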
PaimonScanNode.java
@@ -167,6 +167,8 @@ public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
         fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
+        // The hadoop conf should be the same as PaimonExternalCatalog.createCatalog()#getConfiguration()
+        fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
             DeletionFile deletionFile = optDeletionFile.get();
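On the FE side, each split's `TPaimonFileDesc` now carries the catalog's Hadoop properties, so the BE/JNI scanner sees the same configuration the FE-side catalog was built with. A sketch of populating the new field (`FileDescDemo` and the map contents are illustrative; `TPaimonFileDesc` is the class thrift generates from `PlanNodes.thrift`, assumed here to live in Doris's usual `org.apache.doris.thrift` package):

```java
import java.util.Map;
import org.apache.doris.thrift.TPaimonFileDesc;

public class FileDescDemo {
    // Populate the new hadoop_conf field on the generated thrift struct.
    // In the real planner code the map comes from
    // source.getCatalog().getCatalogProperty().getHadoopProperties().
    static TPaimonFileDesc build(Map<String, String> hadoopProps) {
        TPaimonFileDesc fileDesc = new TPaimonFileDesc();
        fileDesc.setHadoopConf(hadoopProps);
        return fileDesc;
    }
}
```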
gensrc/thrift/PlanNodes.thrift (1 addition)
@@ -327,6 +327,7 @@ struct TPaimonFileDesc {
     10: optional i64 last_update_time
     11: optional string file_format
     12: optional TPaimonDeletionFileDesc deletion_file;
+    13: optional map<string, string> hadoop_conf
 }
 
 struct TMaxComputeFileDesc {
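Because field 13 is `optional`, an older FE that never sets it stays compatible: the BE guards the read with `__isset.hadoop_conf` (see the reader change above), and Java consumers would use the generated presence check. A sketch of that check (`OptionalFieldDemo` is illustrative; `isSetHadoopConf()`/`getHadoopConf()` follow Thrift's usual generated naming, assumed rather than quoted from the generated source):

```java
import java.util.Map;
import org.apache.doris.thrift.TPaimonFileDesc;

public class OptionalFieldDemo {
    // Optional thrift fields must be presence-tested before use;
    // this mirrors the C++ __isset.hadoop_conf guard in the BE reader.
    static Map<String, String> hadoopConfOrEmpty(TPaimonFileDesc desc) {
        return desc.isSetHadoopConf() ? desc.getHadoopConf() : Map.of();
    }
}
```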