diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 7c30a1038c1b..aeadc7cdd585 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -23,8 +23,15 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -33,15 +40,20 @@
 import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
+import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Dynamic partition for lookup. */
 public class DynamicPartitionLoader implements Serializable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);
+
     private static final long serialVersionUID = 1L;
 
     private static final String MAX_PT = "max_pt()";
@@ -51,6 +63,7 @@ public class DynamicPartitionLoader implements Serializable {
     private final Table table;
     private final Duration refreshInterval;
     private final int maxPartitionNum;
+    private final RowDataToObjectArrayConverter partitionConverter;
 
     private Comparator comparator;
 
@@ -61,6 +74,8 @@ private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPar
         this.table = table;
         this.refreshInterval = refreshInterval;
         this.maxPartitionNum = maxPartitionNum;
+        this.partitionConverter =
+                new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
     }
 
     public void open() {
@@ -81,6 +96,31 @@ public List<BinaryRow> partitions() {
         return partitions;
     }
 
+    public Predicate createSpecificPartFilter() {
+        Predicate partFilter = null;
+        for (BinaryRow partition : partitions) {
+            if (partFilter == null) {
+                partFilter = createSinglePartFilter(partition);
+            } else {
+                partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
+            }
+        }
+        return partFilter;
+    }
+
+    private Predicate createSinglePartFilter(BinaryRow partition) {
+        RowType rowType = table.rowType();
+        List<String> partitionKeys = table.partitionKeys();
+        Object[] partitionSpec = partitionConverter.convert(partition);
+        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+        for (int i = 0; i < partitionSpec.length; i++) {
+            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
+        }
+
+        // create partition predicate base on rowType instead of partitionType
+        return createPartitionPredicate(rowType, partitionMap);
+    }
+
     /** @return true if partition changed. */
     public boolean checkRefresh() {
         if (lastRefresh != null
@@ -88,23 +128,53 @@ public boolean checkRefresh() {
             return false;
         }
 
+        LOG.info(
+                "DynamicPartitionLoader(maxPartitionNum={},table={}) refreshed after {} second(s), refreshing",
+                maxPartitionNum,
+                table.name(),
+                refreshInterval.toMillis() / 1000);
+
         List<BinaryRow> newPartitions = getMaxPartitions();
         lastRefresh = LocalDateTime.now();
 
         if (newPartitions.size() != partitions.size()) {
             partitions = newPartitions;
+            logNewPartitions();
             return true;
         } else {
             for (int i = 0; i < newPartitions.size(); i++) {
                 if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
                     partitions = newPartitions;
+                    logNewPartitions();
                     return true;
                 }
             }
+            LOG.info(
+                    "DynamicPartitionLoader(maxPartitionNum={},table={}) didn't find new partitions.",
+                    maxPartitionNum,
+                    table.name());
             return false;
         }
     }
 
+    private void logNewPartitions() {
+        String partitionsStr =
+                partitions.stream()
+                        .map(
+                                partition ->
+                                        InternalRowPartitionComputer.partToSimpleString(
+                                                table.rowType().project(table.partitionKeys()),
+                                                partition,
+                                                "-",
+                                                200))
+                        .collect(Collectors.joining(","));
+        LOG.info(
+                "DynamicPartitionLoader(maxPartitionNum={},table={}) finds new partitions: {}.",
+                maxPartitionNum,
+                table.name(),
+                partitionsStr);
+    }
+
     private List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
@@ -126,6 +196,11 @@ public static DynamicPartitionLoader of(Table table) {
             return null;
         }
 
+        checkArgument(
+                !table.partitionKeys().isEmpty(),
+                "{} is not supported for non-partitioned table.",
+                LOOKUP_DYNAMIC_PARTITION);
+
         int maxPartitionNum;
         switch (dynamicPartition.toLowerCase()) {
             case MAX_PT:
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index daf196d37187..2f3ec62bbc58 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,14 +28,12 @@
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -58,10 +56,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -74,7 +70,6 @@
 import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A lookup {@link TableFunction} for file store. */
@@ -168,12 +163,15 @@ private void open() throws Exception {
         int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
 
         FileStoreTable storeTable = (FileStoreTable) table;
+        LOG.info("Creating lookup table for {}.", table.name());
         if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
                 && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
             if (isRemoteServiceAvailable(storeTable)) {
                 this.lookupTable =
                         PrimaryKeyPartialLookupTable.createRemoteTable(
                                 storeTable, projection, joinKeys);
+                LOG.info(
+                        "Remote service is available. Created PrimaryKeyPartialLookupTable with remote service.");
             } else {
                 try {
                     this.lookupTable =
@@ -183,7 +181,13 @@ private void open() throws Exception {
                                     path,
                                     joinKeys,
                                     getRequireCachedBucketIds());
-                } catch (UnsupportedOperationException ignore2) {
+                    LOG.info(
+                            "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
+                } catch (UnsupportedOperationException ignore) {
+                    LOG.info(
+                            "Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor "
+                                    + "because bucket mode isn't {}. Will create FullCacheLookupTable.",
+                            BucketMode.HASH_FIXED);
                 }
             }
         }
@@ -199,6 +203,7 @@ private void open() throws Exception {
                             joinKeys,
                             getRequireCachedBucketIds());
             this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
+            LOG.info("Created {}.", lookupTable.getClass().getSimpleName());
         }
 
         if (partitionLoader != null) {
@@ -206,7 +211,7 @@ private void open() throws Exception {
             partitionLoader.checkRefresh();
             List<BinaryRow> partitions = partitionLoader.partitions();
             if (!partitions.isEmpty()) {
-                lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
+                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
             }
         }
 
@@ -267,33 +272,6 @@ private List lookupInternal(InternalRow key) throws IOException {
         return rows;
     }
 
-    private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
-        Predicate partFilter = null;
-        for (BinaryRow partition : partitions) {
-            if (partFilter == null) {
-                partFilter = createSinglePartFilter(partition);
-            } else {
-                partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
-            }
-        }
-        return partFilter;
-    }
-
-    private Predicate createSinglePartFilter(BinaryRow partition) {
-        RowType rowType = table.rowType();
-        List<String> partitionKeys = table.partitionKeys();
-        Object[] partitionSpec =
-                new RowDataToObjectArrayConverter(rowType.project(partitionKeys))
-                        .convert(partition);
-        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
-        for (int i = 0; i < partitionSpec.length; i++) {
-            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
-        }
-
-        // create partition predicate base on rowType instead of partitionType
-        return createPartitionPredicate(rowType, partitionMap);
-    }
-
     private void reopen() {
         try {
             close();
@@ -321,7 +299,7 @@ void tryRefresh() throws Exception {
 
         if (partitionChanged) {
             // reopen with latest partition
-            lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
+            lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
             lookupTable.close();
             lookupTable.open();
             // no need to refresh the lookup table because it is reopened
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index f43d80321ecc..72041811d6f4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -30,6 +30,9 @@
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import static org.apache.paimon.CoreOptions.StartupMode;
@@ -41,6 +44,8 @@
  */
 public class LookupDataTableScan extends DataTableStreamScan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);
+
     private final StartupMode startupMode;
     private final LookupStreamScanMode lookupScanMode;
 
@@ -69,6 +74,7 @@ protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
         if (plan != null) {
             return plan;
         }
+        LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
         throw new ReopenException();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index 132b30138d0a..9615de48be25 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -36,6 +37,9 @@
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -50,6 +54,8 @@
 /** A streaming reader to load data into {@link LookupTable}. */
 public class LookupStreamingReader {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LookupStreamingReader.class);
+
     private final LookupFileStoreTable table;
     private final int[] projection;
     @Nullable private final Filter cacheRowFilter;
@@ -103,6 +109,7 @@ public LookupStreamingReader(
 
     public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
         List<Split> splits = scan.plan().splits();
+        log(splits);
         CoreOptions options = CoreOptions.fromMap(table.options());
         FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
                 split -> readBuilder.newRead().createReader(split);
@@ -136,6 +143,19 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Except
         return reader;
     }
 
+    private void log(List<Split> splits) {
+        if (splits.isEmpty()) {
+            LOG.info("LookupStreamingReader didn't get splits from {}.", table.name());
+            return;
+        }
+
+        DataSplit dataSplit = (DataSplit) splits.get(0);
+        LOG.info(
+                "LookupStreamingReader get splits from {} with snapshotId {}.",
+                table.name(),
+                dataSplit.snapshotId());
+    }
+
     @Nullable
     public Long nextSnapshotId() {
         return scan.checkpoint();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 7bd7a652b56e..255351767cd6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -34,6 +34,9 @@
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ProjectedRow;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
@@ -186,8 +189,11 @@ interface QueryExecutor extends Closeable {
 
     static class LocalQueryExecutor implements QueryExecutor {
 
+        private static final Logger LOG = LoggerFactory.getLogger(LocalQueryExecutor.class);
+
         private final LocalTableQuery tableQuery;
         private final StreamTableScan scan;
+        private final String tableName;
 
         private LocalQueryExecutor(
                 FileStoreTable table,
@@ -214,6 +220,8 @@ private LocalQueryExecutor(
                                             ? null
                                             : requireCachedBucketIds::contains)
                             .newStreamScan();
+
+            this.tableName = table.name();
         }
 
         @Override
@@ -226,15 +234,13 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
         public void refresh() {
             while (true) {
                 List<Split> splits = scan.plan().splits();
+                log(splits);
+
                 if (splits.isEmpty()) {
                     return;
                 }
 
                 for (Split split : splits) {
-                    if (!(split instanceof DataSplit)) {
-                        throw new IllegalArgumentException(
-                                "Unsupported split: " + split.getClass());
-                    }
                     BinaryRow partition = ((DataSplit) split).partition();
                     int bucket = ((DataSplit) split).bucket();
                     List<DataFileMeta> before = ((DataSplit) split).beforeFiles();
@@ -249,6 +255,19 @@ public void refresh() {
         public void close() throws IOException {
             tableQuery.close();
         }
+
+        private void log(List<Split> splits) {
+            if (splits.isEmpty()) {
+                LOG.info("LocalQueryExecutor didn't get splits from {}.", tableName);
+                return;
+            }
+
+            DataSplit dataSplit = (DataSplit) splits.get(0);
+            LOG.info(
+                    "LocalQueryExecutor get splits from {} with snapshotId {}.",
+                    tableName,
+                    dataSplit.snapshotId());
+        }
     }
 
     static class RemoteQueryExecutor implements QueryExecutor {
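
Note: the predicate construction that this patch moves from FileStoreLookupFunction into DynamicPartitionLoader can be read in isolation. The following is a minimal sketch, not part of the diff above; the class and method names are invented for illustration, and it relies only on the two calls the patch itself uses, createPartitionPredicate(RowType, Map) and PredicateBuilder.or.

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;

import java.util.List;
import java.util.Map;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;

public class SpecificPartitionFilterSketch {

    /** OR together one equality predicate per partition, mirroring createSpecificPartFilter. */
    public static Predicate specificPartFilter(
            RowType rowType, List<Map<String, Object>> partitionSpecs) {
        Predicate filter = null;
        for (Map<String, Object> spec : partitionSpecs) {
            // one predicate per partition spec, e.g. {dt=20240101, hr=00}
            Predicate single = createPartitionPredicate(rowType, spec);
            filter = filter == null ? single : PredicateBuilder.or(filter, single);
        }
        // null when partitionSpecs is empty, as in the patch
        return filter;
    }
}

As the comment carried over in the patch notes, the predicate is built against the full row type rather than the projected partition type, presumably so it can be applied directly to reads over the table schema.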
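The lifecycle driven by the lookup function is unchanged; only the filter construction moves onto the loader. Below is a minimal usage sketch under the assumption of a partitioned Paimon Table with FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION configured; the class and method names are illustrative and not part of the patch.

import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;

public class DynamicPartitionLookupSketch {

    /** Mirrors what FileStoreLookupFunction#open() does with the loader after this patch. */
    static Predicate latestPartitionFilter(Table table) {
        // of() returns null when LOOKUP_DYNAMIC_PARTITION is not set, and (new in this patch)
        // rejects non-partitioned tables via checkArgument.
        DynamicPartitionLoader loader = DynamicPartitionLoader.of(table);
        if (loader == null) {
            return null;
        }
        loader.open();
        loader.checkRefresh();
        if (loader.partitions().isEmpty()) {
            return null;
        }
        // previously the lookup function built this predicate itself from the partition rows
        return loader.createSpecificPartFilter();
    }
}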