2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 6
"modification": 3
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
2 changes: 1 addition & 1 deletion CHANGES.md
@@ -78,7 +78,7 @@
* [IcebergIO] Now available with Beam SQL! ([#34799](https://github.com/apache/beam/pull/34799))
* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))
* [IcebergIO] Create tables with a specified partition spec ([#34966](https://github.com/apache/beam/pull/34966))
* [IcebergIO] Create tables with a specified partition spec ([#34966](https://github.com/apache/beam/pull/34966), [#35268](https://github.com/apache/beam/pull/35268))
* [IcebergIO] Dynamically create namespaces if needed ([#35228](https://github.com/apache/beam/pull/35228))

## New Features / Improvements
3 changes: 3 additions & 0 deletions sdks/java/extensions/sql/build.gradle
@@ -114,6 +114,9 @@ dependencies {
provided library.java.hadoop_client
permitUnusedDeclared library.java.hadoop_client
provided library.java.kafka_clients

testImplementation "org.apache.iceberg:iceberg-api:1.6.1"
testImplementation "org.apache.iceberg:iceberg-core:1.6.1"
testImplementation library.java.vendored_calcite_1_28_0
testImplementation library.java.vendored_guava_32_1_2_jre
testImplementation library.java.junit
1 change: 1 addition & 0 deletions sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -45,6 +45,7 @@ data: {
"LOCATION"
"TBLPROPERTIES"
"PROPERTIES"
"PARTITIONED"
]

# List of keywords from "keywords" section that are not reserved.
parserImpls.ftl (Beam SQL parser grammar)
@@ -255,12 +255,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
}
}

SqlNodeList PartitionFieldList() :
{
final List<SqlNode> list = new ArrayList<SqlNode>();
SqlNode field;
}
{
field = StringLiteral() { list.add(field); }
(
<COMMA> field = StringLiteral() { list.add(field); }
)*
{
return new SqlNodeList(list, getPos());
}
}

/**
* Note: This example is probably out of sync with the code.
*
* CREATE TABLE ( IF NOT EXISTS )?
* CREATE EXTERNAL TABLE ( IF NOT EXISTS )?
* ( database_name '.' )? table_name '(' column_def ( ',' column_def )* ')'
* TYPE type_name
* ( PARTITIONED BY '(' partition_field ( ',' partition_field )* ')' )?
* ( COMMENT comment_string )?
* ( LOCATION location_string )?
* ( TBLPROPERTIES tbl_properties )?
@@ -271,6 +287,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
final SqlIdentifier id;
List<Schema.Field> fieldList = null;
final SqlNode type;
SqlNodeList partitionFields = null;
SqlNode comment = null;
SqlNode location = null;
SqlNode tblProperties = null;
@@ -290,6 +307,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
|
type = SimpleIdentifier()
)
[ <PARTITIONED> <BY> <LPAREN> partitionFields = PartitionFieldList() <RPAREN> ]
[ <COMMENT> comment = StringLiteral() ]
[ <LOCATION> location = StringLiteral() ]
[ <TBLPROPERTIES> tblProperties = StringLiteral() ]
@@ -302,6 +320,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
id,
fieldList,
type,
partitionFields,
comment,
location,
tblProperties);
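Taken together, the new PARTITIONED keyword and the PartitionFieldList() production let CREATE EXTERNAL TABLE carry an optional partition clause between TYPE and COMMENT. A minimal sketch of the resulting DDL (table, column, and partition names are illustrative):

    CREATE EXTERNAL TABLE orders (
        id INT,
        customer VARCHAR)
    TYPE iceberg
    PARTITIONED BY ('customer')

Each partition field is a plain string literal, so the parser does no validation of its own; providers that cannot honor the clause reject it later, in SqlCreateExternalTable.execute().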
SqlCreateExternalTable.java
@@ -19,9 +19,11 @@

import static org.apache.beam.sdk.schemas.Schema.toSchema;
import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Static.RESOURCE;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.TableUtils;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -33,12 +35,14 @@
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNodeList;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlUtil;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlWriter;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Pair;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Parse tree for {@code CREATE EXTERNAL TABLE} statement. */
@SuppressWarnings({
@@ -51,6 +55,7 @@ public class SqlCreateExternalTable extends SqlCreate implements BeamSqlParser.E
private final SqlNode comment;
private final SqlNode location;
private final SqlNode tblProperties;
private final @Nullable SqlNodeList partitionFields;

private static final SqlOperator OPERATOR =
new SqlSpecialOperator("CREATE EXTERNAL TABLE", SqlKind.OTHER_DDL);
@@ -63,13 +68,15 @@ public SqlCreateExternalTable(
SqlIdentifier name,
List<Schema.Field> columnList,
SqlNode type,
SqlNodeList partitionFields,
SqlNode comment,
SqlNode location,
SqlNode tblProperties) {
super(OPERATOR, pos, replace, ifNotExists);
this.name = checkNotNull(name);
this.columnList = columnList; // may be null
this.type = checkNotNull(type);
this.partitionFields = partitionFields;
this.comment = comment; // may be null
this.location = location; // may be null
this.tblProperties = tblProperties; // may be null
@@ -98,6 +105,19 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
}
writer.keyword("TYPE");
type.unparse(writer, 0, 0);
if (partitionFields != null) {
writer.keyword("PARTITIONED");
writer.keyword("BY");
writer.sep("(");
for (int i = 0; i < partitionFields.size(); i++) {
if (i > 0) {
writer.sep(",");
}
SqlNode field = partitionFields.get(i);
field.unparse(writer, 0, 0);
}
writer.sep(")");
}
if (comment != null) {
writer.keyword("COMMENT");
comment.unparse(writer, 0, 0);
@@ -130,7 +150,17 @@ public void execute(CalcitePrepare.Context context) {
name.getParserPosition(),
RESOURCE.internal("Schema is not instanceof BeamCalciteSchema"));
}

BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
if (partitionFields != null) {
checkArgument(
schema.resolveMetastore().supportsPartitioning(),
"Invalid use of 'PARTITIONED BY()': Table '%s' of type '%s' "
+ "does not support partitioning.",
SqlDdlNodes.name(name),
SqlDdlNodes.getString(type));
}

schema.resolveMetastore().createTable(toTable());
}

@@ -149,11 +179,19 @@ private void unparseColumn(SqlWriter writer, Schema.Field column) {
}
}

private @Nullable List<String> parsePartitionFields() {
if (partitionFields == null) {
return null;
}
return partitionFields.stream().map(SqlDdlNodes::getString).collect(Collectors.toList());
}

private Table toTable() {
return Table.builder()
.type(SqlDdlNodes.getString(type))
.name(SqlDdlNodes.name(name))
.schema(columnList.stream().collect(toSchema()))
.partitionFields(parsePartitionFields())
.comment(SqlDdlNodes.getString(comment))
.location(SqlDdlNodes.getString(location))
.properties(
Table.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.TableUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -34,6 +35,8 @@ public abstract class Table implements Serializable {

public abstract Schema getSchema();

public abstract @Nullable List<String> getPartitionFields();

public abstract @Nullable String getComment();

public abstract @Nullable String getLocation();
@@ -55,6 +58,8 @@ public abstract static class Builder {

public abstract Builder schema(Schema getSchema);

public abstract Builder partitionFields(List<String> fields);

public abstract Builder comment(String name);

public abstract Builder location(String location);
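For downstream consumers the new field rides on the existing AutoValue builder. A minimal sketch of building an equivalent Table by hand (names are illustrative, and this assumes, as in the in-tree builder, that properties default to empty; otherwise set them explicitly):

    import org.apache.beam.sdk.extensions.sql.meta.Table;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

    Table table =
        Table.builder()
            .type("iceberg")
            .name("orders")
            .schema(Schema.builder().addInt32Field("id").addStringField("customer").build())
            // getter is @Nullable: stays null when the DDL had no PARTITIONED BY clause
            .partitionFields(ImmutableList.of("customer"))
            .build();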
InMemoryCatalog.java
@@ -25,7 +25,7 @@
public class InMemoryCatalog implements Catalog {
private final String name;
private final Map<String, String> properties;
protected final InMemoryMetaStore metaStore = new InMemoryMetaStore();
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();

public InMemoryCatalog(String name, Map<String, String> properties) {
this.name = name;
TableProvider.java
@@ -79,4 +79,8 @@ default Set<String> getSubProviders() {
default TableProvider getSubProvider(String name) {
return null;
}

default boolean supportsPartitioning() {
return false;
}
}
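The default keeps every existing provider on its current behavior; a provider opts in by overriding it and honoring Table.getPartitionFields() when it builds its table. A hypothetical sketch (the provider and table class are invented for illustration):

    import java.util.List;
    import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
    import org.apache.beam.sdk.extensions.sql.meta.Table;
    import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;

    public class MyPartitionedTableProvider extends InMemoryMetaTableProvider {
      @Override
      public String getTableType() {
        return "my-format"; // hypothetical type name
      }

      @Override
      public BeamSqlTable buildBeamSqlTable(Table table) {
        // null when the DDL had no PARTITIONED BY clause
        List<String> fields = table.getPartitionFields();
        return new MyPartitionedTable(table.getSchema(), fields); // hypothetical table impl
      }

      @Override
      public boolean supportsPartitioning() {
        return true;
      }
    }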
IcebergCatalog.java
@@ -19,13 +19,21 @@

import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;

public class IcebergCatalog extends InMemoryCatalog {
private final IcebergMetastore metaStore = new IcebergMetastore();

public IcebergCatalog(String name, Map<String, String> properties) {
super(name, properties);
metaStore.registerProvider(new IcebergTableProvider(name, properties));
}

@Override
public MetaStore metaStore() {
return metaStore;
}

@Override
public String type() {
return "iceberg";
IcebergMetastore.java (new file)
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;

import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;

public class IcebergMetastore extends InMemoryMetaStore {
@Override
public boolean supportsPartitioning() {
return true;
}
}
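Net effect: an IcebergCatalog now reports a metastore that supports partitioning, which is exactly what SqlCreateExternalTable.execute() checks before accepting a PARTITIONED BY clause. A quick sketch (the catalog name and property map are illustrative):

    import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    IcebergCatalog catalog =
        new IcebergCatalog("my_catalog", ImmutableMap.of("warehouse", "gs://my-bucket/warehouse"));
    MetaStore store = catalog.metaStore(); // the new IcebergMetastore, not the inherited InMemoryMetaStore
    store.supportsPartitioning();          // true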
IcebergTable.java
@@ -64,6 +64,7 @@ class IcebergTable extends SchemaBaseBeamTable {
@VisibleForTesting final String tableIdentifier;
@VisibleForTesting final IcebergCatalogConfig catalogConfig;
@VisibleForTesting @Nullable Integer triggeringFrequency;
@VisibleForTesting final @Nullable List<String> partitionFields;

IcebergTable(Table table, IcebergCatalogConfig catalogConfig) {
super(table.getSchema());
@@ -74,6 +75,7 @@
if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {
this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt();
}
this.partitionFields = table.getPartitionFields();
}

@Override
@@ -83,6 +85,9 @@ public POutput buildIOWriter(PCollection<Row> input) {
if (triggeringFrequency != null) {
configBuilder.put(TRIGGERING_FREQUENCY_FIELD, triggeringFrequency);
}
if (partitionFields != null) {
configBuilder.put("partition_fields", partitionFields);
}
return input.apply(Managed.write(Managed.ICEBERG).withConfig(configBuilder.build()));
}

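So a table declared with PARTITIONED BY ('customer') funnels its fields into the Managed Iceberg write config under the partition_fields key. A sketch of the equivalent hand-written transform (the table identifier key and value are illustrative of a typical Managed Iceberg config; rows is a PCollection<Row>):

    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    ImmutableMap<String, Object> config =
        ImmutableMap.of(
            "table", "db.orders", // illustrative; catalog settings omitted
            "partition_fields", ImmutableList.of("customer"));
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));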
IcebergTableProvider.java
@@ -60,4 +60,9 @@ public String getTableType() {
public BeamSqlTable buildBeamSqlTable(Table table) {
return new IcebergTable(table, catalogConfig);
}

@Override
public boolean supportsPartitioning() {
return true;
}
}
BeamSqlCliTest.java
@@ -53,6 +53,26 @@
public class BeamSqlCliTest {
@Rule public transient ExpectedException thrown = ExpectedException.none();

@Test
public void testExecute_createTextTable_invalidPartitioningError() {
InMemoryMetaStore metaStore = new InMemoryMetaStore();
metaStore.registerProvider(new TextTableProvider());

BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore);

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Invalid use of 'PARTITIONED BY()': Table 'person' of type 'text' does not support partitioning.");
cli.execute(
"CREATE EXTERNAL TABLE person (\n"
+ "id int COMMENT 'id', \n"
+ "name varchar COMMENT 'name', \n"
+ "age int COMMENT 'age') \n"
+ "TYPE 'text' \n"
+ "PARTITIONED BY ('id', 'name') \n"
+ "COMMENT '' LOCATION '/home/admin/orders'");
}

@Test
public void testExecute_createTextTable() throws Exception {
InMemoryMetaStore metaStore = new InMemoryMetaStore();