diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
index b1be840c5153..2aa3a0be2e95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
@@ -23,56 +23,50 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.RemoteIterator;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
 /** Util class for refreshing object table. */
 public class ObjectRefresh {
 
+    /**
+     * Rewrites the underlying table of {@code table} with one row per entry returned by the
+     * listing of the table's object location.
+     *
+     * <p>Rows are written as the listing iterator yields them, without first collecting the
+     * listing into a list, and everything is committed through a single overwrite commit.
+     *
+     * @param table the object table to refresh
+     * @return the number of rows written to the underlying table
+     * @throws Exception if listing, writing or committing fails
+     */
     public static long refresh(ObjectTable table) throws Exception {
         String location = table.objectLocation();
+        long totalObjs = 0;
 
-        // 1. collect all files for object table
-        List<FileStatus> fileCollector = new ArrayList<>();
-        listAllFiles(table.objectFileIO(), new Path(location), fileCollector);
-
-        // 2. write to underlying table
         BatchWriteBuilder writeBuilder =
                 table.underlyingTable().newBatchWriteBuilder().withOverwrite();
-        try (BatchTableWrite write = writeBuilder.newWrite();
+        try (RemoteIterator<FileStatus> objIter =
+                        table.objectFileIO().listFilesIterative(new Path(location), true);
+                BatchTableWrite write = writeBuilder.newWrite();
                 BatchTableCommit commit = writeBuilder.newCommit()) {
-            for (FileStatus file : fileCollector) {
-                write.write(toRow(file));
+            while (objIter.hasNext()) {
+                write.write(toRow(objIter.next()));
+                totalObjs++;
             }
+            // Commit exactly once, even when the listing is empty. NOTE(review): the builder is
+            // in overwrite mode, so per-batch commits would presumably each replace the previous
+            // batch, and skipping the commit would keep stale rows - confirm.
             commit.commit(write.prepareCommit());
         }
 
-        return fileCollector.size();
-    }
-
-    private static void listAllFiles(FileIO fileIO, Path directory, List<FileStatus> fileCollector)
-            throws IOException {
-        FileStatus[] files = fileIO.listStatus(directory);
-        if (files == null) {
-            return;
-        }
-
-        for (FileStatus file : files) {
-            if (file.isDir()) {
-                listAllFiles(fileIO, file.getPath(), fileCollector);
-            } else {
-                fileCollector.add(file);
-            }
-        }
+        return totalObjs;
     }
 
     private static InternalRow toRow(FileStatus file) {