Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e603a8f
add Iceberg table provider and tests
ahmedabu98 May 1, 2025
f1cbc81
properties go in the tableprovider initialization
ahmedabu98 May 1, 2025
2f4bfec
tobuilder
ahmedabu98 May 1, 2025
b7e85c1
streaming integration test
ahmedabu98 May 6, 2025
19d3db1
spotless
ahmedabu98 May 6, 2025
87b838e
extend test to include multi nested types; fix iceberg <-> conversion…
ahmedabu98 May 8, 2025
960b733
Merge branch 'master' of https://github.com/ahmedabu98/beam into ice_sql
ahmedabu98 May 8, 2025
4e4e31f
add to changes.md
ahmedabu98 May 8, 2025
06be2c0
spotless
ahmedabu98 May 8, 2025
138b54a
fix tests
ahmedabu98 May 8, 2025
29599c4
clean
ahmedabu98 May 12, 2025
964b338
update CHANGES
ahmedabu98 May 14, 2025
1468960
Merge branch 'master' of https://github.com/ahmedabu98/beam into ice_sql
ahmedabu98 May 27, 2025
9b5583c
add projection pushdown and column pruning
ahmedabu98 May 28, 2025
867aaf1
spotless
ahmedabu98 May 28, 2025
841a3f7
fixes
ahmedabu98 May 29, 2025
bcb3d10
fixes
ahmedabu98 May 29, 2025
f447a7c
updates
ahmedabu98 Jun 5, 2025
c85afc5
Merge branch 'master' of https://github.com/ahmedabu98/beam into ice_sql
ahmedabu98 Jun 11, 2025
d61c078
sync with HEAD
ahmedabu98 Jun 11, 2025
0158cac
sync with HEAD and use new Catalog implementation
ahmedabu98 Jun 11, 2025
c40a8c6
sync with HEAD
ahmedabu98 Jun 11, 2025
f5eb78d
mark new interfaces @internal
ahmedabu98 Jun 12, 2025
73b6087
spotless
ahmedabu98 Jun 12, 2025
5dd2a12
fix unparse method
ahmedabu98 Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 4
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 5
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] Now available with Beam SQL! ([#34799](https://github.com/apache/beam/pull/34799))
* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))

