From 64c250e1b0877e98b29ae326d276102c7cf8a106 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 26 Dec 2024 19:18:16 +0800 Subject: [PATCH 1/2] [core] Support to query indexes --- docs/content/concepts/system-tables.md | 19 ++ .../paimon/table/system/IndexesTable.java | 238 ++++++++++++++++++ .../table/system/SystemTableLoader.java | 2 + .../paimon/flink/SystemTableITCase.java | 32 +++ 4 files changed, 291 insertions(+) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java diff --git a/docs/content/concepts/system-tables.md b/docs/content/concepts/system-tables.md index 5795aea419fb..47ba39fd4163 100644 --- a/docs/content/concepts/system-tables.md +++ b/docs/content/concepts/system-tables.md @@ -389,6 +389,25 @@ SELECT * FROM T$statistics; */ ``` +### Indexes Table + +You can query the table's index files generated for dynamic bucket table (index_type = HASH) and deletion vectors +(index_type = DELETION_VECTORS) through indexes table. + +```sql +SELECT * FROM my_table$indexes; + +/* ++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+ +| partition | bucket | index_type | file_name | file_size | row_count | dv_ranges | ++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+ +| [2024-10-01] | 0 | HASH | index-70abfebf-149e-4796-9f... | 12 | 3 | | +| [2024-10-01] | 0 | DELETION_VECTORS | index-633857e7-cdce-47d2-87... | 33 | 1 | [(data-346cb9c8-4032-4d66-a... | ++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+ +2 rows in set +*/ +``` + ## Global System Table Global system tables contain the statistical information of all the tables exists in paimon. For convenient of searching, we create a reference system database called `sys`. diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java new file mode 100644 index 000000000000..e890538580b6 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.system; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMetaSerializer; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.ReadonlyTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.ReadOnceTableScan; +import org.apache.paimon.table.source.SingletonSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.ProjectedRow; +import org.apache.paimon.utils.RowDataToObjectArrayConverter; +import org.apache.paimon.utils.SerializationUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** A {@link Table} for showing committing snapshots of table. */ +public class IndexesTable implements ReadonlyTable { + + private static final Logger LOG = LoggerFactory.getLogger(IndexesTable.class); + + public static final String INDEXES = "indexes"; + + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "partition", SerializationUtils.newStringType(true)), + new DataField(1, "bucket", new IntType(false)), + new DataField(2, "index_type", newStringType(false)), + new DataField(3, "file_name", newStringType(false)), + new DataField(4, "file_size", new BigIntType(false)), + new DataField(5, "row_count", new BigIntType(false)), + new DataField( + 6, + "dv_ranges", + new ArrayType(true, DeletionVectorMeta.SCHEMA)))); + + private final FileStoreTable dataTable; + + public IndexesTable(FileStoreTable dataTable) { + this.dataTable = dataTable; + } + + @Override + public InnerTableScan newScan() { + return new IndexesScan(); + } + + @Override + public InnerTableRead newRead() { + return new IndexesRead(dataTable); + } + + @Override + public String name() { + return dataTable.name() + SYSTEM_TABLE_SPLITTER + INDEXES; + } + + @Override + public RowType rowType() { + return TABLE_TYPE; + } + + @Override + public List primaryKeys() { + return Collections.singletonList("file_name"); + } + + @Override + public Table copy(Map dynamicOptions) { + return new IndexesTable(dataTable.copy(dynamicOptions)); + } + + private static class IndexesScan extends ReadOnceTableScan { + + @Override + public InnerTableScan withFilter(Predicate predicate) { + return this; + } + + @Override + protected Plan innerPlan() { + return () -> Collections.singletonList(new IndexesSplit()); + } + } + + private static class IndexesSplit extends SingletonSplit { + + private static final long serialVersionUID = 1L; + + private IndexesSplit() {} + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + } + + private static class IndexesRead implements InnerTableRead { + + private RowType readType; + + private final FileStoreTable dataTable; + + public IndexesRead(FileStoreTable dataTable) { + this.dataTable = dataTable; + } + + @Override + public InnerTableRead withFilter(Predicate predicate) { + return this; + } + + @Override + public InnerTableRead withReadType(RowType readType) { + this.readType = readType; + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + return this; + } + + @Override + public RecordReader createReader(Split split) { + if (!(split instanceof IndexesSplit)) { + throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + } + List manifestFileMetas = allIndexEntries(dataTable); + + RowDataToObjectArrayConverter partitionConverter = + new RowDataToObjectArrayConverter(dataTable.schema().logicalPartitionType()); + + Iterator rows = + Iterators.transform( + manifestFileMetas.iterator(), + indexManifestEntry -> toRow(indexManifestEntry, partitionConverter)); + if (readType != null) { + rows = + Iterators.transform( + rows, + row -> + ProjectedRow.from(readType, IndexesTable.TABLE_TYPE) + .replaceRow(row)); + } + return new IteratorRecordReader<>(rows); + } + + private InternalRow toRow( + IndexManifestEntry indexManifestEntry, + RowDataToObjectArrayConverter partitionConverter) { + LinkedHashMap dvMetas = + indexManifestEntry.indexFile().deletionVectorMetas(); + return GenericRow.of( + BinaryString.fromString( + Arrays.toString( + partitionConverter.convert(indexManifestEntry.partition()))), + indexManifestEntry.bucket(), + BinaryString.fromString(indexManifestEntry.indexFile().indexType()), + BinaryString.fromString(indexManifestEntry.indexFile().fileName()), + indexManifestEntry.indexFile().fileSize(), + indexManifestEntry.indexFile().rowCount(), + dvMetas == null + ? null + : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values())); + } + } + + private static List allIndexEntries(FileStoreTable dataTable) { + IndexFileHandler indexFileHandler = dataTable.store().newIndexFileHandler(); + Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable); + if (snapshot == null) { + LOG.warn("Check if your snapshot is empty."); + return Collections.emptyList(); + } + String indexManifest = snapshot.indexManifest(); + if (indexManifest == null || !indexFileHandler.existsManifest(indexManifest)) { + LOG.warn("indexManifest doesn't exist."); + return Collections.emptyList(); + } + + return indexFileHandler.readManifest(indexManifest); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java index b77b72e4129d..8e5f8c38c85a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java @@ -41,6 +41,7 @@ import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS; import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS; import static org.apache.paimon.table.system.FilesTable.FILES; +import static org.apache.paimon.table.system.IndexesTable.INDEXES; import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS; import static org.apache.paimon.table.system.OptionsTable.OPTIONS; import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS; @@ -70,6 +71,7 @@ public class SystemTableLoader { .put(AGGREGATION_FIELDS, AggregationFieldsTable::new) .put(STATISTICS, StatisticTable::new) .put(BINLOG, BinlogTable::new) + .put(INDEXES, IndexesTable::new) .build(); public static final List SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java index 771f4acc5e58..37114edbe152 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java @@ -63,4 +63,36 @@ public void testBinlogTableBatchRead() throws Exception { Row.of("+I", new Integer[] {1}, new Integer[] {3}), Row.of("+I", new Integer[] {2}, new Integer[] {2})); } + + @Test + public void testIndexesTable() { + sql( + "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, a) NOT ENFORCED)" + + " PARTITIONED BY (pt) with ('deletion-vectors.enabled'='true')"); + sql( + "INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')"); + sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')"); + + List rows = sql("SELECT * FROM `T$indexes` WHERE index_type = 'HASH'"); + assertThat(rows.size()).isEqualTo(1); + Row row = rows.get(0); + assertThat(row.getField(0)).isEqualTo("[2024-10-01]"); + assertThat(row.getField(1)).isEqualTo(0); + assertThat(row.getField(2)).isEqualTo("HASH"); + assertThat(row.getField(3).toString().startsWith("index-")).isTrue(); + assertThat(row.getField(4)).isEqualTo(12L); + assertThat(row.getField(5)).isEqualTo(3L); + assertThat(row.getField(6)).isNull(); + + rows = sql("SELECT * FROM `T$indexes` WHERE index_type = 'DELETION_VECTORS'"); + assertThat(rows.size()).isEqualTo(1); + row = rows.get(0); + assertThat(row.getField(0)).isEqualTo("[2024-10-01]"); + assertThat(row.getField(1)).isEqualTo(0); + assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS"); + assertThat(row.getField(3).toString().startsWith("index-")).isTrue(); + assertThat(row.getField(4)).isEqualTo(33L); + assertThat(row.getField(5)).isEqualTo(1L); + assertThat(row.getField(6)).isNotNull(); + } } From b9c72c200b3bda8eb49ba020026704d37966f1ca Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 27 Dec 2024 11:15:58 +0800 Subject: [PATCH 2/2] fix --- docs/content/concepts/system-tables.md | 4 ++-- .../paimon/table/system/SystemTableLoader.java | 4 ++-- ...{IndexesTable.java => TableIndexesTable.java} | 16 ++++++++-------- .../apache/paimon/flink/SystemTableITCase.java | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) rename paimon-core/src/main/java/org/apache/paimon/table/system/{IndexesTable.java => TableIndexesTable.java} (94%) diff --git a/docs/content/concepts/system-tables.md b/docs/content/concepts/system-tables.md index 47ba39fd4163..92119874e266 100644 --- a/docs/content/concepts/system-tables.md +++ b/docs/content/concepts/system-tables.md @@ -389,13 +389,13 @@ SELECT * FROM T$statistics; */ ``` -### Indexes Table +### Table Indexes Table You can query the table's index files generated for dynamic bucket table (index_type = HASH) and deletion vectors (index_type = DELETION_VECTORS) through indexes table. ```sql -SELECT * FROM my_table$indexes; +SELECT * FROM my_table$table_indexes; /* +--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+ diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java index 8e5f8c38c85a..57c3c2caacac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java @@ -41,7 +41,6 @@ import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS; import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS; import static org.apache.paimon.table.system.FilesTable.FILES; -import static org.apache.paimon.table.system.IndexesTable.INDEXES; import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS; import static org.apache.paimon.table.system.OptionsTable.OPTIONS; import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS; @@ -49,6 +48,7 @@ import static org.apache.paimon.table.system.SchemasTable.SCHEMAS; import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS; import static org.apache.paimon.table.system.StatisticTable.STATISTICS; +import static org.apache.paimon.table.system.TableIndexesTable.TABLE_INDEXES; import static org.apache.paimon.table.system.TagsTable.TAGS; /** Loader to load system {@link Table}s. */ @@ -71,7 +71,7 @@ public class SystemTableLoader { .put(AGGREGATION_FIELDS, AggregationFieldsTable::new) .put(STATISTICS, StatisticTable::new) .put(BINLOG, BinlogTable::new) - .put(INDEXES, IndexesTable::new) + .put(TABLE_INDEXES, TableIndexesTable::new) .build(); public static final List SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java similarity index 94% rename from paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java rename to paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java index e890538580b6..08731e768a93 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/IndexesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java @@ -64,12 +64,12 @@ import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; import static org.apache.paimon.utils.SerializationUtils.newStringType; -/** A {@link Table} for showing committing snapshots of table. */ -public class IndexesTable implements ReadonlyTable { +/** A {@link Table} for showing indexes. */ +public class TableIndexesTable implements ReadonlyTable { - private static final Logger LOG = LoggerFactory.getLogger(IndexesTable.class); + private static final Logger LOG = LoggerFactory.getLogger(TableIndexesTable.class); - public static final String INDEXES = "indexes"; + public static final String TABLE_INDEXES = "table_indexes"; public static final RowType TABLE_TYPE = new RowType( @@ -87,7 +87,7 @@ public class IndexesTable implements ReadonlyTable { private final FileStoreTable dataTable; - public IndexesTable(FileStoreTable dataTable) { + public TableIndexesTable(FileStoreTable dataTable) { this.dataTable = dataTable; } @@ -103,7 +103,7 @@ public InnerTableRead newRead() { @Override public String name() { - return dataTable.name() + SYSTEM_TABLE_SPLITTER + INDEXES; + return dataTable.name() + SYSTEM_TABLE_SPLITTER + TABLE_INDEXES; } @Override @@ -118,7 +118,7 @@ public List primaryKeys() { @Override public Table copy(Map dynamicOptions) { - return new IndexesTable(dataTable.copy(dynamicOptions)); + return new TableIndexesTable(dataTable.copy(dynamicOptions)); } private static class IndexesScan extends ReadOnceTableScan { @@ -194,7 +194,7 @@ public RecordReader createReader(Split split) { Iterators.transform( rows, row -> - ProjectedRow.from(readType, IndexesTable.TABLE_TYPE) + ProjectedRow.from(readType, TableIndexesTable.TABLE_TYPE) .replaceRow(row)); } return new IteratorRecordReader<>(rows); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java index 37114edbe152..98ec635e85f7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java @@ -73,7 +73,7 @@ public void testIndexesTable() { "INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')"); sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')"); - List rows = sql("SELECT * FROM `T$indexes` WHERE index_type = 'HASH'"); + List rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'HASH'"); assertThat(rows.size()).isEqualTo(1); Row row = rows.get(0); assertThat(row.getField(0)).isEqualTo("[2024-10-01]"); @@ -84,7 +84,7 @@ public void testIndexesTable() { assertThat(row.getField(5)).isEqualTo(3L); assertThat(row.getField(6)).isNull(); - rows = sql("SELECT * FROM `T$indexes` WHERE index_type = 'DELETION_VECTORS'"); + rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'DELETION_VECTORS'"); assertThat(rows.size()).isEqualTo(1); row = rows.get(0); assertThat(row.getField(0)).isEqualTo("[2024-10-01]");