Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -66,8 +64,6 @@
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -372,15 +368,7 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>
@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getTableName();
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
return new AllTableOptionsTable(fileIO, allTablePaths());
case CATALOG_OPTIONS:
return new CatalogOptionsTable(catalogOptions);
default:
throw new TableNotExistException(identifier);
}
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
Expand Down Expand Up @@ -454,22 +442,6 @@ public Path newDatabasePath(String database) {
return newDatabasePath(warehouse(), database);
}

public Map<String, Map<String, Path>> allTablePaths() {
try {
Map<String, Map<String, Path>> allPaths = new HashMap<>();
for (String database : listDatabases()) {
Map<String, Path> tableMap =
allPaths.computeIfAbsent(database, d -> new HashMap<>());
for (String table : listTables(database)) {
tableMap.put(table, getTableLocation(Identifier.create(database, table)));
}
}
return allPaths;
} catch (DatabaseNotExistException e) {
throw new RuntimeException("Database is deleted while listing", e);
}
}

protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException {
return new TableMeta(getDataTableSchema(identifier), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -39,6 +42,8 @@
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utils for {@link Catalog}. */
Expand Down Expand Up @@ -121,6 +126,31 @@ public static void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

public static Table createGlobalSystemTable(String tableName, Catalog catalog)
throws Catalog.TableNotExistException {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
try {
Map<Identifier, Map<String, String>> allOptions = new HashMap<>();
for (String database : catalog.listDatabases()) {
for (String name : catalog.listTables(database)) {
Identifier identifier = Identifier.create(database, name);
Table table = catalog.getTable(identifier);
allOptions.put(identifier, table.options());
}
}
return new AllTableOptionsTable(allOptions);
} catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException e) {
throw new RuntimeException("Database is deleted while listing", e);
}
case CATALOG_OPTIONS:
return new CatalogOptionsTable(Options.fromMap(catalog.options()));
default:
throw new Catalog.TableNotExistException(
Identifier.create(SYSTEM_DATABASE_NAME, tableName));
}
}

