From 4614befd2feb81e806038acde170fc9a82ceb237 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 7 Jan 2025 19:26:38 +0800 Subject: [PATCH 1/6] [log] logging specific join keys and results for lookup join --- .../flink/lookup/FileStoreLookupFunction.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index daf196d37187..97488849bfb2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -91,6 +91,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable { @Nullable private final Predicate predicate; @Nullable private final RefreshBlacklist refreshBlacklist; + private final List joinKeysGetters; + private final List projectFieldsGetters; + private transient File path; private transient LookupTable lookupTable; @@ -123,6 +126,16 @@ public FileStoreLookupFunction( .mapToObj(i -> table.rowType().getFieldNames().get(i)) .collect(Collectors.toList()); + this.joinKeysGetters = + Arrays.stream(joinKeyIndex) + .mapToObj(i -> table.rowType().fieldGetters()[projection[i]]) + .collect(Collectors.toList()); + + this.projectFieldsGetters = + Arrays.stream(projection) + .mapToObj(i -> table.rowType().fieldGetters()[i]) + .collect(Collectors.toList()); + // add primary keys for (String field : table.primaryKeys()) { if (!projectFields.contains(field)) { @@ -166,6 +179,8 @@ private void open() throws Exception { List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); + LOG.info("lookup projection fields: {}, join fields:{}", projectFields, joinKeys); + FileStoreTable storeTable = (FileStoreTable) table; if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO @@ -249,6 +264,19 @@ public Collection lookup(RowData keyRow) { for (BinaryRow partition : partitionLoader.partitions()) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } + + try { + LOG.debug( + "lookup key: {}, matched rows size: {}, matched rows: {}", + logRow(joinKeysGetters, key), + results.size(), + results.stream() + .map(row -> logRow(projectFieldsGetters, row)) + .collect(Collectors.toList())); + } catch (Exception e) { + LOG.error("Error occurs when logging specific join keys and results"); + } + return rows; } catch (OutOfRangeException | ReopenException e) { reopen(); @@ -413,4 +441,15 @@ protected Set getRequireCachedBucketIds() { protected void setCacheRowFilter(@Nullable Filter cacheRowFilter) { this.cacheRowFilter = cacheRowFilter; } + + private String logRow(List fieldGetters, InternalRow row) { + List rowValues = new ArrayList<>(fieldGetters.size()); + + for (InternalRow.FieldGetter fieldGetter : fieldGetters) { + Object fieldValue = fieldGetter.getFieldOrNull(row); + String value = fieldValue == null ? "null" : fieldValue.toString(); + rowValues.add(value); + } + return rowValues.toString(); + } } From 1b4856a2ec3d49ecdc27e5377afcc1b5f6ef4c0e Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 7 Jan 2025 19:38:32 +0800 Subject: [PATCH 2/6] fix --- .../flink/lookup/FileStoreLookupFunction.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 97488849bfb2..ece101b06ef3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -265,18 +265,6 @@ public Collection lookup(RowData keyRow) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } - try { - LOG.debug( - "lookup key: {}, matched rows size: {}, matched rows: {}", - logRow(joinKeysGetters, key), - results.size(), - results.stream() - .map(row -> logRow(projectFieldsGetters, row)) - .collect(Collectors.toList())); - } catch (Exception e) { - LOG.error("Error occurs when logging specific join keys and results"); - } - return rows; } catch (OutOfRangeException | ReopenException e) { reopen(); @@ -292,6 +280,18 @@ private List lookupInternal(InternalRow key) throws IOException { for (InternalRow matchedRow : lookupResults) { rows.add(new FlinkRowData(matchedRow)); } + + try { + LOG.debug( + "lookup key: {}, matched rows size: {}, matched rows: {}", + logRow(joinKeysGetters, key), + lookupResults.size(), + lookupResults.stream() + .map(row -> logRow(projectFieldsGetters, row)) + .collect(Collectors.toList())); + } catch (Exception e) { + LOG.error("Error occurs when logging specific join keys and results"); + } return rows; } From db5caad58319b5bdd9dafba23ec712db61e82cca Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Wed, 8 Jan 2025 09:59:33 +0800 Subject: [PATCH 3/6] fix --- .../paimon/flink/lookup/FileStoreLookupFunction.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index ece101b06ef3..b9109d4d4257 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -179,7 +179,7 @@ private void open() throws Exception { List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); - LOG.info("lookup projection fields: {}, join fields:{}", projectFields, joinKeys); + LOG.info("lookup projection fields:{}, join fields:{}", projectFields, joinKeys); FileStoreTable storeTable = (FileStoreTable) table; @@ -281,17 +281,16 @@ private List lookupInternal(InternalRow key) throws IOException { rows.add(new FlinkRowData(matchedRow)); } - try { + if (LOG.isDebugEnabled()) { LOG.debug( - "lookup key: {}, matched rows size: {}, matched rows: {}", + "lookup key:{}, matched rows size:{}, matched rows:{}", logRow(joinKeysGetters, key), lookupResults.size(), lookupResults.stream() .map(row -> logRow(projectFieldsGetters, row)) .collect(Collectors.toList())); - } catch (Exception e) { - LOG.error("Error occurs when logging specific join keys and results"); } + return rows; } From 8c79686a9578344ee202e3fbb56a42a41c49b621 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 10:18:41 +0800 Subject: [PATCH 4/6] fix log --- .../flink/lookup/FileStoreLookupFunction.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index b9109d4d4257..852390f78f58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -91,7 +91,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable { @Nullable private final Predicate predicate; @Nullable private final RefreshBlacklist refreshBlacklist; - private final List joinKeysGetters; private final List projectFieldsGetters; private transient File path; @@ -126,11 +125,6 @@ public FileStoreLookupFunction( .mapToObj(i -> table.rowType().getFieldNames().get(i)) .collect(Collectors.toList()); - this.joinKeysGetters = - Arrays.stream(joinKeyIndex) - .mapToObj(i -> table.rowType().fieldGetters()[projection[i]]) - .collect(Collectors.toList()); - this.projectFieldsGetters = Arrays.stream(projection) .mapToObj(i -> table.rowType().fieldGetters()[i]) @@ -179,7 +173,10 @@ private void open() throws Exception { List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); - LOG.info("lookup projection fields:{}, join fields:{}", projectFields, joinKeys); + LOG.info( + "lookup projection fields in lookup table:{}, join fields in lookup table:{}", + projectFields, + joinKeys); FileStoreTable storeTable = (FileStoreTable) table; @@ -251,6 +248,9 @@ public Collection lookup(RowData keyRow) { try { tryRefresh(); + if (LOG.isDebugEnabled()) { + LOG.debug("lookup key:{}", keyRow.toString()); + } InternalRow key = new FlinkRowWrapper(keyRow); if (partitionLoader == null) { return lookupInternal(key); @@ -264,7 +264,6 @@ public Collection lookup(RowData keyRow) { for (BinaryRow partition : partitionLoader.partitions()) { rows.addAll(lookupInternal(JoinedRow.join(key, partition))); } - return rows; } catch (OutOfRangeException | ReopenException e) { reopen(); @@ -283,8 +282,7 @@ private List lookupInternal(InternalRow key) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug( - "lookup key:{}, matched rows size:{}, matched rows:{}", - logRow(joinKeysGetters, key), + "matched rows in lookup table, size:{}, rows:{}", lookupResults.size(), lookupResults.stream() .map(row -> logRow(projectFieldsGetters, row)) From 5a24dcd3c00493489be9af31cdf94784b58160c4 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 10:19:30 +0800 Subject: [PATCH 5/6] [temp] trun on the debug mode in FileStoreLookupFunction --- .../src/test/resources/log4j2-test.properties | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties b/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties index 1b3980d15104..0c000e93e00b 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties +++ b/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties @@ -26,3 +26,9 @@ appender.testlogger.type = CONSOLE appender.testlogger.target = SYSTEM_ERR appender.testlogger.layout.type = PatternLayout appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n + +# Set DEBUG level for a specific class +logger.myclass.name =org.apache.paimon.flink.lookup.FileStoreLookupFunction +logger.myclass.level = DEBUG +logger.myclass.additivity = false +logger.myclass.appenderRef.test.ref = TestLogger From 1771129e2598666fbd5d5487e56173b144ce82e1 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 11:48:28 +0800 Subject: [PATCH 6/6] remove log property for debug --- .../src/test/resources/log4j2-test.properties | 6 ------ 1 file changed, 6 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties b/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties index 0c000e93e00b..1b3980d15104 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties +++ b/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties @@ -26,9 +26,3 @@ appender.testlogger.type = CONSOLE appender.testlogger.target = SYSTEM_ERR appender.testlogger.layout.type = PatternLayout appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n - -# Set DEBUG level for a specific class -logger.myclass.name =org.apache.paimon.flink.lookup.FileStoreLookupFunction -logger.myclass.level = DEBUG -logger.myclass.additivity = false -logger.myclass.appenderRef.test.ref = TestLogger