@@ -558,7 +558,7 @@ public String unscheduleCompaction(
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
public String unscheduleCompactFile(
@CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
@CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath,
@CliOption(key = "partitionPath", unspecifiedDefaultValue = "", help = "partition path") final String partitionPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
@@ -119,7 +119,7 @@ public String showAllFileSlices(

@CliCommand(value = "show fsview latest", help = "Show latest file-system view")
public String showLatestFileSlices(
@CliOption(key = {"partitionPath"}, help = "A valid partition path", mandatory = true) String partition,
@CliOption(key = {"partitionPath"}, help = "A valid partition path", unspecifiedDefaultValue = "") String partition,
@CliOption(key = {"baseFileOnly"}, help = "Only display base file view",
unspecifiedDefaultValue = "false") boolean baseFileOnly,
@CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
@@ -53,7 +53,7 @@ public String convert(
@CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
@CliOption(key = "tableType", mandatory = true, help = "Table type") final String tableType,
@CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
@CliOption(key = "partitionPathField", mandatory = true,
@CliOption(key = "partitionPathField", unspecifiedDefaultValue = "",
help = "Partition path field name") final String partitionPathField,
@CliOption(key = {"parallelism"}, mandatory = true,
help = "Parallelism for hoodie insert") final String parallelism,
@@ -27,6 +27,7 @@
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
@@ -225,7 +226,7 @@ public String listPartitions(

@CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata")
public String listFiles(
@CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException {
@CliOption(key = {"partition"}, help = "Name of the partition to list files", unspecifiedDefaultValue = "") final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
@@ -235,8 +236,13 @@ public String listFiles(
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}

+ Path partitionPath = new Path(HoodieCLI.basePath);
+ if (!StringUtils.isNullOrEmpty(partition)) {
+   partitionPath = new Path(HoodieCLI.basePath, partition);
+ }
+
HoodieTimer timer = new HoodieTimer().startTimer();
- FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition));
+ FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
LOG.debug("Took " + timer.endTimer() + " ms");

final List<Comparable[]> rows = new ArrayList<>();
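
The StringUtils guard added above is what makes the empty default workable here: as far as I know, Hadoop's two-argument Path constructor rejects an empty child component (throwing IllegalArgumentException: "Can not create a Path from an empty string"), so an empty partition must fall back to the single-argument constructor on the table base path. A self-contained sketch of that fallback, with illustrative names not taken from the PR:

import org.apache.hadoop.fs.Path;

public class PartitionPathSketch {

  // Resolve the directory to list: the partition under the base path when a
  // partition is given, otherwise the base path itself (non-partitioned table).
  static Path resolve(String basePath, String partition) {
    if (partition == null || partition.isEmpty()) {
      // new Path(basePath, "") would throw IllegalArgumentException,
      // hence the guard rather than unconditional concatenation.
      return new Path(basePath);
    }
    return new Path(basePath, partition);
  }

  public static void main(String[] args) {
    System.out.println(resolve("/tmp/hudi_table", ""));           // /tmp/hudi_table
    System.out.println(resolve("/tmp/hudi_table", "2016/03/15")); // /tmp/hudi_table/2016/03/15
  }
}
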
@@ -59,24 +59,73 @@
@Tag("functional")
public class TestFileSystemViewCommand extends CLIFunctionalTestHarness {

+ private String nonpartitionedTablePath;
+ private String partitionedTablePath;
private String partitionPath;
- private SyncableFileSystemView fsView;
+ private SyncableFileSystemView nonpartitionedFsView;
+ private SyncableFileSystemView partitionedFsView;

@BeforeEach
public void init() throws IOException {
+ createNonpartitionedTable();
+ createPartitionedTable();
+ }
+
+ private void createNonpartitionedTable() throws IOException {
HoodieCLI.conf = hadoopConf();

// Create table and connect
- String tableName = tableName();
- String tablePath = tablePath(tableName);
+ String nonpartitionedTableName = "nonpartitioned_" + tableName();
+ nonpartitionedTablePath = tablePath(nonpartitionedTableName);
new TableCommand().createTable(
-     tablePath, tableName,
-     "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+     nonpartitionedTablePath, nonpartitionedTableName,
+     "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");

HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();

+ Files.createDirectories(Paths.get(nonpartitionedTablePath));
+
+ // Generate 2 commits
+ String commitTime1 = "3";
+ String commitTime2 = "4";
+
+ String fileId1 = UUID.randomUUID().toString();
+
+ // Write data files and log files
+ String testWriteToken = "2-0-2";
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+     .makeBaseFileName(commitTime1, testWriteToken, fileId1)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+     .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0, testWriteToken)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+     .makeBaseFileName(commitTime2, testWriteToken, fileId1)));
+ Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+     .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));
+
+ // Write commit files
+ Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+ Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime2 + ".commit"));
+
+ // Reload meta client and create fsView
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ nonpartitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+ }
+
+ private void createPartitionedTable() throws IOException {
+ HoodieCLI.conf = hadoopConf();
+
+ // Create table and connect
+ String partitionedTableName = "partitioned_" + tableName();
+ partitionedTablePath = tablePath(partitionedTableName);
+ new TableCommand().createTable(
+     partitionedTablePath, partitionedTableName,
+     "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
partitionPath = HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- String fullPartitionPath = Paths.get(tablePath, partitionPath).toString();
+ String fullPartitionPath = Paths.get(partitionedTablePath, partitionPath).toString();
Files.createDirectories(Paths.get(fullPartitionPath));

// Generate 2 commits
@@ -97,13 +146,13 @@ public void init() throws IOException {
.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));

// Write commit files
Files.createFile(Paths.get(tablePath, ".hoodie", commitTime1 + ".commit"));
Files.createFile(Paths.get(tablePath, ".hoodie", commitTime2 + ".commit"));
Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime1 + ".commit"));
Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime2 + ".commit"));

// Reload meta client and create fsView
metaClient = HoodieTableMetaClient.reload(metaClient);

- fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+ partitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
}

/**
@@ -116,7 +165,7 @@ public void testShowCommits() {
assertTrue(cr.isSuccess());

// Get all file groups
- Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+ Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);

List<Comparable[]> rows = new ArrayList<>();
fileGroups.forEach(fg -> fg.getAllFileSlices().forEach(fs -> {
@@ -164,7 +213,7 @@ public void testShowCommitsWithSpecifiedValues() {
assertTrue(cr.isSuccess());

List<Comparable[]> rows = new ArrayList<>();
- Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+ Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);

// Only get instant 1, since maxInstant was specified 2
fileGroups.forEach(fg -> fg.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals("1")).forEach(fs -> {
@@ -197,17 +246,7 @@ public String testShowCommitsWithSpecifiedValues() {
assertEquals(expected, got);
}

- /**
-  * Test case for command 'show fsview latest'.
-  */
- @Test
- public void testShowLatestFileSlices() {
- // Test show with partition path '2016/03/15'
- CommandResult cr = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
- assertTrue(cr.isSuccess());
-
- Stream<FileSlice> fileSlice = fsView.getLatestFileSlices(partitionPath);
-
+ private List<Comparable[]> fileSlicesToCRList(Stream<FileSlice> fileSlice, String partitionPath) {
List<Comparable[]> rows = new ArrayList<>();
fileSlice.forEach(fs -> {
int idx = 0;
@@ -245,7 +284,14 @@ public void testShowLatestFileSlices() {
.collect(Collectors.toList()).toString();
rows.add(row);
});
+ return rows;
+ }

+ /**
+  * Test case for command 'show fsview latest'.
+  */
+ @Test
+ public void testShowLatestFileSlices() throws IOException {
Function<Object, String> converterFunction =
entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
@@ -267,9 +313,32 @@ public void testShowLatestFileSlices() {
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_BASE_UNSCHEDULED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_SCHEDULED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_UNSCHEDULED);
- String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
- expected = removeNonWordAndStripSpace(expected);
- String got = removeNonWordAndStripSpace(cr.getResult().toString());
- assertEquals(expected, got);
+
+ // Test show with partition path '2016/03/15'
+ new TableCommand().connect(partitionedTablePath, null, false, 0, 0, 0);
+ CommandResult partitionedTableCR = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
+ assertTrue(partitionedTableCR.isSuccess());
+
+ Stream<FileSlice> partitionedFileSlice = partitionedFsView.getLatestFileSlices(partitionPath);
+
+ List<Comparable[]> partitionedRows = fileSlicesToCRList(partitionedFileSlice, partitionPath);
+ String partitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, partitionedRows);
+ partitionedExpected = removeNonWordAndStripSpace(partitionedExpected);
+ String partitionedResults = removeNonWordAndStripSpace(partitionedTableCR.getResult().toString());
+ assertEquals(partitionedExpected, partitionedResults);
+
+ // Test show for non-partitioned table
+ new TableCommand().connect(nonpartitionedTablePath, null, false, 0, 0, 0);
+ CommandResult nonpartitionedTableCR = shell().executeCommand("show fsview latest");
+ assertTrue(nonpartitionedTableCR.isSuccess());
+
+ Stream<FileSlice> nonpartitionedFileSlice = nonpartitionedFsView.getLatestFileSlices("");
+
+ List<Comparable[]> nonpartitionedRows = fileSlicesToCRList(nonpartitionedFileSlice, "");
+
+ String nonpartitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, nonpartitionedRows);
+ nonpartitionedExpected = removeNonWordAndStripSpace(nonpartitionedExpected);
+ String nonpartitionedResults = removeNonWordAndStripSpace(nonpartitionedTableCR.getResult().toString());
+ assertEquals(nonpartitionedExpected, nonpartitionedResults);
}
}
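
With all of the above in place, the affected commands can run against a non-partitioned table without --partitionPath. A sketch of such a hudi-cli session; the table path is made up and the prompt/output shapes are assumed, not captured from this PR:

hudi->connect --path /tmp/hudi/non_partitioned_table
hudi:non_partitioned_table->show fsview latest
hudi:non_partitioned_table->metadata list-files
hudi:non_partitioned_table->compaction unscheduleFileId --fileId <fileId>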