diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index daf196d37187..852390f78f58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -91,6 +91,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable { @Nullable private final Predicate predicate; @Nullable private final RefreshBlacklist refreshBlacklist; + private final List projectFieldsGetters; + private transient File path; private transient LookupTable lookupTable; @@ -123,6 +125,11 @@ public FileStoreLookupFunction( .mapToObj(i -> table.rowType().getFieldNames().get(i)) .collect(Collectors.toList()); + this.projectFieldsGetters = + Arrays.stream(projection) + .mapToObj(i -> table.rowType().fieldGetters()[i]) + .collect(Collectors.toList()); + // add primary keys for (String field : table.primaryKeys()) { if (!projectFields.contains(field)) { @@ -166,6 +173,11 @@ private void open() throws Exception { List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); + LOG.info( + "lookup projection fields in lookup table:{}, join fields in lookup table:{}", + projectFields, + joinKeys); + FileStoreTable storeTable = (FileStoreTable) table; if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO @@ -236,6 +248,9 @@ public Collection lookup(RowData keyRow) { try { tryRefresh(); + if (LOG.isDebugEnabled()) { + LOG.debug("lookup key:{}", keyRow.toString()); + } InternalRow key = new FlinkRowWrapper(keyRow); if (partitionLoader == null) { return lookupInternal(key); @@ -264,6 +279,16 @@ private List lookupInternal(InternalRow key) throws IOException { for (InternalRow matchedRow : lookupResults) { rows.add(new FlinkRowData(matchedRow)); } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "matched rows in lookup table, size:{}, rows:{}", + lookupResults.size(), + lookupResults.stream() + .map(row -> logRow(projectFieldsGetters, row)) + .collect(Collectors.toList())); + } + return rows; } @@ -413,4 +438,15 @@ protected Set getRequireCachedBucketIds() { protected void setCacheRowFilter(@Nullable Filter cacheRowFilter) { this.cacheRowFilter = cacheRowFilter; } + + private String logRow(List fieldGetters, InternalRow row) { + List rowValues = new ArrayList<>(fieldGetters.size()); + + for (InternalRow.FieldGetter fieldGetter : fieldGetters) { + Object fieldValue = fieldGetter.getFieldOrNull(row); + String value = fieldValue == null ? "null" : fieldValue.toString(); + rowValues.add(value); + } + return rowValues.toString(); + } }