Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
Expand Down Expand Up @@ -65,7 +66,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {

protected final FileIO fileIO;
protected final SchemaManager schemaManager;
protected final long schemaId;
protected final TableSchema schema;
protected final CoreOptions options;
protected final RowType partitionType;
private final CatalogEnvironment catalogEnvironment;
Expand All @@ -75,13 +76,13 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
public AbstractFileStore(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
CoreOptions options,
RowType partitionType,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.schema = schema;
this.options = options;
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
Expand All @@ -50,14 +51,14 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
public AppendOnlyFileStore(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
RowType rowType,
String tableName,
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
this.tableName = tableName;
Expand All @@ -82,7 +83,7 @@ public AppendOnlyFileStoreRead newRead() {
return new AppendOnlyFileStoreRead(
fileIO,
schemaManager,
schemaId,
schema,
rowType,
FileFormatDiscover.of(options),
pathFactory());
Expand All @@ -99,7 +100,7 @@ public AppendOnlyFileStoreWrite newWrite(
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
schemaId,
schema.id(),
commitUser,
rowType,
pathFactory(),
Expand Down Expand Up @@ -138,7 +139,7 @@ public void pushdown(Predicate predicate) {
bucketFilter,
snapshotManager(),
schemaManager,
schemaId,
schema,
manifestFileFactory(forWrite),
manifestListFactory(forWrite),
options.bucket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -75,7 +76,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
public KeyValueFileStore(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
boolean crossPartitionUpdate,
CoreOptions options,
RowType partitionType,
Expand All @@ -86,7 +87,7 @@ public KeyValueFileStore(
MergeFunctionFactory<KeyValue> mfFactory,
String tableName,
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
Expand Down Expand Up @@ -121,8 +122,7 @@ public KeyValueFileStoreScan newScan(String branchName) {
public KeyValueFileStoreRead newRead() {
return new KeyValueFileStoreRead(
options,
schemaManager,
schemaId,
schema,
keyType,
valueType,
newKeyComparator(),
Expand All @@ -134,7 +134,7 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
return KeyValueFileReaderFactory.builder(
fileIO,
schemaManager,
schemaId,
schema,
keyType,
valueType,
FileFormatDiscover.of(options),
Expand Down Expand Up @@ -162,7 +162,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
schemaId,
schema,
commitUser,
keyType,
valueType,
Expand Down Expand Up @@ -221,7 +221,7 @@ public void pushdown(Predicate keyFilter) {
bucketFilter,
snapshotManager(),
schemaManager,
schemaId,
schema,
keyValueFieldsExtractor,
manifestFileFactory(forWrite),
manifestListFactory(forWrite),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BulkFormatMapping;
Expand All @@ -52,7 +53,7 @@ public class KeyValueFileReaderFactory {

private final FileIO fileIO;
private final SchemaManager schemaManager;
private final long schemaId;
private final TableSchema schema;
private final RowType keyType;
private final RowType valueType;

Expand All @@ -67,7 +68,7 @@ public class KeyValueFileReaderFactory {
private KeyValueFileReaderFactory(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
RowType keyType,
RowType valueType,
BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
Expand All @@ -77,7 +78,7 @@ private KeyValueFileReaderFactory(
DeletionVector.Factory dvFactory) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.schema = schema;
this.keyType = keyType;
this.valueType = valueType;
this.bulkFormatMappingBuilder = bulkFormatMappingBuilder;
Expand Down Expand Up @@ -110,8 +111,8 @@ private RecordReader<KeyValue> createRecordReader(
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
schemaManager.schema(this.schemaId),
schemaManager.schema(schemaId));
schema,
schemaId == schema.id() ? schema : schemaManager.schema(schemaId));

BulkFormatMapping bulkFormatMapping =
reuseFormat
Expand Down Expand Up @@ -141,7 +142,7 @@ private RecordReader<KeyValue> createRecordReader(
public static Builder builder(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
RowType keyType,
RowType valueType,
FileFormatDiscover formatDiscover,
Expand All @@ -151,7 +152,7 @@ public static Builder builder(
return new Builder(
fileIO,
schemaManager,
schemaId,
schema,
keyType,
valueType,
formatDiscover,
Expand All @@ -165,7 +166,7 @@ public static class Builder {

private final FileIO fileIO;
private final SchemaManager schemaManager;
private final long schemaId;
private final TableSchema schema;
private final RowType keyType;
private final RowType valueType;
private final FileFormatDiscover formatDiscover;
Expand All @@ -182,7 +183,7 @@ public static class Builder {
private Builder(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
RowType keyType,
RowType valueType,
FileFormatDiscover formatDiscover,
Expand All @@ -191,7 +192,7 @@ private Builder(
CoreOptions options) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.schema = schema;
this.keyType = keyType;
this.valueType = valueType;
this.formatDiscover = formatDiscover;
Expand All @@ -209,7 +210,7 @@ public Builder copyWithoutProjection() {
return new Builder(
fileIO,
schemaManager,
schemaId,
schema,
keyType,
valueType,
formatDiscover,
Expand Down Expand Up @@ -255,7 +256,7 @@ public KeyValueFileReaderFactory build(
return new KeyValueFileReaderFactory(
fileIO,
schemaManager,
schemaId,
schema,
projectedKeyType,
projectedValueType,
BulkFormatMapping.newBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
private final String branchName;

Expand All @@ -91,6 +92,7 @@ public AbstractFileStoreScan(
ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
Expand All @@ -101,6 +103,7 @@ public AbstractFileStoreScan(
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
this.schema = schema;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
Expand Down Expand Up @@ -407,7 +410,8 @@ private List<ManifestFileMeta> readManifests(Snapshot snapshot) {

/** Note: Keep this thread-safe. */
protected TableSchema scanTableSchema(long id) {
return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(id));
return tableSchemas.computeIfAbsent(
id, key -> key == schema.id() ? schema : schemaManager.schema(id));
}

/** Note: Keep this thread-safe. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {

private final FileIO fileIO;
private final SchemaManager schemaManager;
private final long schemaId;
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
Expand All @@ -74,13 +74,13 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
public AppendOnlyFileStoreRead(
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
RowType rowType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
Expand Down Expand Up @@ -113,8 +113,11 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
bulkFormatMappings.computeIfAbsent(
new FormatKey(file.schemaId(), formatIdentifier),
key -> {
TableSchema tableSchema = schemaManager.schema(this.schemaId);
TableSchema dataSchema = schemaManager.schema(key.schemaId);
TableSchema tableSchema = schema;
TableSchema dataSchema =
key.schemaId == schema.id()
? schema
: schemaManager.schema(key.schemaId);

// projection to data schema
int[][] dataProjection =
Expand All @@ -131,7 +134,7 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
dataSchema.fields());

List<Predicate> dataFilters =
this.schemaId == key.schemaId
this.schema.id() == key.schemaId
? filters
: SchemaEvolutionUtil.createDataFilters(
tableSchema.fields(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.stats.FieldStatsConverters;
Expand All @@ -44,7 +45,7 @@ public AppendOnlyFileStoreScan(
ScanBucketFilter bucketFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
Expand All @@ -56,14 +57,15 @@ public AppendOnlyFileStoreScan(
bucketFilter,
snapshotManager,
schemaManager,
schema,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId);
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
}

public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
Expand Down Expand Up @@ -89,14 +88,13 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {

public KeyValueFileStoreRead(
CoreOptions options,
SchemaManager schemaManager,
long schemaId,
TableSchema schema,
RowType keyType,
RowType valueType,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
this.tableSchema = schemaManager.schema(schemaId);
this.tableSchema = schema;
this.readerFactoryBuilder = readerFactoryBuilder;
this.fileIO = readerFactoryBuilder.fileIO();
this.keyComparator = keyComparator;
Expand Down
Loading