public static Table createSystemTable(Identifier identifier, Table originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
throw new UnsupportedOperationException("TODO support global system tables.");
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
return getSystemTable(identifier);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package org.apache.paimon.table.system;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand All @@ -45,7 +43,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -68,13 +65,10 @@ public class AllTableOptionsTable implements ReadonlyTable {

public static final String ALL_TABLE_OPTIONS = "all_table_options";

private final FileIO fileIO;
private final Map<String, Map<String, Path>> allTablePaths;
private final Map<Identifier, Map<String, String>> allOptions;

public AllTableOptionsTable(FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
// allTablePath is the map of <database, <table_name, properties>>
this.fileIO = fileIO;
this.allTablePaths = allTablePaths;
public AllTableOptionsTable(Map<Identifier, Map<String, String>> allOptions) {
this.allOptions = allOptions;
}

@Override
Expand Down Expand Up @@ -104,12 +98,12 @@ public InnerTableScan newScan() {

@Override
public InnerTableRead newRead() {
return new AllTableOptionsRead(fileIO);
return new AllTableOptionsRead();
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new AllTableOptionsTable(fileIO, allTablePaths);
return new AllTableOptionsTable(allOptions);
}

private class AllTableOptionsScan extends ReadOnceTableScan {
Expand All @@ -121,18 +115,18 @@ public InnerTableScan withFilter(Predicate predicate) {

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new AllTableSplit(allTablePaths));
return () -> Collections.singletonList(new AllTableSplit(allOptions));
}
}

private static class AllTableSplit extends SingletonSplit {

private static final long serialVersionUID = 1L;

private final Map<String, Map<String, Path>> allTablePaths;
private final Map<Identifier, Map<String, String>> allOptions;

private AllTableSplit(Map<String, Map<String, Path>> allTablePaths) {
this.allTablePaths = allTablePaths;
private AllTableSplit(Map<Identifier, Map<String, String>> allOptions) {
this.allOptions = allOptions;
}

@Override
Expand All @@ -144,24 +138,19 @@ public boolean equals(Object o) {
return false;
}
AllTableSplit that = (AllTableSplit) o;
return Objects.equals(allTablePaths, that.allTablePaths);
return Objects.equals(allOptions, that.allOptions);
}

@Override
public int hashCode() {
return Objects.hash(allTablePaths);
return Objects.hash(allOptions);
}
}

private static class AllTableOptionsRead implements InnerTableRead {

private final FileIO fileIO;
private RowType readType;

public AllTableOptionsRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
Expand All @@ -183,29 +172,12 @@ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof AllTableSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Map<String, Map<String, Path>> location = ((AllTableSplit) split).allTablePaths;
Iterator<InternalRow> rows = toRow(options(fileIO, location));
if (readType != null) {
rows =
Iterators.transform(
rows,
row ->
ProjectedRow.from(
readType, AggregationFieldsTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}
}

protected static Iterator<InternalRow> toRow(
Map<String, Map<String, Map<String, String>>> option) {
List<InternalRow> rows = new ArrayList<>();
for (Map.Entry<String, Map<String, Map<String, String>>> entry0 : option.entrySet()) {
String database = entry0.getKey();
for (Map.Entry<String, Map<String, String>> entry1 : entry0.getValue().entrySet()) {
String tableName = entry1.getKey();
for (Map.Entry<String, String> entry2 : entry1.getValue().entrySet()) {
List<InternalRow> rows = new ArrayList<>();
for (Map.Entry<Identifier, Map<String, String>> entry :
((AllTableSplit) split).allOptions.entrySet()) {
String database = entry.getKey().getDatabaseName();
String tableName = entry.getKey().getTableName();
for (Map.Entry<String, String> entry2 : entry.getValue().entrySet()) {
String key = entry2.getKey();
String value = entry2.getValue();
rows.add(
Expand All @@ -216,25 +188,17 @@ protected static Iterator<InternalRow> toRow(
BinaryString.fromString(value)));
}
}
}
return rows.iterator();
}

protected static Map<String, Map<String, Map<String, String>>> options(
FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
Map<String, Map<String, Map<String, String>>> allOptions = new HashMap<>();
for (Map.Entry<String, Map<String, Path>> entry0 : allTablePaths.entrySet()) {
Map<String, Map<String, String>> m0 =
allOptions.computeIfAbsent(entry0.getKey(), k -> new HashMap<>());
for (Map.Entry<String, Path> entry1 : entry0.getValue().entrySet()) {
Map<String, String> options =
new SchemaManager(fileIO, entry1.getValue())
.latest()
.orElseThrow(() -> new RuntimeException("Table not exists."))
.options();
m0.put(entry1.getKey(), options);
Iterator<InternalRow> iterator = rows.iterator();
if (readType != null) {
iterator =
Iterators.transform(
iterator,
row ->
ProjectedRow.from(
readType, AggregationFieldsTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(iterator);
}
return allOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,6 @@ public void testGetTable() throws Exception {
() -> catalog.getTable(Identifier.create("non_existing_db", "test_table")))
.withMessage("Table non_existing_db.test_table does not exist.");

// Get all table options from system database
if (!supportGetFromSystemDatabase()) {
return;
}
Table allTableOptionsTable =
catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME, ALL_TABLE_OPTIONS));
assertThat(allTableOptionsTable).isNotNull();
Expand Down Expand Up @@ -1029,10 +1025,6 @@ public void testTableUUID() throws Exception {
.isGreaterThan(0);
}

protected boolean supportGetFromSystemDatabase() {
return true;
}

protected boolean supportsAlterDatabase() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ public void tearDown() throws Exception {
restCatalogServer.shutdown();
}

@Override
protected boolean supportGetFromSystemDatabase() {
return false;
}

@Test
void testInitFailWhenDefineWarehouse() {
Options options = new Options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public void before() throws Exception {
}

@Test
public void testSchemasTable() throws Exception {
public void testAllTableOptionsTable() throws Exception {
List<String> result =
read(allTableOptionsTable).stream()
.map(Objects::toString)
.collect(Collectors.toList());
result = result.stream().filter(r -> !r.contains("path")).collect(Collectors.toList());
assertThat(result)
.containsExactlyInAnyOrder(
"+I(default,T,fields.sales.aggregate-function,sum)",
Expand Down
Loading
Loading