DynamicPartitionLoader.java
@@ -23,8 +23,15 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

@@ -33,15 +40,20 @@
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Dynamic partition for lookup. */
public class DynamicPartitionLoader implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);

private static final long serialVersionUID = 1L;

private static final String MAX_PT = "max_pt()";
@@ -51,6 +63,7 @@ public class DynamicPartitionLoader implements Serializable {
private final Table table;
private final Duration refreshInterval;
private final int maxPartitionNum;
private final RowDataToObjectArrayConverter partitionConverter;

private Comparator<InternalRow> comparator;

@@ -61,6 +74,8 @@ private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPar
this.table = table;
this.refreshInterval = refreshInterval;
this.maxPartitionNum = maxPartitionNum;
this.partitionConverter =
new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
}

public void open() {
@@ -81,30 +96,85 @@ public List<BinaryRow> partitions() {
return partitions;
}

public Predicate createSpecificPartFilter() {
Predicate partFilter = null;
for (BinaryRow partition : partitions) {
if (partFilter == null) {
partFilter = createSinglePartFilter(partition);
} else {
partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
}
}
return partFilter;
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec = partitionConverter.convert(partition);
Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
for (int i = 0; i < partitionSpec.length; i++) {
partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
}

// create the partition predicate based on rowType instead of partitionType
return createPartitionPredicate(rowType, partitionMap);
}
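
As a hedged illustration (not part of this change): for a table partitioned by a single INT column, the loop in createSpecificPartFilter yields a predicate roughly equivalent to OR-ing one equality predicate per retained partition. The class name, row type (`dt`, `v`), and values below are made up for the sketch.

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class SpecificPartFilterSketch {
    public static void main(String[] args) {
        // Full table row type; the partition key `dt` sits at field index 0.
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                        new String[] {"dt", "v"});
        PredicateBuilder builder = new PredicateBuilder(rowType);
        // Two retained partitions, dt=20240101 and dt=20240102, combined the same
        // way createSpecificPartFilter chains PredicateBuilder.or over partitions.
        Predicate partFilter =
                PredicateBuilder.or(builder.equal(0, 20240101), builder.equal(0, 20240102));
        System.out.println(partFilter);
    }
}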

/** @return true if partition changed. */
public boolean checkRefresh() {
if (lastRefresh != null
&& !lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now())) {
return false;
}

LOG.info(
"DynamicPartitionLoader(maxPartitionNum={},table={}) refreshed after {} second(s), refreshing",
maxPartitionNum,
table.name(),
refreshInterval.toMillis() / 1000);

List<BinaryRow> newPartitions = getMaxPartitions();
lastRefresh = LocalDateTime.now();

if (newPartitions.size() != partitions.size()) {
partitions = newPartitions;
logNewPartitions();
return true;
} else {
for (int i = 0; i < newPartitions.size(); i++) {
if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
partitions = newPartitions;
logNewPartitions();
return true;
}
}
LOG.info(
"DynamicPartitionLoader(maxPartitionNum={},table={}) didn't find new partitions.",
maxPartitionNum,
table.name());
return false;
}
}
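
For reference, a self-contained sketch of the time gate at the top of checkRefresh (values are illustrative, not from this PR): the loader refreshes at most once per refreshInterval.

import java.time.Duration;
import java.time.LocalDateTime;

public class RefreshGateSketch {
    public static void main(String[] args) {
        Duration refreshInterval = Duration.ofSeconds(10);
        LocalDateTime lastRefresh = LocalDateTime.now().minusSeconds(3);
        // Mirrors the guard above: skip while lastRefresh + interval is still in the future.
        boolean skip = !lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now());
        System.out.println(skip); // true here: only 3s of the 10s interval have elapsed
    }
}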

private void logNewPartitions() {
String partitionsStr =
partitions.stream()
.map(
partition ->
InternalRowPartitionComputer.partToSimpleString(
table.rowType().project(table.partitionKeys()),
partition,
"-",
200))
.collect(Collectors.joining(","));
LOG.info(
"DynamicPartitionLoader(maxPartitionNum={},table={}) finds new partitions: {}.",
maxPartitionNum,
table.name(),
partitionsStr);
}

private List<BinaryRow> getMaxPartitions() {
List<BinaryRow> newPartitions =
table.newReadBuilder().newScan().listPartitions().stream()
@@ -126,6 +196,11 @@ public static DynamicPartitionLoader of(Table table) {
return null;
}

checkArgument(
!table.partitionKeys().isEmpty(),
"{} is not supported for non-partitioned table.",
LOOKUP_DYNAMIC_PARTITION);

int maxPartitionNum;
switch (dynamicPartition.toLowerCase()) {
case MAX_PT:
FileStoreLookupFunction.java
@@ -28,14 +28,12 @@
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;

@@ -58,10 +56,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -74,7 +70,6 @@
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;

/** A lookup {@link TableFunction} for file store. */
@@ -168,12 +163,15 @@ private void open() throws Exception {
int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
FileStoreTable storeTable = (FileStoreTable) table;

LOG.info("Creating lookup table for {}.", table.name());
if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
if (isRemoteServiceAvailable(storeTable)) {
this.lookupTable =
PrimaryKeyPartialLookupTable.createRemoteTable(
storeTable, projection, joinKeys);
LOG.info(
"Remote service is available. Created PrimaryKeyPartialLookupTable with remote service.");
} else {
try {
this.lookupTable =
@@ -183,7 +181,13 @@
path,
joinKeys,
getRequireCachedBucketIds());
} catch (UnsupportedOperationException ignore2) {
LOG.info(
"Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
} catch (UnsupportedOperationException ignore) {
LOG.info(
"Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor "
+ "because bucket mode isn't {}. Will create FullCacheLookupTable.",
BucketMode.HASH_FIXED);
}
}
Contributor @LinMingQiang commented on Jan 6, 2025:

Log the reason why it is not supported (because the table bucket mode is not HASH_FIXED, FullCacheLookupTable will be used).

}
@@ -199,14 +203,15 @@ private void open() throws Exception {
joinKeys,
getRequireCachedBucketIds());
this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
LOG.info("Created {}.", lookupTable.getClass().getSimpleName());
}

if (partitionLoader != null) {
partitionLoader.open();
partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
}
}

Contributor comment:

Suggestion: add a log where the LookupTable is created, stating which type of LookupTable will be created and why.

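To make the reviewer's suggestion concrete, here is a hedged sketch of the selection order that the new logs in open() narrate. The choose() helper and its class are hypothetical, a simplified stand-in for the real branching, not the PR's actual code.

public class LookupTableChoiceSketch {
    // Returns which lookup table open() would create under the given conditions.
    static String choose(
            boolean cacheModeAuto,
            boolean joinKeysArePrimaryKeys,
            boolean remoteServiceAvailable,
            boolean bucketModeHashFixed) {
        if (cacheModeAuto && joinKeysArePrimaryKeys) {
            if (remoteServiceAvailable) {
                return "PrimaryKeyPartialLookupTable (remote service)";
            }
            if (bucketModeHashFixed) {
                return "PrimaryKeyPartialLookupTable (local query executor)";
            }
        }
        return "FullCacheLookupTable";
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true, false, false)); // FullCacheLookupTable
    }
}
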
@@ -267,33 +272,6 @@ private List<RowData> lookupInternal(InternalRow key) throws IOException {
return rows;
}

private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
Predicate partFilter = null;
for (BinaryRow partition : partitions) {
if (partFilter == null) {
partFilter = createSinglePartFilter(partition);
} else {
partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
}
}
return partFilter;
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec =
new RowDataToObjectArrayConverter(rowType.project(partitionKeys))
.convert(partition);
Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
for (int i = 0; i < partitionSpec.length; i++) {
partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
}

// create the partition predicate based on rowType instead of partitionType
return createPartitionPredicate(rowType, partitionMap);
}

private void reopen() {
try {
close();
@@ -321,7 +299,7 @@ void tryRefresh() throws Exception {

if (partitionChanged) {
// reopen with latest partition
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
LookupDataTableScan.java
@@ -30,6 +30,9 @@
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static org.apache.paimon.CoreOptions.StartupMode;
@@ -41,6 +44,8 @@
*/
public class LookupDataTableScan extends DataTableStreamScan {

private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);

private final StartupMode startupMode;
private final LookupStreamScanMode lookupScanMode;

@@ -69,6 +74,7 @@ protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
if (plan != null) {
return plan;
}
LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
throw new ReopenException();
}

Expand Down
LookupStreamingReader.java
@@ -26,6 +26,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -36,6 +37,9 @@

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
@@ -50,6 +54,8 @@
/** A streaming reader to load data into {@link LookupTable}. */
public class LookupStreamingReader {

private static final Logger LOG = LoggerFactory.getLogger(LookupStreamingReader.class);

private final LookupFileStoreTable table;
private final int[] projection;
@Nullable private final Filter<InternalRow> cacheRowFilter;
@@ -103,6 +109,7 @@ public LookupStreamingReader(

public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
List<Split> splits = scan.plan().splits();
log(splits);
CoreOptions options = CoreOptions.fromMap(table.options());
FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
split -> readBuilder.newRead().createReader(split);
@@ -136,6 +143,19 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Except
return reader;
}

private void log(List<Split> splits) {
if (splits.isEmpty()) {
LOG.info("LookupStreamingReader didn't get splits from {}.", table.name());
return;
}

DataSplit dataSplit = (DataSplit) splits.get(0);
LOG.info(
"LookupStreamingReader get splits from {} with snapshotId {}.",
table.name(),
dataSplit.snapshotId());
}

@Nullable
public Long nextSnapshotId() {
return scan.checkpoint();