Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
* The Beam Java API for Calcite SqlTransform is no longer experimental ([BEAM-12680](https://issues.apache.org/jira/browse/BEAM-12680)).

## I/Os

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,7 @@
* }</pre>
*/
@AutoValue
@Experimental
@AutoValue.CopyAnnotations
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>> {
static final String PCOLLECTION_NAME = "PCOLLECTION";

Expand All @@ -122,9 +117,9 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>

abstract QueryParameters queryParameters();

abstract List<UdfDefinition> udfDefinitions();
abstract @Experimental List<UdfDefinition> udfDefinitions();

abstract List<UdafDefinition> udafDefinitions();
abstract @Experimental List<UdafDefinition> udafDefinitions();

abstract boolean autoLoading();

Expand Down Expand Up @@ -154,8 +149,9 @@ public PCollection<Row> expand(PInput input) {

tableProviderMap().forEach(sqlEnvBuilder::addSchema);

if (defaultTableProvider() != null) {
sqlEnvBuilder.setCurrentSchema(defaultTableProvider());
final @Nullable String defaultTableProvider = defaultTableProvider();
if (defaultTableProvider != null) {
sqlEnvBuilder.setCurrentSchema(defaultTableProvider);
}

sqlEnvBuilder.setQueryPlannerClassName(
Expand Down Expand Up @@ -239,6 +235,7 @@ public SqlTransform withDefaultTableProvider(String name, TableProvider tablePro
return withTableProvider(name, tableProvider).toBuilder().setDefaultTableProvider(name).build();
}

@Experimental
public SqlTransform withQueryPlannerClass(Class<? extends QueryPlanner> clazz) {
return toBuilder().setQueryPlannerClassName(clazz.getName()).build();
}
Expand All @@ -265,6 +262,7 @@ public SqlTransform withAutoLoading(boolean autoLoading) {
*
* <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDF in BeamSql.
*/
@Experimental
public SqlTransform registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
}
Expand All @@ -273,6 +271,7 @@ public SqlTransform registerUdf(String functionName, Class<? extends BeamSqlUdf>
* Register {@link SerializableFunction} as a UDF function used in this query. Note, {@link
* SerializableFunction} must have a constructor without arguments.
*/
@Experimental
public SqlTransform registerUdf(String functionName, SerializableFunction sfn) {
return registerUdf(functionName, sfn.getClass(), "apply");
}
Expand All @@ -288,6 +287,7 @@ private SqlTransform registerUdf(String functionName, Class<?> clazz, String met
}

/** register a {@link Combine.CombineFn} as UDAF function used in this query. */
@Experimental
public SqlTransform registerUdaf(String functionName, Combine.CombineFn combineFn) {
ImmutableList<UdafDefinition> newUdafs =
ImmutableList.<UdafDefinition>builder()
Expand All @@ -311,16 +311,17 @@ static Builder builder() {
}

@AutoValue.Builder
@AutoValue.CopyAnnotations
abstract static class Builder {
abstract Builder setQueryString(String queryString);

abstract Builder setQueryParameters(QueryParameters queryParameters);

abstract Builder setDdlStrings(List<String> ddlStrings);

abstract Builder setUdfDefinitions(List<UdfDefinition> udfDefinitions);
abstract @Experimental Builder setUdfDefinitions(List<UdfDefinition> udfDefinitions);

abstract Builder setUdafDefinitions(List<UdafDefinition> udafDefinitions);
abstract @Experimental Builder setUdafDefinitions(List<UdafDefinition> udafDefinitions);

abstract Builder setAutoLoading(boolean autoLoading);

Expand All @@ -335,7 +336,7 @@ abstract static class Builder {

@AutoValue
@AutoValue.CopyAnnotations
@SuppressWarnings({"rawtypes"})
@Experimental
abstract static class UdfDefinition {
abstract String udfName();

Expand All @@ -350,7 +351,7 @@ static UdfDefinition of(String udfName, Class<?> clazz, String methodName) {

@AutoValue
@AutoValue.CopyAnnotations
@SuppressWarnings({"rawtypes"})
@Experimental
abstract static class UdafDefinition {
abstract String udafName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand All @@ -27,6 +28,7 @@
@SuppressWarnings({
"rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
@Experimental
public interface UdfUdafProvider {
/** For UDFs implement {@link BeamSqlUdf}. */
default Map<String, Class<? extends BeamSqlUdf>> getBeamSqlUdfs() {
Expand Down