diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 34a6e02150e7..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 4 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index e0266d62f2e0..f1ba03a243ee 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 5 } diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index e3d6056a5de9..b26833333238 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } diff --git a/CHANGES.md b/CHANGES.md index f913302cb9df..a7f96fc3fdc1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,10 +75,12 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* [IcebergIO] Now available with Beam SQL! ([#34799](https://github.com/apache/beam/pull/34799)) * [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856)) * [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827)) ## New Features / Improvements +* [Beam SQL] Introducing Beam Catalogs ([#35223](https://github.com/apache/beam/pull/35223)) * Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)). 
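For context, the end-to-end DDL surface this change set introduces looks roughly like the following (adapted from the integration tests further down in this diff; the catalog name, warehouse path, GCP project, dataset, and table names are placeholders):

```sql
-- Create an Iceberg catalog; PROPERTIES are passed through to the underlying
-- Iceberg catalog implementation (here, the BigQuery Metastore catalog).
CREATE CATALOG my_catalog
TYPE iceberg
PROPERTIES (
  'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
  'warehouse' = 'gs://my-bucket/warehouse',
  'gcp_project' = 'my-project',
  'gcp_region' = 'us-central1');

-- Make it the active catalog for subsequent statements.
SET CATALOG my_catalog;

-- Register a table backed by Managed IcebergIO, then write to and read from it.
CREATE EXTERNAL TABLE orders (id BIGINT, name VARCHAR)
TYPE 'iceberg'
LOCATION 'my_dataset.orders';

INSERT INTO orders VALUES (1, 'foo');
SELECT id, name FROM orders WHERE id = 1;
```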
diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index 6f34891c2d3f..a73bd2518fe0 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -74,6 +74,10 @@ dependencies { fmppTask "org.freemarker:freemarker:2.3.31" fmppTemplates library.java.vendored_calcite_1_28_0 implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:managed") + implementation project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + runtimeOnly project(":sdks:java:io:iceberg:hive") implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:join-library") permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761 diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java index 7265c3103c16..c1d96eea7bae 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java @@ -99,16 +99,16 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { if (i > 0) { writer.keyword(","); } - properties.get(i).unparse(writer, leftPrec, rightPrec); - } - - for (int i = 0; i < properties.size(); i += 2) { - if (i > 0) { - writer.keyword(","); - } - properties.get(i).unparse(writer, leftPrec, rightPrec); // key + SqlNode property = properties.get(i); + checkState( + property instanceof SqlNodeList, + String.format( + "Unexpected properties entry '%s' of class '%s'", property, property.getClass())); + SqlNodeList kv = ((SqlNodeList) property); + + kv.get(0).unparse(writer, leftPrec, rightPrec); // key writer.keyword("="); - properties.get(i + 1).unparse(writer, leftPrec, rightPrec); // value + kv.get(1).unparse(writer, leftPrec, rightPrec); // value } writer.keyword(")"); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java index 08ca3488c1cc..2a99209e06f5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; /** @@ -25,6 +26,7 @@ * configuration properties. Uses an underlying {@link MetaStore} to manage tables and table * providers. */ +@Internal public interface Catalog { /** A type that defines this catalog. 
*/ String type(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java index 2ac5266219bc..4654f0dd1b0d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; import org.checkerframework.checker.nullness.qual.Nullable; @@ -32,6 +33,7 @@ *

When {@link #registerTableProvider(String, TableProvider)} is called, the provider should * become available for all catalogs. */ +@Internal public interface CatalogManager { /** Creates and stores a catalog of a particular type. */ void createCatalog(String name, String type, Map<String, String> properties); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java index 3fad539e8cc4..08fe6179b1f1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.extensions.sql.meta.catalog; +import org.apache.beam.sdk.annotations.Internal; + /** * Over-arching registrar to capture available {@link Catalog}s. Implementations should be marked * with {@link com.google.auto.service.AutoService} to be available to {@link * java.util.ServiceLoader}s. */ +@Internal public interface CatalogRegistrar { Iterable<Class<? extends Catalog>> getCatalogs(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java index 4ff9811605a1..8757c2357861 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java @@ -17,17 +17,15 @@ */ package org.apache.beam.sdk.extensions.sql.meta.catalog; -import com.google.auto.service.AutoService; import java.util.Map; import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; import org.apache.beam.sdk.util.Preconditions; -@AutoService(Catalog.class) public class InMemoryCatalog implements Catalog { private final String name; private final Map<String, String> properties; - private final InMemoryMetaStore metaStore = new InMemoryMetaStore(); + protected final InMemoryMetaStore metaStore = new InMemoryMetaStore(); public InMemoryCatalog(String name, Map<String, String> properties) { this.name = name; @@ -41,7 +39,8 @@ public String type() { @Override public String name() { - return Preconditions.checkStateNotNull(name, "InMemoryCatalog has not been initialized"); + return Preconditions.checkStateNotNull( + name, getClass().getSimpleName() + " has not been initialized"); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java index df53818823ca..2d94e19c1689 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java @@ -18,12 +18,16 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @AutoService(CatalogRegistrar.class) public
class InMemoryCatalogRegistrar implements CatalogRegistrar { @Override public Iterable<Class<? extends Catalog>> getCatalogs() { - return ImmutableList.of(InMemoryCatalog.class); + return ImmutableList.<Class<? extends Catalog>>builder() + .add(InMemoryCatalog.class) + .add(IcebergCatalog.class) + .build(); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java new file mode 100644 index 000000000000..85fd6f4efc17 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog; + +public class IcebergCatalog extends InMemoryCatalog { + public IcebergCatalog(String name, Map<String, String> properties) { + super(name, properties); + metaStore.registerProvider(new IcebergTableProvider(name, properties)); + } + + @Override + public String type() { + return "iceberg"; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java new file mode 100644 index 000000000000..b3854ced46c6 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import static org.apache.beam.sdk.io.iceberg.FilterUtils.SUPPORTED_OPS; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.AND; +import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.OR; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class IcebergFilter implements BeamSqlTableFilter { + private @Nullable List<RexNode> supported; + private @Nullable List<RexNode> unsupported; + private final List<RexNode> predicateCNF; + + public IcebergFilter(List<RexNode> predicateCNF) { + this.predicateCNF = predicateCNF; + } + + private void maybeInitialize() { + if (supported != null && unsupported != null) { + return; + } + ImmutableList.Builder<RexNode> supportedBuilder = ImmutableList.builder(); + ImmutableList.Builder<RexNode> unsupportedBuilder = ImmutableList.builder(); + for (RexNode node : predicateCNF) { + if (!node.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) { + throw new IllegalArgumentException( + "Predicate node '" + + node.getClass().getSimpleName() + + "' should be a boolean expression, but was: " + + node.getType().getSqlTypeName()); + } + + if (isSupported(node).getLeft()) { + supportedBuilder.add(node); + } else { + unsupportedBuilder.add(node); + } + } + supported = supportedBuilder.build(); + unsupported = unsupportedBuilder.build(); + } + + @Override + public List<RexNode> getNotSupported() { + maybeInitialize(); + return checkStateNotNull(unsupported); + } + + @Override + public int numSupported() { + maybeInitialize(); + return BeamSqlTableFilter.expressionsInFilter(checkStateNotNull(supported)); + } + + public List<RexNode> getSupported() { + maybeInitialize(); + return checkStateNotNull(supported); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(IcebergFilter.class) + .add( + "supported", + checkStateNotNull(supported).stream() + .map(RexNode::toString) + .collect(Collectors.joining())) + .add( + "unsupported", + checkStateNotNull(unsupported).stream() + .map(RexNode::toString) + .collect(Collectors.joining())) + .toString(); + } + + /** + * Check whether a {@code RexNode} is supported. As of right now Iceberg supports: 1. Complex + * predicates (both conjunction and disjunction). 2. Comparison between a column and a literal. + * + * @param node A node to check for predicate push-down support. + * @return A pair containing a boolean whether an expression is supported and the number of input + * references used by the expression.
+ */ + private Pair<Boolean, Integer> isSupported(RexNode node) { + int numberOfInputRefs = 0; + boolean isSupported = true; + + if (node instanceof RexCall) { + RexCall compositeNode = (RexCall) node; + if (!SUPPORTED_OPS.contains(node.getKind())) { + isSupported = false; + } else { + for (RexNode operand : compositeNode.getOperands()) { + // All operands must be supported for a parent node to be supported. + Pair<Boolean, Integer> childSupported = isSupported(operand); + if (!node.getKind().belongsTo(ImmutableSet.of(AND, OR))) { + numberOfInputRefs += childSupported.getRight(); + } + // Predicate functions with multiple columns are unsupported. + isSupported = numberOfInputRefs < 2 && childSupported.getLeft(); + } + } + } else if (node instanceof RexInputRef) { + numberOfInputRefs = 1; + } else if (node instanceof RexLiteral) { + // RexLiterals are expected, but no action is needed. + } else { + throw new UnsupportedOperationException( + "Encountered an unexpected node type: " + node.getClass().getSimpleName()); + } + + return Pair.of(isSupported, numberOfInputRefs); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java new file mode 100644 index 000000000000..9a87edff2a21 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.IntFunction; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter; +import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter; +import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport; +import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.rel2sql.SqlImplementor; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.dialect.BigQuerySqlDialect; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class IcebergTable extends SchemaBaseBeamTable { + private static final Logger LOG = LoggerFactory.getLogger(IcebergTable.class); + @VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = "catalog_properties"; + @VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = "config_properties"; + @VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name"; + + @VisibleForTesting + static final String TRIGGERING_FREQUENCY_FIELD = "triggering_frequency_seconds"; + + @VisibleForTesting final String tableIdentifier; + @VisibleForTesting final IcebergCatalogConfig catalogConfig; + @VisibleForTesting @Nullable Integer triggeringFrequency; + + IcebergTable(Table table, IcebergCatalogConfig catalogConfig) { + super(table.getSchema()); + this.schema = table.getSchema(); + this.tableIdentifier = checkArgumentNotNull(table.getLocation()); + this.catalogConfig = catalogConfig; + ObjectNode properties = table.getProperties(); + if (properties.has(TRIGGERING_FREQUENCY_FIELD)) { + this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt(); + } + } + + @Override + public POutput buildIOWriter(PCollection<Row> input) { + ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder(); + configBuilder.putAll(getBaseConfig()); + if (triggeringFrequency != null) { + configBuilder.put(TRIGGERING_FREQUENCY_FIELD, triggeringFrequency); + } + return
input.apply(Managed.write(Managed.ICEBERG).withConfig(configBuilder.build())); + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { + return begin + .apply(Managed.read(Managed.ICEBERG).withConfig(getBaseConfig())) + .getSinglePCollection(); + } + + @Override + public PCollection<Row> buildIOReader( + PBegin begin, BeamSqlTableFilter filter, List<String> fieldNames) { + + Map<String, Object> readConfig = new HashMap<>(getBaseConfig()); + + if (!(filter instanceof DefaultTableFilter)) { + IcebergFilter icebergFilter = (IcebergFilter) filter; + if (!icebergFilter.getSupported().isEmpty()) { + String expression = generateFilterExpression(getSchema(), icebergFilter.getSupported()); + if (!expression.isEmpty()) { + LOG.info("Pushing down the following filter: {}", expression); + readConfig.put("filter", expression); + } + } + } + + if (!fieldNames.isEmpty()) { + readConfig.put("keep", fieldNames); + } + + return begin + .apply("Read Iceberg with push-down", Managed.read(Managed.ICEBERG).withConfig(readConfig)) + .getSinglePCollection(); + } + + @Override + public ProjectSupport supportsProjects() { + return ProjectSupport.WITHOUT_FIELD_REORDERING; + } + + @Override + public BeamSqlTableFilter constructFilter(List<RexNode> filter) { + return new IcebergFilter(filter); + } + + @Override + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.BOUNDED; + } + + private Map<String, Object> getBaseConfig() { + ImmutableMap.Builder<String, Object> managedConfigBuilder = ImmutableMap.builder(); + managedConfigBuilder.put("table", tableIdentifier); + @Nullable String name = catalogConfig.getCatalogName(); + @Nullable Map<String, String> catalogProps = catalogConfig.getCatalogProperties(); + @Nullable Map<String, String> hadoopConfProps = catalogConfig.getConfigProperties(); + if (name != null) { + managedConfigBuilder.put(CATALOG_NAME_FIELD, name); + } + if (catalogProps != null) { + managedConfigBuilder.put(CATALOG_PROPERTIES_FIELD, catalogProps); + } + if (hadoopConfProps != null) { + managedConfigBuilder.put(HADOOP_CONFIG_PROPERTIES_FIELD, hadoopConfProps); + } + return managedConfigBuilder.build(); + } + + private String generateFilterExpression(Schema schema, List<RexNode> supported) { + final IntFunction<SqlNode> field = + i -> new SqlIdentifier(schema.getField(i).getName(), SqlParserPos.ZERO); + + SqlImplementor.Context context = new BeamSqlUnparseContext(field); + + // Create a single SqlNode from a list of RexNodes + SqlNode andSqlNode = null; + for (RexNode node : supported) { + SqlNode sqlNode = context.toSql(null, node); + if (andSqlNode == null) { + andSqlNode = sqlNode; + continue; + } + // AND operator must have exactly 2 operands. + andSqlNode = + SqlStdOperatorTable.AND.createCall( + SqlParserPos.ZERO, ImmutableList.of(andSqlNode, sqlNode)); + } + return checkStateNotNull(andSqlNode).toSqlString(BigQuerySqlDialect.DEFAULT).getSql(); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java new file mode 100644 index 000000000000..38acc65d6d76 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class IcebergTableProvider extends InMemoryMetaTableProvider { + private static final String BEAM_HADOOP_PREFIX = "beam.catalog.%s.hadoop"; + @VisibleForTesting final IcebergCatalogConfig catalogConfig; + + public IcebergTableProvider(String name, Map<String, String> properties) { + ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder(); + ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder(); + String hadoopPrefix = String.format(BEAM_HADOOP_PREFIX, name); + + for (Map.Entry<String, String> entry : properties.entrySet()) { + if (entry.getKey().startsWith(hadoopPrefix)) { + hadoopProps.put(entry.getKey(), entry.getValue()); + } else { + catalogProps.put(entry.getKey(), entry.getValue()); + } + } + + catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogName(name) + .setCatalogProperties(catalogProps.build()) + .setConfigProperties(hadoopProps.build()) + .build(); + } + + @Override + public String getTableType() { + return "iceberg"; + } + + @Override + public BeamSqlTable buildBeamSqlTable(Table table) { + return new IcebergTable(table, catalogConfig); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java new file mode 100644 index 000000000000..01a34920f585 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Table schema for Iceberg.
*/ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java index 8a892cc2eb73..a571a5d2dbb7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java @@ -113,4 +113,8 @@ private void initTablesFromProvider(TableProvider provider) { Map<String, TableProvider> getProviders() { return providers; } + + public boolean hasProvider(TableProvider provider) { + return providers.containsKey(provider.getTableType()); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java new file mode 100644 index 000000000000..59f3b7650c7c --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql; + +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests stream writing to Iceberg with Beam SQL. */ +@RunWith(JUnit4.class) +public class PubsubToIcebergIT implements Serializable { + private static final Schema SOURCE_SCHEMA = + Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build(); + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient TestPubsub pubsub = TestPubsub.create(); + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("PubsubToIcebergIT"); + static final String DATASET = "sql_pubsub_to_iceberg_it_" + System.nanoTime(); + static String warehouse; + protected static final GcpOptions OPTIONS = + TestPipeline.testingPipelineOptions().as(GcpOptions.class); + @Rule public TestName testName = new TestName(); + + @BeforeClass + public static void createDataset() throws IOException, InterruptedException { + warehouse = + String.format( + "%s%s/%s", + TestPipeline.testingPipelineOptions().getTempLocation(), + PubsubToIcebergIT.class.getSimpleName(), + UUID.randomUUID()); + BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET); + createCatalogDdl = + "CREATE CATALOG my_catalog \n" + + "TYPE iceberg \n" + + "PROPERTIES (\n" + + " 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n" + + " 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n" + + format(" 'warehouse' = '%s', \n", warehouse) + + format(" 'gcp_project' = '%s', \n", OPTIONS.getProject()) + + " 'gcp_region' = 'us-central1')"; + setCatalogDdl = "SET CATALOG my_catalog"; + } + + private String tableIdentifier; + private static String createCatalogDdl; + private static String setCatalogDdl; + + @Before + public void setup() { + tableIdentifier = DATASET + "." 
+ testName.getMethodName(); + } + + @AfterClass + public static void deleteDataset() { + BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET); + } + + @Test + public void testSimpleInsert() throws Exception { + String pubsubTableString = + "CREATE EXTERNAL TABLE pubsub_topic (\n" + + "event_timestamp TIMESTAMP, \n" + + "attributes MAP<VARCHAR, VARCHAR>, \n" + + "payload ROW< \n" + + " id BIGINT, \n" + + " name VARCHAR \n" + + " > \n" + + ") \n" + + "TYPE 'pubsub' \n" + + "LOCATION '" + + pubsub.topicPath() + + "' \n" + + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'"; + String icebergTableString = + "CREATE EXTERNAL TABLE iceberg_table( \n" + + " id BIGINT, \n" + + " name VARCHAR \n " + + ") \n" + + "TYPE 'iceberg' \n" + + "LOCATION '" + + tableIdentifier + + "' \n" + + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'"; + String insertStatement = + "INSERT INTO iceberg_table \n" + + "SELECT \n" + + " pubsub_topic.payload.id, \n" + + " pubsub_topic.payload.name \n" + + "FROM pubsub_topic"; + pipeline.apply( + SqlTransform.query(insertStatement) + .withDdlString(createCatalogDdl) + .withDdlString(setCatalogDdl) + .withDdlString(pubsubTableString) + .withDdlString(icebergTableString)); + pipeline.run(); + + // Block until a subscription for this topic exists + pubsub.assertSubscriptionEventuallyCreated( + pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5)); + + List<PubsubMessage> messages = + ImmutableList.of( + message(ts(1), 3, "foo"), message(ts(2), 5, "bar"), message(ts(3), 7, "baz")); + pubsub.publish(messages); + + validateRowsWritten(); + } + + @Test + public void testSimpleInsertFlat() throws Exception { + String pubsubTableString = + "CREATE EXTERNAL TABLE pubsub_topic (\n" + + "event_timestamp TIMESTAMP, \n" + + "id BIGINT, \n" + + "name VARCHAR \n" + + ") \n" + + "TYPE 'pubsub' \n" + + "LOCATION '" + + pubsub.topicPath() + + "' \n" + + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'"; + String bqTableString = + "CREATE EXTERNAL TABLE iceberg_table( \n" + + " id BIGINT, \n" + + " name VARCHAR \n " + + ") \n" + + "TYPE 'iceberg' \n" + + "LOCATION '" + + tableIdentifier + + "' \n" + + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'"; + String insertStatement = + "INSERT INTO iceberg_table \n" + + "SELECT \n" + + " id, \n" + + " name \n" + + "FROM pubsub_topic"; + + pipeline.apply( + SqlTransform.query(insertStatement) + .withDdlString(createCatalogDdl) + .withDdlString(setCatalogDdl) + .withDdlString(pubsubTableString) + .withDdlString(bqTableString)); + pipeline.run(); + + // Block until a subscription for this topic exists + pubsub.assertSubscriptionEventuallyCreated( + pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5)); + + List<PubsubMessage> messages = + ImmutableList.of( + message(ts(1), 3, "foo"), message(ts(2), 5, "bar"), message(ts(3), 7, "baz")); + pubsub.publish(messages); + + validateRowsWritten(); + } + + private void validateRowsWritten() throws IOException, InterruptedException { + String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableIdentifier); + List<Row> expectedRows = + ImmutableList.of( + row(SOURCE_SCHEMA, 3L, "foo"), + row(SOURCE_SCHEMA, 5L, "bar"), + row(SOURCE_SCHEMA, 7L, "baz")); + + BackOff backOff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.standardSeconds(10)) + .withMaxBackoff(Duration.standardSeconds(20)) + .withMaxCumulativeBackoff(Duration.standardMinutes(5)) + .backoff(); + Sleeper sleeper = Sleeper.DEFAULT; + do { + List<TableRow> returnedRows = new
ArrayList<>(); + try { + returnedRows = BQ_CLIENT.queryUnflattened(query, OPTIONS.getProject(), true, true); + } catch (GoogleJsonResponseException e) { + if (e.getStatusCode() != HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new RuntimeException(e); + } + } + List<Row> beamRows = + returnedRows.stream() + .map(r -> BigQueryUtils.toBeamRow(SOURCE_SCHEMA, r)) + .collect(Collectors.toList()); + if (beamRows.containsAll(expectedRows)) { + return; + } + } while (BackOffUtils.next(sleeper, backOff)); + + throw new RuntimeException("Polled for 5 minutes and could not find all rows in table."); + } + + private Row row(Schema schema, Object... values) { + return Row.withSchema(schema).addValues(values).build(); + } + + private PubsubMessage message(Instant timestamp, int id, String name) { + return message(timestamp, jsonString(id, name)); + } + + private PubsubMessage message(Instant timestamp, String jsonPayload) { + return new PubsubMessage( + jsonPayload.getBytes(UTF_8), ImmutableMap.of("ts", String.valueOf(timestamp.getMillis()))); + } + + private String jsonString(int id, String name) { + return "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }"; + } + + private Instant ts(long millis) { + return new Instant(millis); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java new file mode 100644 index 000000000000..f14344b4f1fe --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; + +import org.apache.beam.sdk.extensions.sql.TableUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; +import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PushDownOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test class for {@link IcebergFilter}. */ +@RunWith(JUnit4.class) +public class IcebergFilterTest { + // TODO: add date, time, and datetime fields. + private static final Schema BASIC_SCHEMA = + Schema.builder() + .addInt32Field("unused1") + .addInt32Field("id") + .addStringField("name") + .addInt16Field("unused2") + .addBooleanField("b") + .build(); + + private BeamSqlEnv sqlEnv; + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Before + public void buildUp() { + TestTableProvider tableProvider = new TestTableProvider(); + Table table = getTable("TEST", PushDownOptions.NONE); + tableProvider.createTable(table); + tableProvider.addRows( + table.getName(), + row(BASIC_SCHEMA, 100, 1, "one", (short) 100, true), + row(BASIC_SCHEMA, 200, 2, "two", (short) 200, false)); + + sqlEnv = + BeamSqlEnv.builder(tableProvider) + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); + } + + @Test + public void testIsSupported() { + ImmutableList<Pair<String, Boolean>> sqlQueries = + ImmutableList.of( + Pair.of("select * from TEST where unused1=100", true), + Pair.of("select * from TEST where unused1 in (100, 200)", true), + Pair.of("select * from TEST where unused1+10=110", true), + Pair.of("select * from TEST where b", true), + Pair.of( + "select * from TEST where unused1>100 and unused1<=200 and id<>1 and (name='two' or id=2)", + true), + Pair.of("select * from TEST where unused2=200", true), + // Functions involving more than one column are not supported yet. + Pair.of("select * from TEST where unused1=unused2 and id=2", false)); + + for (Pair<String, Boolean> query : sqlQueries) { + String sql = query.getLeft(); + Boolean isSupported = query.getRight(); + + BeamRelNode beamRelNode = sqlEnv.parseQuery(sql); + assertThat(beamRelNode, instanceOf(BeamCalcRel.class)); + IcebergFilter filter = + new IcebergFilter(((BeamCalcRel) beamRelNode).getProgram().split().right); + + assertThat( + "Query: '" + sql + "' is expected to be " + (isSupported ? "supported."
: "unsupported."), + filter.getNotSupported().isEmpty() == isSupported); + } + } + + private static Table getTable(String name, PushDownOptions options) { + return Table.builder() + .name(name) + .comment(name + " table") + .schema(BASIC_SCHEMA) + .properties( + TableUtils.parseProperties( + "{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }")) + .type("test") + .build(); + } + + private static Row row(Schema schema, Object... objects) { + return Row.withSchema(schema).addValues(objects).build(); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java new file mode 100644 index 000000000000..fd3c18b6072a --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; +import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; +import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; +import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32; +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.apache.beam.sdk.schemas.Schema.FieldType.array; +import static org.apache.beam.sdk.schemas.Schema.FieldType.row; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; + +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for reading from and writing to Iceberg with Beam SQL.
*/ +@RunWith(JUnit4.class) +public class IcebergReadWriteIT { + private static final Schema NESTED_SCHEMA = + Schema.builder() + .addNullableArrayField("c_arr_struct_arr", STRING) + .addNullableInt32Field("c_arr_struct_integer") + .build(); + private static final Schema SOURCE_SCHEMA = + Schema.builder() + .addNullableField("c_bigint", INT64) + .addNullableField("c_integer", INT32) + .addNullableField("c_float", FLOAT) + .addNullableField("c_double", DOUBLE) + .addNullableField("c_boolean", BOOLEAN) + .addNullableField("c_timestamp", CalciteUtils.TIMESTAMP) + .addNullableField("c_varchar", STRING) + .addNullableField("c_char", STRING) + .addNullableField("c_arr", array(STRING)) + .addNullableField("c_arr_struct", array(row(NESTED_SCHEMA))) + .build(); + + @Rule public transient TestPipeline writePipeline = TestPipeline.create(); + @Rule public transient TestPipeline readPipeline = TestPipeline.create(); + @Rule public TestName testName = new TestName(); + + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("IcebergReadWriteIT"); + static final String DATASET = "iceberg_sql_tests_" + System.nanoTime(); + static String warehouse; + protected static final GcpOptions OPTIONS = + TestPipeline.testingPipelineOptions().as(GcpOptions.class); + + @BeforeClass + public static void createDataset() throws IOException, InterruptedException { + warehouse = + format( + "%s%s/%s", + TestPipeline.testingPipelineOptions().getTempLocation(), + IcebergReadWriteIT.class.getSimpleName(), + UUID.randomUUID()); + BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET); + } + + @AfterClass + public static void deleteDataset() { + BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET); + } + + @Test + public void testSqlWriteAndRead() throws IOException, InterruptedException { + BeamSqlEnv sqlEnv = + BeamSqlEnv.builder(new InMemoryCatalogManager()) + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); + String tableIdentifier = DATASET + "." 
+ testName.getMethodName(); + + // 1) create Iceberg catalog + String createCatalog = + "CREATE CATALOG my_catalog \n" + + "TYPE iceberg \n" + + "PROPERTIES (\n" + + " 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n" + + " 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n" + + format(" 'warehouse' = '%s', \n", warehouse) + + format(" 'gcp_project' = '%s', \n", OPTIONS.getProject()) + + " 'gcp_region' = 'us-central1')"; + sqlEnv.executeDdl(createCatalog); + + // 2) use the catalog we just created + String setCatalog = "SET CATALOG my_catalog"; + sqlEnv.executeDdl(setCatalog); + + // 3) create beam table + String createTableStatement = + "CREATE EXTERNAL TABLE TEST( \n" + + " c_bigint BIGINT, \n" + + " c_integer INTEGER, \n" + + " c_float FLOAT, \n" + + " c_double DOUBLE, \n" + + " c_boolean BOOLEAN, \n" + + " c_timestamp TIMESTAMP, \n" + + " c_varchar VARCHAR, \n " + + " c_char CHAR, \n" + + " c_arr ARRAY, \n" + + " c_arr_struct ARRAY, c_arr_struct_integer INTEGER>> \n" + + ") \n" + + "TYPE 'iceberg' \n" + + "LOCATION '" + + tableIdentifier + + "'"; + sqlEnv.executeDdl(createTableStatement); + + // 3) write to underlying Iceberg table + String insertStatement = + "INSERT INTO TEST VALUES (" + + "9223372036854775807, " + + "2147483647, " + + "1.0, " + + "1.0, " + + "TRUE, " + + "TIMESTAMP '2018-05-28 20:17:40.123', " + + "'varchar', " + + "'char', " + + "ARRAY['123', '456'], " + + "ARRAY[" + + "ROW(ARRAY['abc', 'xyz'], 123), " + + "ROW(ARRAY['foo', 'bar'], 456), " + + "ROW(ARRAY['cat', 'dog'], 789)]" + + ")"; + BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement)); + writePipeline.run().waitUntilFinish(); + + // 4) run external query on Iceberg table (hosted on BQ) to verify correct row was written + String query = format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableIdentifier); + TableRow returnedRow = + BQ_CLIENT.queryUnflattened(query, OPTIONS.getProject(), true, true).get(0); + Row beamRow = BigQueryUtils.toBeamRow(SOURCE_SCHEMA, returnedRow); + Row expectedRow = + Row.withSchema(SOURCE_SCHEMA) + .addValues( + 9223372036854775807L, + 2147483647, + (float) 1.0, + 1.0, + true, + parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), + "varchar", + "char", + asList("123", "456"), + asList( + nestedRow(asList("abc", "xyz"), 123), + nestedRow(asList("foo", "bar"), 456), + nestedRow(asList("cat", "dog"), 789))) + .build(); + assertEquals(expectedRow, beamRow); + + // 5) read using Beam SQL and verify + String selectTableStatement = "SELECT * FROM TEST"; + PCollection output = + BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(selectTableStatement)); + PAssert.that(output).containsInAnyOrder(expectedRow); + PipelineResult.State state = readPipeline.run().waitUntilFinish(); + assertThat(state, equalTo(PipelineResult.State.DONE)); + } + + @Test + public void testSQLReadWithProjectAndFilterPushDown() { + BeamSqlEnv sqlEnv = + BeamSqlEnv.builder(new InMemoryCatalogManager()) + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); + String tableIdentifier = DATASET + "." 
+ testName.getMethodName(); + + // 1) create Iceberg catalog + String createCatalog = + "CREATE CATALOG my_catalog \n" + + "TYPE iceberg \n" + + "PROPERTIES (\n" + + " 'catalog-impl' = 'org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', \n" + + " 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', \n" + + format(" 'warehouse' = '%s', \n", warehouse) + + format(" 'gcp_project' = '%s', \n", OPTIONS.getProject()) + + " 'gcp_region' = 'us-central1')"; + sqlEnv.executeDdl(createCatalog); + + // 2) use the catalog we just created + String setCatalog = "SET CATALOG my_catalog"; + sqlEnv.executeDdl(setCatalog); + + // 3) create Beam table + String createTableStatement = + "CREATE EXTERNAL TABLE TEST( \n" + + " c_integer INTEGER, \n" + + " c_float FLOAT, \n" + + " c_boolean BOOLEAN, \n" + + " c_timestamp TIMESTAMP, \n" + + " c_varchar VARCHAR \n " + + ") \n" + + "TYPE 'iceberg' \n" + + "LOCATION '" + + tableIdentifier + + "'"; + sqlEnv.executeDdl(createTableStatement); + + // 4) insert some data) + String insertStatement = + "INSERT INTO TEST VALUES " + + "(123, 1.23, TRUE, TIMESTAMP '2025-05-22 20:17:40.123', 'a'), " + + "(456, 4.56, FALSE, TIMESTAMP '2025-05-25 20:17:40.123', 'b'), " + + "(789, 7.89, TRUE, TIMESTAMP '2025-05-28 20:17:40.123', 'c')"; + BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement)); + writePipeline.run().waitUntilFinish(Duration.standardMinutes(5)); + + // 5) read with a filter + String selectTableStatement = + "SELECT c_integer, c_varchar FROM TEST where " + + "(c_boolean=TRUE and c_varchar in ('a', 'b')) or c_float > 5"; + BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement); + PCollection output = BeamSqlRelUtils.toPCollection(readPipeline, relNode); + + assertThat(relNode, instanceOf(BeamPushDownIOSourceRel.class)); + // Unused fields should not be projected by an IO + assertThat(relNode.getRowType().getFieldNames(), containsInAnyOrder("c_integer", "c_varchar")); + + assertThat( + output.getSchema(), + equalTo( + Schema.builder() + .addNullableField("c_integer", INT32) + .addNullableField("c_varchar", STRING) + .build())); + + PAssert.that(output) + .containsInAnyOrder( + Row.withSchema(output.getSchema()).addValues(123, "a").build(), + Row.withSchema(output.getSchema()).addValues(789, "c").build()); + PipelineResult.State state = readPipeline.run().waitUntilFinish(Duration.standardMinutes(5)); + assertThat(state, equalTo(PipelineResult.State.DONE)); + } + + private Row nestedRow(List arr, Integer intVal) { + return Row.withSchema(NESTED_SCHEMA).addValues(arr, intVal).build(); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java new file mode 100644 index 000000000000..316028d7599f --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergTable.TRIGGERING_FREQUENCY_FIELD;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.stream.Stream;
+import org.apache.beam.sdk.extensions.sql.TableUtils;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.calcite.v1_28_0.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+/** UnitTest for {@link IcebergTableProvider}. */
+public class IcebergTableProviderTest {
+  private final IcebergTableProvider provider =
+      new IcebergTableProvider(
+          "test_catalog",
+          ImmutableMap.of(
+              "catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
+              "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
+              "warehouse", "gs://bucket/warehouse",
+              "beam.catalog.test_catalog.hadoop.fs.gs.project.id", "apache-beam-testing",
+              "beam.catalog.test_catalog.hadoop.foo", "bar"));
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("iceberg", provider.getTableType());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() throws Exception {
+    ImmutableMap<String, Object> properties = ImmutableMap.of(TRIGGERING_FREQUENCY_FIELD, 30);
+
+    ObjectMapper mapper = new ObjectMapper();
+    String propertiesString = mapper.writeValueAsString(properties);
+    Table table = fakeTableWithProperties("my_table", propertiesString);
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof IcebergTable);
+
+    IcebergTable icebergTable = (IcebergTable) sqlTable;
+    assertEquals("namespace.table", icebergTable.tableIdentifier);
+    assertEquals(provider.catalogConfig, icebergTable.catalogConfig);
+  }
+
+  private static Table fakeTableWithProperties(String name, String properties) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .location("namespace.table")
+        .schema(
+            Stream.of(
+                    Schema.Field.nullable("id", Schema.FieldType.INT32),
+                    Schema.Field.nullable("name", Schema.FieldType.STRING))
+                .collect(toSchema()))
+        .type("iceberg")
+        .properties(TableUtils.parseProperties(properties))
+        .build();
+  }
+}
diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle
index 6655df7d80c6..2e6274571a89 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -71,13 +71,8 @@ dependencies {
   implementation project(":sdks:java:io:kafka:upgrade")
   permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
 
-  // **** IcebergIO runtime dependencies ****
-  runtimeOnly library.java.hadoop_auth
-  runtimeOnly library.java.hadoop_client
-  // For writing to GCS
-  runtimeOnly library.java.bigdataoss_gcs_connector
+  // **** IcebergIO catalogs ****
   // HiveCatalog
-  runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
   runtimeOnly project(path: ":sdks:java:io:iceberg:hive")
   // BigQueryMetastoreCatalog (Java 11+)
   runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 4d79fb061d05..2ac04eb067a3 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -63,11 +63,11 @@ dependencies {
   permitUnusedDeclared "org.immutables:value:2.8.8"
   implementation library.java.vendored_calcite_1_28_0
   runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
+  runtimeOnly library.java.bigdataoss_gcs_connector
+  runtimeOnly library.java.hadoop_client
   testImplementation project(":sdks:java:managed")
-  testImplementation library.java.hadoop_client
   testImplementation library.java.bigdataoss_gcsio
-  testImplementation library.java.bigdataoss_gcs_connector
   testImplementation library.java.bigdataoss_util_hadoop
   testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
   testImplementation "org.apache.parquet:parquet-common:$parquet_version"
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
index 1ff7119196e0..614c45fcf624 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlBasicCall;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier;
 import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind;
@@ -47,6 +48,7 @@
 import org.apache.iceberg.expressions.Expression.Operation;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.NaNUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -57,8 +59,9 @@
  *
  * <p>Note: Only supports top-level fields (i.e. cannot reference nested fields).
  */
-class FilterUtils {
-  private static final Map<SqlKind, Operation> FILTERS =
+@Internal
+public class FilterUtils {
+  static final Map<SqlKind, Operation> FILTERS =
       ImmutableMap.<SqlKind, Operation>builder()
           .put(SqlKind.IS_NULL, Operation.IS_NULL)
           .put(SqlKind.IS_NOT_NULL, Operation.NOT_NULL)
@@ -74,6 +77,8 @@ class FilterUtils {
           .put(SqlKind.OR, Operation.OR)
           .build();
 
+  public static final Set<SqlKind> SUPPORTED_OPS = FILTERS.keySet();
+
   /**
    * Parses a SQL filter expression string and returns a set of all field names referenced within
    * it.
@@ -115,7 +120,6 @@ private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
     }
     // SqlLiteral nodes do not contain field names, so we can ignore them.
   }
-
   /**
    * parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for
    * data pruning.
@@ -261,8 +265,10 @@ private static Expression convertFieldInLiteral(Operation op, SqlBasicCall call,
     checkArgument(
         value instanceof SqlNodeList,
         "Expected right hand side to be a list but got " + value.getClass());
-    String name = ((SqlIdentifier) term).getSimple();
-    TypeID type = schema.findType(name).typeId();
+    String caseInsensitiveName = ((SqlIdentifier) term).getSimple();
+    Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
+    String name = field.name();
+    TypeID type = field.type().typeId();
     List<SqlNode> list =
         ((SqlNodeList) value)
             .getList().stream().filter(Objects::nonNull).collect(Collectors.toList());
@@ -287,13 +293,17 @@ private static Expression convertFieldAndLiteral(
     SqlNode left = getLeftChild(call);
     SqlNode right = getRightChild(call);
     if (left instanceof SqlIdentifier && right instanceof SqlLiteral) {
-      String name = ((SqlIdentifier) left).getSimple();
-      TypeID type = schema.findType(name).typeId();
+      String caseInsensitiveName = ((SqlIdentifier) left).getSimple();
+      Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
+      String name = field.name();
+      TypeID type = field.type().typeId();
       Object value = convertLiteral((SqlLiteral) right, name, type);
       return convertLR.apply(name, value);
     } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) {
-      String name = ((SqlIdentifier) right).getSimple();
-      TypeID type = schema.findType(name).typeId();
+      String caseInsensitiveName = ((SqlIdentifier) right).getSimple();
+      Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
+      String name = field.name();
+      TypeID type = field.type().typeId();
       Object value = convertLiteral((SqlLiteral) left, name, type);
       return convertRL.apply(name, value);
     } else {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index c3a185f3a833..3358bc072a2d 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -50,6 +50,8 @@ public static Builder builder() {
     return new AutoValue_IcebergCatalogConfig.Builder();
   }
 
+  public abstract Builder toBuilder();
+
   public org.apache.iceberg.catalog.Catalog catalog() {
     if (cachedCatalog == null) {
       String catalogName = getCatalogName();
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index de88b4af2699..81ec229df70f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -93,7 +93,6 @@ public boolean advance() throws IOException {
 
     // This nullness annotation is incorrect, but the most expedient way to work with Iceberg's APIs
     // which are not null-safe.
-    @SuppressWarnings("nullness")
     org.apache.iceberg.Schema requiredSchema = source.getScanConfig().getRequiredSchema();
     @Nullable String nameMapping = source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
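
For reviewers, a minimal end-to-end sketch of the flow this patch enables, condensed from the integration tests above. The BeamSqlEnv, InMemoryCatalogManager, executeDdl, parseQuery, and BeamSqlRelUtils calls are the same ones the tests exercise; the catalog name, the 'hadoop'-type warehouse properties, the db.orders identifier, and the InMemoryCatalogManager import path are illustrative assumptions (the tests themselves target BigQueryMetastoreCatalog).

    // Sketch only; names and the 'hadoop' catalog properties are hypothetical,
    // and the InMemoryCatalogManager package is assumed from this PR's layout.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
    import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
    import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class IcebergCatalogSqlSketch {
      public static void main(String[] args) {
        BeamSqlEnv sqlEnv =
            BeamSqlEnv.builder(new InMemoryCatalogManager())
                .setPipelineOptions(PipelineOptionsFactory.create())
                .build();

        // Register an Iceberg catalog, then make it the current catalog.
        sqlEnv.executeDdl(
            "CREATE CATALOG demo TYPE iceberg PROPERTIES ("
                + "'type' = 'hadoop', 'warehouse' = '/tmp/warehouse')");
        sqlEnv.executeDdl("SET CATALOG demo");

        // Declare a Beam SQL table backed by an Iceberg table identifier.
        sqlEnv.executeDdl(
            "CREATE EXTERNAL TABLE orders (id BIGINT, item VARCHAR) "
                + "TYPE 'iceberg' LOCATION 'db.orders'");

        // Reads take the same plan as the push-down test above: unused
        // columns are pruned before the Iceberg source is invoked.
        Pipeline p = Pipeline.create();
        PCollection<Row> ids =
            BeamSqlRelUtils.toPCollection(p, sqlEnv.parseQuery("SELECT id FROM orders"));
        p.run().waitUntilFinish();
      }
    }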