19 changes: 19 additions & 0 deletions docs/content/concepts/system-tables.md
@@ -389,6 +389,25 @@ SELECT * FROM T$statistics;
*/
```

### Table Indexes Table

You can query the table's index files through the table indexes table. Index files are generated for dynamic bucket tables (index_type = HASH) and for deletion vectors (index_type = DELETION_VECTORS).

```sql
SELECT * FROM my_table$table_indexes;

/*
+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| partition | bucket | index_type | file_name | file_size | row_count | dv_ranges |
+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| [2024-10-01] | 0 | HASH | index-70abfebf-149e-4796-9f... | 12 | 3 | <NULL> |
| [2024-10-01] | 0 | DELETION_VECTORS | index-633857e7-cdce-47d2-87... | 33 | 1 | [(data-346cb9c8-4032-4d66-a... |
+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
2 rows in set
*/
```
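
You can also filter on `index_type` to inspect only one kind of index file. A minimal sketch (the values correspond to the output above):

```sql
-- Show only the deletion vector index files of the table.
SELECT * FROM my_table$table_indexes WHERE index_type = 'DELETION_VECTORS';
```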

## Global System Table

Global system tables contain the statistical information of all the tables that exist in Paimon. For convenience of searching, we create a reference system database called `sys`.
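
For example, assuming a global options table named `all_table_options` is available in your version (the exact table name may differ), you can query it through the `sys` database like any other table:

```sql
-- Assumption: `all_table_options` is exposed as a global system table in the `sys` database.
SELECT * FROM sys.all_table_options;
```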
@@ -48,6 +48,7 @@
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
import static org.apache.paimon.table.system.TableIndexesTable.TABLE_INDEXES;
import static org.apache.paimon.table.system.TagsTable.TAGS;

/** Loader to load system {@link Table}s. */
@@ -70,6 +71,7 @@ public class SystemTableLoader {
.put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
.put(STATISTICS, StatisticTable::new)
.put(BINLOG, BinlogTable::new)
.put(TABLE_INDEXES, TableIndexesTable::new)
.build();

public static final List<String> SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.system;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.SerializationUtils.newStringType;

/** A {@link Table} for showing indexes. */
public class TableIndexesTable implements ReadonlyTable {

private static final Logger LOG = LoggerFactory.getLogger(TableIndexesTable.class);

public static final String TABLE_INDEXES = "table_indexes";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(0, "partition", SerializationUtils.newStringType(true)),
new DataField(1, "bucket", new IntType(false)),
new DataField(2, "index_type", newStringType(false)),
new DataField(3, "file_name", newStringType(false)),
new DataField(4, "file_size", new BigIntType(false)),
new DataField(5, "row_count", new BigIntType(false)),
new DataField(
6,
"dv_ranges",
new ArrayType(true, DeletionVectorMeta.SCHEMA))));

private final FileStoreTable dataTable;

public TableIndexesTable(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

@Override
public InnerTableScan newScan() {
return new IndexesScan();
}

@Override
public InnerTableRead newRead() {
return new IndexesRead(dataTable);
}

@Override
public String name() {
return dataTable.name() + SYSTEM_TABLE_SPLITTER + TABLE_INDEXES;
}

@Override
public RowType rowType() {
return TABLE_TYPE;
}

@Override
public List<String> primaryKeys() {
return Collections.singletonList("file_name");
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new TableIndexesTable(dataTable.copy(dynamicOptions));
}

private static class IndexesScan extends ReadOnceTableScan {

@Override
public InnerTableScan withFilter(Predicate predicate) {
return this;
}

@Override
protected Plan innerPlan() {
return () -> Collections.singletonList(new IndexesSplit());
}
}

private static class IndexesSplit extends SingletonSplit {

private static final long serialVersionUID = 1L;

private IndexesSplit() {}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return o != null && getClass() == o.getClass();
}
}

private static class IndexesRead implements InnerTableRead {

private RowType readType;

private final FileStoreTable dataTable;

public IndexesRead(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableRead withReadType(RowType readType) {
this.readType = readType;
return this;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
return this;
}

@Override
public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof IndexesSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
List<IndexManifestEntry> manifestFileMetas = allIndexEntries(dataTable);

RowDataToObjectArrayConverter partitionConverter =
new RowDataToObjectArrayConverter(dataTable.schema().logicalPartitionType());

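// Map each index manifest entry to an InternalRow and, if a read type was requested,
// project the full TABLE_TYPE row down to the requested fields.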
Iterator<InternalRow> rows =
Iterators.transform(
manifestFileMetas.iterator(),
indexManifestEntry -> toRow(indexManifestEntry, partitionConverter));
if (readType != null) {
rows =
Iterators.transform(
rows,
row ->
ProjectedRow.from(readType, TableIndexesTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}

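// Maps one index manifest entry to a row; dv_ranges stays null for index files that
// carry no deletion vector metadata (e.g. HASH indexes).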
private InternalRow toRow(
IndexManifestEntry indexManifestEntry,
RowDataToObjectArrayConverter partitionConverter) {
LinkedHashMap<String, DeletionVectorMeta> dvMetas =
indexManifestEntry.indexFile().deletionVectorMetas();
return GenericRow.of(
BinaryString.fromString(
Arrays.toString(
partitionConverter.convert(indexManifestEntry.partition()))),
indexManifestEntry.bucket(),
BinaryString.fromString(indexManifestEntry.indexFile().indexType()),
BinaryString.fromString(indexManifestEntry.indexFile().fileName()),
indexManifestEntry.indexFile().fileSize(),
indexManifestEntry.indexFile().rowCount(),
dvMetas == null
? null
: IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()));
}
}

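// Lists all index manifest entries of the resolved (possibly time-travelled) snapshot,
// returning an empty list when no snapshot or index manifest is available.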
private static List<IndexManifestEntry> allIndexEntries(FileStoreTable dataTable) {
IndexFileHandler indexFileHandler = dataTable.store().newIndexFileHandler();
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
if (snapshot == null) {
LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
}
String indexManifest = snapshot.indexManifest();
if (indexManifest == null || !indexFileHandler.existsManifest(indexManifest)) {
LOG.warn("indexManifest doesn't exist.");
return Collections.emptyList();
}

return indexFileHandler.readManifest(indexManifest);
}
}
@@ -63,4 +63,36 @@ public void testBinlogTableBatchRead() throws Exception {
Row.of("+I", new Integer[] {1}, new Integer[] {3}),
Row.of("+I", new Integer[] {2}, new Integer[] {2}));
}

@Test
public void testIndexesTable() {
sql(
"CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, a) NOT ENFORCED)"
+ " PARTITIONED BY (pt) with ('deletion-vectors.enabled'='true')");
sql(
"INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')");

List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'HASH'");
assertThat(rows.size()).isEqualTo(1);
Row row = rows.get(0);
assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
assertThat(row.getField(1)).isEqualTo(0);
assertThat(row.getField(2)).isEqualTo("HASH");
assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
assertThat(row.getField(4)).isEqualTo(12L);
assertThat(row.getField(5)).isEqualTo(3L);
assertThat(row.getField(6)).isNull();

rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'DELETION_VECTORS'");
assertThat(rows.size()).isEqualTo(1);
row = rows.get(0);
assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
assertThat(row.getField(1)).isEqualTo(0);
assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
assertThat(row.getField(4)).isEqualTo(33L);
assertThat(row.getField(5)).isEqualTo(1L);
assertThat(row.getField(6)).isNotNull();
}
}