Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
@Nullable private final Predicate predicate;
@Nullable private final RefreshBlacklist refreshBlacklist;

private final List<InternalRow.FieldGetter> projectFieldsGetters;

private transient File path;
private transient LookupTable lookupTable;

Expand Down Expand Up @@ -123,6 +125,11 @@ public FileStoreLookupFunction(
.mapToObj(i -> table.rowType().getFieldNames().get(i))
.collect(Collectors.toList());

this.projectFieldsGetters =
Arrays.stream(projection)
.mapToObj(i -> table.rowType().fieldGetters()[i])
.collect(Collectors.toList());

// add primary keys
for (String field : table.primaryKeys()) {
if (!projectFields.contains(field)) {
Expand Down Expand Up @@ -166,6 +173,11 @@ private void open() throws Exception {

List<String> fieldNames = table.rowType().getFieldNames();
int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
LOG.info(
"lookup projection fields in lookup table:{}, join fields in lookup table:{}",
projectFields,
joinKeys);

FileStoreTable storeTable = (FileStoreTable) table;

if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
Expand Down Expand Up @@ -236,6 +248,9 @@ public Collection<RowData> lookup(RowData keyRow) {
try {
tryRefresh();

if (LOG.isDebugEnabled()) {
LOG.debug("lookup key:{}", keyRow.toString());
}
InternalRow key = new FlinkRowWrapper(keyRow);
if (partitionLoader == null) {
return lookupInternal(key);
Expand Down Expand Up @@ -264,6 +279,16 @@ private List<RowData> lookupInternal(InternalRow key) throws IOException {
for (InternalRow matchedRow : lookupResults) {
rows.add(new FlinkRowData(matchedRow));
}

if (LOG.isDebugEnabled()) {
LOG.debug(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (LOG.isDebugEnabled()) {
LOG.debug("xxx");
}

"matched rows in lookup table, size:{}, rows:{}",
lookupResults.size(),
lookupResults.stream()
.map(row -> logRow(projectFieldsGetters, row))
.collect(Collectors.toList()));
}

return rows;
}

Expand Down Expand Up @@ -413,4 +438,15 @@ protected Set<Integer> getRequireCachedBucketIds() {
protected void setCacheRowFilter(@Nullable Filter<InternalRow> cacheRowFilter) {
this.cacheRowFilter = cacheRowFilter;
}

private String logRow(List<InternalRow.FieldGetter> fieldGetters, InternalRow row) {
List<String> rowValues = new ArrayList<>(fieldGetters.size());

for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
Object fieldValue = fieldGetter.getFieldOrNull(row);
String value = fieldValue == null ? "null" : fieldValue.toString();
rowValues.add(value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rowValues.add(fieldValue == null ? "null" : fieldValue.toString());

}
return rowValues.toString();
}
}
Loading