Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.utils.FunctionWithException;
Expand Down Expand Up @@ -104,6 +105,29 @@ public FileStatus[] listStatus(Path path) throws IOException {
return statuses;
}

@Override
public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests for hadoop file io?

throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
return new RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
return hadoopIter.hasNext();
}

@Override
public FileStatus next() throws IOException {
org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
return new HadoopFileStatus(hadoopStatus);
}

@Override
public void close() throws IOException {}
};
}

@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -85,6 +87,43 @@ void testFileDoesNotExist() throws IOException {
assertThat(fs.exists(new Path(basePath, randomName()))).isFalse();
}

// --- list files

@Test
void testListFilesIterativeNonRecursive() throws IOException {
Path fileA = createRandomFileInDirectory(basePath);
Path dirB = new Path(basePath, randomName());
fs.mkdirs(dirB);
Path fileBC = createRandomFileInDirectory(dirB);

List<FileStatus> allFiles = new ArrayList<>();
try (RemoteIterator<FileStatus> iter = fs.listFilesIterative(basePath, false)) {
while (iter.hasNext()) {
allFiles.add(iter.next());
}
}
assertThat(allFiles.size()).isEqualTo(1);
assertThat(allFiles.get(0).getPath()).isEqualTo(fileA);
}

@Test
void testListFilesIterativeRecursive() throws IOException {
Path fileA = createRandomFileInDirectory(basePath);
Path dirB = new Path(basePath, randomName());
fs.mkdirs(dirB);
Path fileBC = createRandomFileInDirectory(dirB);

List<FileStatus> allFiles = new ArrayList<>();
try (RemoteIterator<FileStatus> iter = fs.listFilesIterative(basePath, true)) {
while (iter.hasNext()) {
allFiles.add(iter.next());
}
}
assertThat(allFiles.size()).isEqualTo(2);
assertThat(allFiles.stream().filter(f -> f.getPath().equals(fileA)).count()).isEqualTo(1);
assertThat(allFiles.stream().filter(f -> f.getPath().equals(fileBC)).count()).isEqualTo(1);
}

// --- delete

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -77,6 +78,29 @@ public FileStatus[] listStatus(Path path) throws IOException {
return statuses;
}

@Override
public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
return new RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
return hadoopIter.hasNext();
}

@Override
public FileStatus next() throws IOException {
org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
return new HadoopFileStatus(hadoopStatus);
}

@Override
public void close() throws IOException {}
};
}

@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -77,6 +78,29 @@ public FileStatus[] listStatus(Path path) throws IOException {
return statuses;
}

@Override
public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
return new RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
return hadoopIter.hasNext();
}

@Override
public FileStatus next() throws IOException {
org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
return new HadoopFileStatus(hadoopStatus);
}

@Override
public void close() throws IOException {}
};
}

@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down
Loading