Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run ",
"modification": 2
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand All @@ -29,7 +30,7 @@ public class IcebergCatalog extends InMemoryCatalog {
// TODO(ahmedabu98): extend this to the IO implementation so
// other SDKs can make use of it too
private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
private final Map<String, IcebergMetastore> metaStores = new HashMap<>();
@VisibleForTesting final IcebergCatalogConfig catalogConfig;

public IcebergCatalog(String name, Map<String, String> properties) {
Expand All @@ -52,12 +53,12 @@ public IcebergCatalog(String name, Map<String, String> properties) {
.setCatalogProperties(catalogProps.build())
.setConfigProperties(hadoopProps.build())
.build();
metaStore.registerProvider(new IcebergTableProvider(catalogConfig));
}

@Override
public InMemoryMetaStore metaStore() {
return metaStore;
public IcebergMetastore metaStore(String db) {
metaStores.putIfAbsent(db, new IcebergMetastore(db, catalogConfig));
return metaStores.get(db);
}

@Override
Expand All @@ -70,17 +71,24 @@ public boolean createDatabase(String database) {
return catalogConfig.createNamespace(database);
}

@Override
public void useDatabase(String database) {
checkArgument(databaseExists(database), "Database '%s' does not exist.");
currentDatabase = database;
}

@Override
public boolean databaseExists(String db) {
return catalogConfig.namespaceExists(db);
}

@Override
public boolean dropDatabase(String database, boolean cascade) {
boolean removed = catalogConfig.dropNamespace(database, cascade);
metaStores.remove(database);
if (database.equals(currentDatabase)) {
currentDatabase = null;
}
return removed;
}

@Override
public Set<String> listDatabases() {
return catalogConfig.listNamespaces();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.TableUtils;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig.IcebergTableInfo;
import org.apache.beam.sdk.io.iceberg.TableAlreadyExistsException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergMetastore extends InMemoryMetaStore {
private static final Logger LOG = LoggerFactory.getLogger(IcebergMetastore.class);
@VisibleForTesting final IcebergCatalogConfig catalogConfig;
private final Map<String, Table> cachedTables = new HashMap<>();
private final String database;

public IcebergMetastore(String db, IcebergCatalogConfig catalogConfig) {
this.database = db;
this.catalogConfig = catalogConfig;
}

@Override
public String getTableType() {
return "iceberg";
}

@Override
public void createTable(Table table) {
if (!table.getType().equals("iceberg")) {
getProvider(table.getType()).createTable(table);
} else {
String identifier = getIdentifier(table);
try {
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
} catch (TableAlreadyExistsException e) {
LOG.info(
"Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);
}
}
cachedTables.put(table.getName(), table);
}

@Override
public void dropTable(String tableName) {
String identifier = getIdentifier(tableName);
if (catalogConfig.dropTable(identifier)) {
LOG.info("Dropped table '{}' (path: '{}').", tableName, identifier);
} else {
LOG.info(
"Ignoring DROP TABLE call for '{}' (path: '{}') because it does not exist.",
tableName,
identifier);
}
cachedTables.remove(tableName);
}

@Override
public Map<String, Table> getTables() {
for (String id : catalogConfig.listTables(database)) {
String name = TableName.create(id).getTableName();
@Nullable Table cachedTable = cachedTables.get(name);
if (cachedTable == null) {
Table table = checkStateNotNull(loadTable(id));
cachedTables.put(name, table);
}
}
return ImmutableMap.copyOf(cachedTables);
}

@Override
public @Nullable Table getTable(String name) {
if (cachedTables.containsKey(name)) {
return cachedTables.get(name);
}
@Nullable Table table = loadTable(getIdentifier(name));
if (table != null) {
cachedTables.put(name, table);
}
return table;
}

private String getIdentifier(String name) {
return database + "." + name;
}

private String getIdentifier(Table table) {
checkArgument(
table.getLocation() == null, "Cannot create Iceberg tables using LOCATION property.");
return getIdentifier(table.getName());
}

private @Nullable Table loadTable(String identifier) {
@Nullable IcebergTableInfo tableInfo = catalogConfig.loadTable(identifier);
if (tableInfo == null) {
return null;
}
return Table.builder()
.type(getTableType())
.name(identifier)
.schema(tableInfo.getSchema())
.properties(TableUtils.parseProperties(tableInfo.getProperties()))
.build();
}

@Override
public BeamSqlTable buildBeamSqlTable(Table table) {
if (table.getType().equals("iceberg")) {
return new IcebergTable(getIdentifier(table), table, catalogConfig);
}
return getProvider(table.getType()).buildBeamSqlTable(table);
}

@Override
public boolean supportsPartitioning(Table table) {
if (table.getType().equals("iceberg")) {
return true;
}
return getProvider(table.getType()).supportsPartitioning(table);
}

@Override
public void registerProvider(TableProvider provider) {
super.registerProvider(provider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -66,10 +65,10 @@ class IcebergTable extends SchemaBaseBeamTable {
@VisibleForTesting @Nullable Integer triggeringFrequency;
@VisibleForTesting final @Nullable List<String> partitionFields;

IcebergTable(Table table, IcebergCatalogConfig catalogConfig) {
IcebergTable(String tableIdentifier, Table table, IcebergCatalogConfig catalogConfig) {
super(table.getSchema());
this.schema = table.getSchema();
this.tableIdentifier = checkArgumentNotNull(table.getLocation());
this.tableIdentifier = tableIdentifier;
this.catalogConfig = catalogConfig;
ObjectNode properties = table.getProperties();
if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {
Expand Down

This file was deleted.

Loading
Loading