## New Features / Improvements
* [Beam SQL] Introducing Beam Catalogs ([#35223](https://github.com/apache/beam/pull/35223))
* Adding Google Storage Requester Pays feature (Golang) ([#30747](https://github.com/apache/beam/issues/30747)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
Expand Down
4 changes: 4 additions & 0 deletions sdks/java/extensions/sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ dependencies {
fmppTask "org.freemarker:freemarker:2.3.31"
fmppTemplates library.java.vendored_calcite_1_28_0
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:managed")
implementation project(":sdks:java:io:iceberg")
runtimeOnly project(":sdks:java:io:iceberg:bqms")
runtimeOnly project(":sdks:java:io:iceberg:hive")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:join-library")
permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,16 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
if (i > 0) {
writer.keyword(",");
}
properties.get(i).unparse(writer, leftPrec, rightPrec);
}

for (int i = 0; i < properties.size(); i += 2) {
if (i > 0) {
writer.keyword(",");
}
properties.get(i).unparse(writer, leftPrec, rightPrec); // key
SqlNode property = properties.get(i);
checkState(
property instanceof SqlNodeList,
String.format(
"Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
SqlNodeList kv = ((SqlNodeList) property);

kv.get(0).unparse(writer, leftPrec, rightPrec); // key
writer.keyword("=");
properties.get(i + 1).unparse(writer, leftPrec, rightPrec); // value
kv.get(1).unparse(writer, leftPrec, rightPrec); // value
}
writer.keyword(")");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;

/**
* Represents a named and configurable container for managing tables. Is defined with a type and
* configuration properties. Uses an underlying {@link MetaStore} to manage tables and table
* providers.
*/
@Internal
public interface Catalog {
/** A type that defines this catalog. */
String type();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -32,6 +33,7 @@
* <p>When {@link #registerTableProvider(String, TableProvider)} is called, the provider should
* become available for all catalogs.
*/
@Internal
public interface CatalogManager {
/** Creates and stores a catalog of a particular type. */
void createCatalog(String name, String type, Map<String, String> properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import org.apache.beam.sdk.annotations.Internal;

/**
* Over-arching registrar to capture available {@link Catalog}s. Implementations should be marked
* with {@link com.google.auto.service.AutoService} to be available to {@link
* java.util.ServiceLoader}s.
*/
@Internal
public interface CatalogRegistrar {
/** Returns the {@link Catalog} implementation classes that this registrar makes available. */
Iterable<Class<? extends Catalog>> getCatalogs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.util.Preconditions;

@AutoService(Catalog.class)
public class InMemoryCatalog implements Catalog {
private final String name;
private final Map<String, String> properties;
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
protected final InMemoryMetaStore metaStore = new InMemoryMetaStore();

public InMemoryCatalog(String name, Map<String, String> properties) {
this.name = name;
Expand All @@ -41,7 +39,8 @@ public String type() {

@Override
public String name() {
return Preconditions.checkStateNotNull(name, "InMemoryCatalog has not been initialized");
return Preconditions.checkStateNotNull(
name, getClass().getSimpleName() + " has not been initialized");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.beam.sdk.extensions.sql.meta.catalog;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

@AutoService(CatalogRegistrar.class)
public class InMemoryCatalogRegistrar implements CatalogRegistrar {
@Override
public Iterable<Class<? extends Catalog>> getCatalogs() {
return ImmutableList.of(InMemoryCatalog.class);
return ImmutableList.<Class<? extends Catalog>>builder()
.add(InMemoryCatalog.class)
.add(IcebergCatalog.class)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;

/**
 * An {@link InMemoryCatalog} of type {@code "iceberg"}. On construction it registers an {@link
 * IcebergTableProvider}, built from the catalog's name and properties, with the inherited meta
 * store.
 */
public class IcebergCatalog extends InMemoryCatalog {
  /**
   * Creates the catalog and registers its Iceberg table provider.
   *
   * @param name the catalog name, also handed to the {@link IcebergTableProvider}
   * @param properties the catalog properties, also handed to the {@link IcebergTableProvider}
   */
  public IcebergCatalog(String name, Map<String, String> properties) {
    super(name, properties);
    IcebergTableProvider icebergProvider = new IcebergTableProvider(name, properties);
    this.metaStore.registerProvider(icebergProvider);
  }

  /** Returns the type identifier of this catalog: {@code "iceberg"}. */
  @Override
  public String type() {
    return "iceberg";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import static org.apache.beam.sdk.io.iceberg.FilterUtils.SUPPORTED_OPS;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.AND;
import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind.OR;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link BeamSqlTableFilter} that partitions the nodes of a predicate (given in conjunctive
 * normal form) into those that can be pushed down to the Iceberg source and those that cannot.
 *
 * <p>The partitioning is computed lazily, the first time any accessor (including {@link
 * #toString()}) is invoked, and is cached afterwards.
 */
public class IcebergFilter implements BeamSqlTableFilter {
  // Both lists are populated together by maybeInitialize(); null means "not computed yet".
  private @Nullable List<RexNode> supported;
  private @Nullable List<RexNode> unsupported;
  private final List<RexNode> predicateCNF;

  /**
   * @param predicateCNF the top-level nodes of the predicate in conjunctive normal form; each node
   *     must be a boolean expression
   */
  public IcebergFilter(List<RexNode> predicateCNF) {
    this.predicateCNF = predicateCNF;
  }

  /**
   * Splits {@link #predicateCNF} into supported and unsupported nodes, unless already done.
   *
   * @throws IllegalArgumentException if a predicate node is not of SQL type BOOLEAN
   */
  private void maybeInitialize() {
    if (supported != null && unsupported != null) {
      return;
    }
    ImmutableList.Builder<RexNode> supportedBuilder = ImmutableList.builder();
    ImmutableList.Builder<RexNode> unsupportedBuilder = ImmutableList.builder();
    for (RexNode node : predicateCNF) {
      if (!node.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
        throw new IllegalArgumentException(
            "Predicate node '"
                + node.getClass().getSimpleName()
                + "' should be a boolean expression, but was: "
                + node.getType().getSqlTypeName());
      }

      if (isSupported(node).getLeft()) {
        supportedBuilder.add(node);
      } else {
        unsupportedBuilder.add(node);
      }
    }
    supported = supportedBuilder.build();
    unsupported = unsupportedBuilder.build();
  }

  /** Returns the predicate nodes that cannot be pushed down. */
  @Override
  public List<RexNode> getNotSupported() {
    maybeInitialize();
    return checkStateNotNull(unsupported);
  }

  /** Returns the number of expressions contained in the supported predicate nodes. */
  @Override
  public int numSupported() {
    maybeInitialize();
    return BeamSqlTableFilter.expressionsInFilter(checkStateNotNull(supported));
  }

  /** Returns the predicate nodes that can be pushed down. */
  public List<RexNode> getSupported() {
    maybeInitialize();
    return checkStateNotNull(supported);
  }

  @Override
  public String toString() {
    // The lists are populated lazily; without this call toString() would fail with an
    // IllegalStateException when invoked before any of the accessors.
    maybeInitialize();
    return MoreObjects.toStringHelper(IcebergFilter.class)
        .add(
            "supported",
            checkStateNotNull(supported).stream()
                .map(RexNode::toString)
                .collect(Collectors.joining()))
        .add(
            "unsupported",
            checkStateNotNull(unsupported).stream()
                .map(RexNode::toString)
                .collect(Collectors.joining()))
        .toString();
  }

  /**
   * Checks whether a {@code RexNode} can be pushed down.
   *
   * <p>A {@link RexCall} is supported when its kind is in {@code FilterUtils.SUPPORTED_OPS}, every
   * one of its operands is supported, and (unless it is an {@code AND}/{@code OR}) it references
   * fewer than two input columns. {@link RexInputRef}s and {@link RexLiteral}s are always
   * supported.
   *
   * @param node A node to check for predicate push-down support.
   * @return A pair containing a boolean whether an expression is supported and the number of input
   *     references used by the expression ({@code AND}/{@code OR} nodes do not accumulate the
   *     references of their operands).
   * @throws UnsupportedOperationException if the node, or one of the visited operands, is neither
   *     a {@link RexCall}, a {@link RexInputRef} nor a {@link RexLiteral}
   */
  private Pair<Boolean, Integer> isSupported(RexNode node) {
    int numberOfInputRefs = 0;
    boolean isSupported = true;

    if (node instanceof RexCall) {
      RexCall compositeNode = (RexCall) node;
      if (!SUPPORTED_OPS.contains(node.getKind())) {
        isSupported = false;
      } else {
        // Conjunctions/disjunctions may combine predicates over different columns, so their
        // operands' input references are not added up.
        boolean isLogicalConnective = node.getKind().belongsTo(ImmutableSet.of(AND, OR));
        for (RexNode operand : compositeNode.getOperands()) {
          Pair<Boolean, Integer> childSupported = isSupported(operand);
          if (!isLogicalConnective) {
            numberOfInputRefs += childSupported.getRight();
          }
          // All operands must be supported for a parent node to be supported, so the result is
          // accumulated: an unsupported operand must not be masked by a later supported one.
          // Predicate functions with multiple columns are unsupported.
          isSupported = isSupported && numberOfInputRefs < 2 && childSupported.getLeft();
        }
      }
    } else if (node instanceof RexInputRef) {
      numberOfInputRefs = 1;
    } else if (node instanceof RexLiteral) {
      // RexLiterals are expected, but no action is needed.
    } else {
      throw new UnsupportedOperationException(
          "Encountered an unexpected node type: " + node.getClass().getSimpleName());
    }

    return Pair.of(isSupported, numberOfInputRefs);
  }
}
Loading
Loading