From c4a8b582b96b61ffd3b7a6875f66fbddaeebe5f1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 12 Apr 2024 00:01:40 -0400 Subject: [PATCH 1/4] SchemaTransformProviderTranslation --- ...nerateSequenceSchemaTransformProvider.java | 17 ++- .../schemas/transforms/SchemaTransform.java | 45 ++++++- .../transforms/SchemaTransformProvider.java | 2 +- .../SchemaTransformProviderTranslation.java | 114 ++++++++++++++++++ .../TypedSchemaTransformProvider.java | 4 +- .../providers/FlattenTransformProvider.java | 9 +- .../JavaExplodeTransformProvider.java | 15 +-- .../JavaFilterTransformProvider.java | 14 +-- .../JavaMapToFieldsTransformProvider.java | 14 +-- .../providers/LoggingTransformProvider.java | 14 +-- .../util/construction/TransformUpgrader.java | 23 ++++ ...teSequenceSchemaTransformProviderTest.java | 2 +- .../TypedSchemaTransformProviderTest.java | 39 ++++-- .../service/WindowIntoTransformProvider.java | 34 +++--- ...ionServiceSchemaTransformProviderTest.java | 51 ++++---- .../SqlTransformSchemaTransformProvider.java | 9 +- .../providers/CsvWriteTransformProvider.java | 14 +-- .../DebeziumReadSchemaTransformProvider.java | 11 +- .../FileReadSchemaTransformProvider.java | 16 ++- .../FileWriteSchemaTransformProvider.java | 18 ++- ...ReadSchemaTransformFormatProviderTest.java | 6 +- ...ReadSchemaTransformFormatProviderTest.java | 4 +- .../FileWriteSchemaTransformProviderTest.java | 2 +- ...ReadSchemaTransformFormatProviderTest.java | 7 +- ...ReadSchemaTransformFormatProviderTest.java | 6 +- ...ReadSchemaTransformFormatProviderTest.java | 6 +- ...ueryExportReadSchemaTransformProvider.java | 18 ++- ...FileLoadsWriteSchemaTransformProvider.java | 18 ++- ...ueryDirectReadSchemaTransformProvider.java | 16 ++- ...torageWriteApiSchemaTransformProvider.java | 10 +- .../BigtableReadSchemaTransformProvider.java | 12 +- .../BigtableWriteSchemaTransformProvider.java | 12 +- .../PubsubReadSchemaTransformProvider.java | 15 ++- .../PubsubWriteSchemaTransformProvider.java | 17 ++- ...PubsubLiteReadSchemaTransformProvider.java | 14 +-- ...ubsubLiteWriteSchemaTransformProvider.java | 14 +-- .../SpannerWriteSchemaTransformProvider.java | 20 ++- ...ngestreamsReadSchemaTransformProvider.java | 12 +- ...BigtableReadSchemaTransformProviderIT.java | 2 +- ...PubsubReadSchemaTransformProviderTest.java | 8 +- .../jdbc/JdbcReadSchemaTransformProvider.java | 19 ++- .../JdbcWriteSchemaTransformProvider.java | 20 ++- .../providers/JsonWriteTransformProvider.java | 14 +-- .../KafkaReadSchemaTransformProvider.java | 13 +- .../KafkaWriteSchemaTransformProvider.java | 20 ++- ...ingleStoreSchemaTransformReadProvider.java | 18 ++- ...ngleStoreSchemaTransformWriteProvider.java | 18 ++- .../SingleStoreIOSchemaTransformIT.java | 6 +- .../org/apache/beam/sdk/managed/Managed.java | 6 +- .../ManagedSchemaTransformProvider.java | 19 +-- .../managed/TestSchemaTransformProvider.java | 4 +- 51 files changed, 484 insertions(+), 357 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java index d9dfc2a90bd8..710e3be73d98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java @@ -75,13 +75,9 @@ public String description() { } @Override - public Class configurationClass() { - return GenerateSequenceConfiguration.class; - } - - @Override - public SchemaTransform from(GenerateSequenceConfiguration configuration) { - return new GenerateSequenceSchemaTransform(configuration); + public SchemaTransform from( + GenerateSequenceConfiguration configuration) { + return new GenerateSequenceSchemaTransform(configuration, identifier()); } @DefaultSchema(AutoValueSchema.class) @@ -163,10 +159,13 @@ public void validate() { } } - protected static class GenerateSequenceSchemaTransform extends SchemaTransform { + protected static class GenerateSequenceSchemaTransform + extends SchemaTransform { private final GenerateSequenceConfiguration configuration; - GenerateSequenceSchemaTransform(GenerateSequenceConfiguration configuration) { + GenerateSequenceSchemaTransform( + GenerateSequenceConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index 283720e09772..ef6ebcd2a92d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -17,9 +17,17 @@ */ package org.apache.beam.sdk.schemas.transforms; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.lang.reflect.ParameterizedType; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; /** * An abstraction representing schema capable and aware transforms. The interface is intended to be @@ -33,5 +41,38 @@ * compatibility guarantees and it should not be implemented outside of the Beam repository. */ @Internal -public abstract class SchemaTransform - extends PTransform {} +public abstract class SchemaTransform + extends PTransform { + private final Row configurationRow; + private final String identifier; + + @SuppressWarnings("unchecked") + protected SchemaTransform(ConfigT configuration, String identifier) { + this.identifier = identifier; + @Nullable + ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass(); + checkStateNotNull(parameterizedType, "Could not get the SchemaTransform's parameterized type."); + checkArgument( + parameterizedType.getActualTypeArguments().length == 1, + String.format( + "Expected one parameterized type, but got %s.", + parameterizedType.getActualTypeArguments().length)); + + Class typedClass = (Class) parameterizedType.getActualTypeArguments()[0]; + + try { + this.configurationRow = + SchemaRegistry.createDefault().getToRowFunction(typedClass).apply(configuration); + } catch (NoSuchSchemaException e) { + throw new RuntimeException("Unable to find schema for this SchemaTransform's config.", e); + } + } + + public Row getConfigurationRow() { + return configurationRow; + } + + public String getIdentifier() { + return identifier; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java index 9d0ad61b7a6c..1945a7a993ea 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java @@ -56,7 +56,7 @@ default String description() { * Produce a {@link SchemaTransform} from some transform-specific configuration object. Can throw * a {@link InvalidConfigurationException} or a {@link InvalidSchemaException}. */ - SchemaTransform from(Row configuration); + SchemaTransform from(Row configuration); /** Returns the input collection names of this transform. */ default List inputCollectionNames() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java new file mode 100644 index 000000000000..555d3a244083 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms; + +import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class SchemaTransformProviderTranslation { + public static class SchemaTransformTranslator + implements TransformPayloadTranslator> { + private final String identifier; + private SchemaTransformProvider provider; + + public SchemaTransformTranslator(String identifier) { + this.identifier = identifier; + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { + if (schemaTransformProvider.identifier().equalsIgnoreCase(identifier)) { + if (this.provider != null) { + throw new IllegalArgumentException( + "Found multiple SchemaTransformProvider implementations with the same identifier " + + identifier); + } + this.provider = schemaTransformProvider; + } + } + if (this.provider == null) { + throw new IllegalArgumentException( + "Could not find SchemaTransformProvider implementation for identifier " + identifier); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public String getUrn() { + return identifier; + } + + @Override + @SuppressWarnings("argument") + public @Nullable FunctionSpec translate( + AppliedPTransform> application, SdkComponents components) + throws IOException { + return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build(); + } + + @Override + public Row toConfigRow(SchemaTransform transform) { + return transform.getConfigurationRow(); + } + + @Override + public SchemaTransform fromConfigRow(Row configRow, PipelineOptions options) { + return provider.from(configRow); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class SchemaTransformRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + Map, SchemaTransformTranslator> translators = new HashMap<>(); + + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { + translators.put( + SchemaTransform.class, + new SchemaTransformTranslator(schemaTransformProvider.identifier())); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + + return translators; + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index e75fa27d2d16..82d670ae8fd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -63,7 +63,7 @@ protected Class configurationClass() { * Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a * {@link InvalidSchemaException}. */ - protected abstract SchemaTransform from(ConfigT configuration); + protected abstract SchemaTransform from(ConfigT configuration); /** * List the dependencies needed for this transform. Jars from classpath are used by default when @@ -87,7 +87,7 @@ public final Schema configurationSchema() { } @Override - public final SchemaTransform from(Row configuration) { + public final SchemaTransform from(Row configuration) { return from(configFromRow(configuration)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java index baeccd1ac8cc..63bc52754c9d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java @@ -49,13 +49,8 @@ public class FlattenTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new SchemaTransform() { + protected SchemaTransform from(Configuration configuration) { + return new SchemaTransform(configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { return PCollectionRowTuple.of( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java index 48ce5e33d9fa..ee56c706f652 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java @@ -56,13 +56,8 @@ public class JavaExplodeTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new ExplodeTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new ExplodeTransform(configuration, identifier()); } @Override @@ -105,11 +100,11 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for Explode. */ - protected static class ExplodeTransform extends SchemaTransform { - + protected static class ExplodeTransform extends SchemaTransform { private final Configuration configuration; - ExplodeTransform(Configuration configuration) { + ExplodeTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java index 4ae8d2e41b30..f9277e35033c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java @@ -54,13 +54,8 @@ public class JavaFilterTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new JavaFilterTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new JavaFilterTransform(configuration, identifier()); } @Override @@ -107,11 +102,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for Filter-java. */ - protected static class JavaFilterTransform extends SchemaTransform { + protected static class JavaFilterTransform extends SchemaTransform { private final Configuration configuration; - JavaFilterTransform(Configuration configuration) { + JavaFilterTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java index 2e2042aef05d..bbec95d408ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java @@ -56,13 +56,8 @@ public class JavaMapToFieldsTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new JavaMapToFieldsTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new JavaMapToFieldsTransform(configuration, identifier()); } @Override @@ -119,11 +114,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for MapToFields-java. */ - protected static class JavaMapToFieldsTransform extends SchemaTransform { + protected static class JavaMapToFieldsTransform extends SchemaTransform { private final Configuration configuration; - JavaMapToFieldsTransform(Configuration configuration) { + JavaMapToFieldsTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java index 25efaeae2a0e..6490db3006ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java @@ -63,13 +63,8 @@ public class LoggingTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new LoggingTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new LoggingTransform(configuration, identifier()); } @Override @@ -134,13 +129,14 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for logging. */ - protected static class LoggingTransform extends SchemaTransform { + protected static class LoggingTransform extends SchemaTransform { private static final Logger LOG = LoggerFactory.getLogger(LoggingTransform.class); private final Configuration configuration; - LoggingTransform(Configuration configuration) { + LoggingTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java index deaa77d9b1be..ef0840e56aa5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util.construction; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation.SchemaTransformRegistrar; + import com.fasterxml.jackson.core.Version; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -44,6 +46,7 @@ import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher; import org.apache.beam.sdk.util.ReleaseInfo; @@ -424,6 +427,26 @@ public void close() throws Exception { return null; } + /** + * Like {@link #findUpgradeURN(PTransform)} but for {@link SchemaTransform}s + * + *

Finds a SchemaTransform by comparing the underlying URN. + * + * @param transform transform to lookup. + * @return a URN if discovered. Returns {@code null} otherwise. + */ + @SuppressWarnings({"rawtypes"}) + public static @Nullable String findUpgradeSchemaTransformURN(SchemaTransform transform) { + for (Entry, ? extends TransformPayloadTranslator> entry : + new SchemaTransformRegistrar().getTransformPayloadTranslators().entrySet()) { + if (entry.getValue().getUrn().equals(transform.getIdentifier())) { + return entry.getValue().getUrn(); + } + } + + return null; + } + /** * A utility method that converts an arbitrary serializable object into a byte array. * diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java index dcff3dedb843..4e34cde27767 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java @@ -41,7 +41,7 @@ public class GenerateSequenceSchemaTransformProviderTest { public void testGenerateSequence() { GenerateSequenceConfiguration config = GenerateSequenceConfiguration.builder().setStart(0L).setEnd(10L).build(); - SchemaTransform sequence = new GenerateSequenceSchemaTransformProvider().from(config); + SchemaTransform sequence = new GenerateSequenceSchemaTransformProvider().from(config); List expected = new ArrayList<>(10); for (long i = 0L; i < 10L; i++) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index 6b5ccbff4e42..60e395c75d13 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -67,13 +67,8 @@ public String description() { } @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - public SchemaTransform from(Configuration config) { - return new FakeSchemaTransform(config); + public SchemaTransform from(Configuration config) { + return new FakeSchemaTransform(config, identifier()); } @Override @@ -102,16 +97,17 @@ public String identifier() { } @Override - public SchemaTransform from(Configuration config) { - return new FakeSchemaTransform(config); + public SchemaTransform from(Configuration config) { + return new FakeSchemaTransform(config, identifier()); } } - public static class FakeSchemaTransform extends SchemaTransform { + public static class FakeSchemaTransform extends SchemaTransform { public Configuration config; - public FakeSchemaTransform(Configuration config) { + public FakeSchemaTransform(Configuration config, String identifier) { + super(config, identifier); this.config = config; } @@ -121,6 +117,27 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } + @Test + public void testInferConfigurationClass() { + assertEquals(Configuration.class, new FakeTypedSchemaIOProvider().configurationClass()); + assertEquals(Configuration.class, new FakeMinimalTypedProvider().configurationClass()); + } + + @Test + public void testGetProperties() { + SchemaTransformProvider minimalProvider = new FakeMinimalTypedProvider(); + Row inputConfig = + Row.withSchema(minimalProvider.configurationSchema()) + .withFieldValue("field1", "field1") + .withFieldValue("field2", Integer.valueOf(13)) + .build(); + + SchemaTransform transform = minimalProvider.from(inputConfig); + + assertEquals(inputConfig, transform.getConfigurationRow()); + assertEquals(minimalProvider.identifier(), transform.getIdentifier()); + } + @Test public void testFrom() { SchemaTransformProvider provider = new FakeTypedSchemaIOProvider(); diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java index d060d5916e9f..a220520625f7 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java @@ -53,22 +53,8 @@ public class WindowIntoTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - try { - return new WindowIntoStrategy( - (WindowingStrategy) - WindowingStrategyTranslation.fromProto( - RunnerApi.WindowingStrategy.parseFrom( - configuration.getSerializedWindowingStrategy()), - null)); - } catch (IOException exn) { - throw new RuntimeException(exn); - } + protected SchemaTransform from(Configuration configuration) { + return new WindowIntoStrategy(configuration, identifier()); } @Override @@ -106,12 +92,22 @@ public abstract static class Builder { } } - private static class WindowIntoStrategy extends SchemaTransform { + private static class WindowIntoStrategy extends SchemaTransform { private final WindowingStrategy windowingStrategy; - WindowIntoStrategy(WindowingStrategy windowingStrategy) { - this.windowingStrategy = windowingStrategy; + WindowIntoStrategy(Configuration configuration, String identifier) { + super(configuration, identifier); + try { + this.windowingStrategy = + (WindowingStrategy) + WindowingStrategyTranslation.fromProto( + RunnerApi.WindowingStrategy.parseFrom( + configuration.getSerializedWindowingStrategy()), + null); + } catch (IOException exn) { + throw new RuntimeException(exn); + } } @Override diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java index 294cb016c5e0..a8e74c0c467a 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java @@ -110,14 +110,9 @@ public static class TestSchemaTransformProvider extends TypedSchemaTransformProvider { @Override - protected Class configurationClass() { - return TestSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(TestSchemaTransformConfiguration configuration) { - return new TestSchemaTransform( - configuration.str1, configuration.str2, configuration.int1, configuration.int2); + protected SchemaTransform from( + TestSchemaTransformConfiguration configuration) { + return new TestSchemaTransform(configuration, identifier()); } @Override @@ -156,18 +151,20 @@ public void processElement(@Element String element, OutputReceiver recei } } - public static class TestSchemaTransform extends SchemaTransform { + public static class TestSchemaTransform + extends SchemaTransform { private String str1; private String str2; private Integer int1; private Integer int2; - public TestSchemaTransform(String str1, String str2, Integer int1, Integer int2) { - this.str1 = str1; - this.str2 = str2; - this.int1 = int1; - this.int2 = int2; + public TestSchemaTransform(TestSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); + this.str1 = configuration.str1; + this.str2 = configuration.str2; + this.int1 = configuration.int1; + this.int2 = configuration.int2; } @Override @@ -208,14 +205,9 @@ public static class TestSchemaTransformProviderMultiInputMultiOutput extends TypedSchemaTransformProvider { @Override - protected Class configurationClass() { - return TestSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(TestSchemaTransformConfiguration configuration) { - return new TestSchemaTransformMultiInputOutput( - configuration.str1, configuration.str2, configuration.int1, configuration.int2); + protected SchemaTransform from( + TestSchemaTransformConfiguration configuration) { + return new TestSchemaTransformMultiInputOutput(configuration, identifier()); } @Override @@ -234,7 +226,8 @@ public List outputCollectionNames() { } } - public static class TestSchemaTransformMultiInputOutput extends SchemaTransform { + public static class TestSchemaTransformMultiInputOutput + extends SchemaTransform { private String str1; private String str2; @@ -242,11 +235,12 @@ public static class TestSchemaTransformMultiInputOutput extends SchemaTransform private Integer int2; public TestSchemaTransformMultiInputOutput( - String str1, String str2, Integer int1, Integer int2) { - this.str1 = str1; - this.str2 = str2; - this.int1 = int1; - this.int2 = int2; + TestSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); + this.str1 = configuration.str1; + this.str2 = configuration.str2; + this.int1 = configuration.int1; + this.int2 = configuration.int2; } @Override @@ -454,6 +448,7 @@ public void testSchematransformEquivalentConfigSchema() throws CoderException { assertEquals(transform.int2, equivalentTransform.int2); assertEquals(transform.str1, equivalentTransform.str1); assertEquals(transform.str2, equivalentTransform.str2); + assertEquals(transform.getConfigurationRow(), equivalentTransform.getConfigurationRow()); } private RunnerApi.FunctionSpec createSpec(String identifier, Row configRow) { diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java index f032da0799d8..08cf2585afcb 100644 --- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java +++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java @@ -117,8 +117,8 @@ public Schema configurationSchema() { } @Override - public SchemaTransform from(Row configuration) { - return new SqlSchemaTransform(configuration); + public SchemaTransform from(Row configuration) { + return new SqlSchemaTransform(configuration, identifier()); } @Override @@ -153,10 +153,11 @@ public PDone expand(PCollection input) { } } - static class SqlSchemaTransform extends SchemaTransform { + static class SqlSchemaTransform extends SchemaTransform { final Row config; - public SqlSchemaTransform(Row config) { + public SqlSchemaTransform(Row config, String identifier) { + super(config, identifier); this.config = config; } diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java index f4d54c408cf4..6ae0b84106a8 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java @@ -59,13 +59,8 @@ public class CsvWriteTransformProvider private static final String WRITE_RESULTS = "output"; @Override - protected Class configurationClass() { - return CsvWriteConfiguration.class; - } - - @Override - protected SchemaTransform from(CsvWriteConfiguration configuration) { - return new CsvWriteTransform(configuration); + protected SchemaTransform from(CsvWriteConfiguration configuration) { + return new CsvWriteTransform(configuration, identifier()); } @Override @@ -119,11 +114,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for {@link CsvIO#write}. */ - protected static class CsvWriteTransform extends SchemaTransform { + protected static class CsvWriteTransform extends SchemaTransform { private final CsvWriteConfiguration configuration; - CsvWriteTransform(CsvWriteConfiguration configuration) { + CsvWriteTransform(CsvWriteConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 9f227708e5e6..e4103b8b3150 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -75,16 +75,11 @@ protected DebeziumReadSchemaTransformProvider( } @Override - protected @NonNull @Initialized Class - configurationClass() { - return DebeziumReadSchemaTransformConfiguration.class; - } - - @Override - protected @NonNull @Initialized SchemaTransform from( + protected @NonNull @Initialized SchemaTransform from( DebeziumReadSchemaTransformConfiguration configuration) { // TODO(pabloem): Validate configuration parameters to ensure formatting is correct. - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // TODO(pabloem): Test this behavior diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java index 2ea3879adbcf..f2b835a3b796 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java @@ -66,13 +66,9 @@ public class FileReadSchemaTransformProvider static final String FILEPATTERN_ROW_FIELD_NAME = "filepattern"; @Override - protected Class configurationClass() { - return FileReadSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { - return new FileReadSchemaTransform(configuration); + protected SchemaTransform from( + FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration, identifier()); } @Override @@ -91,11 +87,13 @@ public List outputCollectionNames() { } @VisibleForTesting - static class FileReadSchemaTransform extends SchemaTransform { + static class FileReadSchemaTransform + extends SchemaTransform { private FileReadSchemaTransformConfiguration configuration; private boolean useInputPCollection; - FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; useInputPCollection = Strings.isNullOrEmpty(configuration.getFilepattern()); } diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java index f3c76897c0a3..0c92006dffdc 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java @@ -64,16 +64,11 @@ public class FileWriteSchemaTransformProvider static final TupleTag ERROR_TAG = new TupleTag() {}; static final TupleTag RESULT_TAG = new TupleTag() {}; - /** Provides the required {@link TypedSchemaTransformProvider#configurationClass()}. */ - @Override - protected Class configurationClass() { - return FileWriteSchemaTransformConfiguration.class; - } - /** Builds a {@link SchemaTransform} from a {@link FileWriteSchemaTransformConfiguration}. */ @Override - protected SchemaTransform from(FileWriteSchemaTransformConfiguration configuration) { - return new FileWriteSchemaTransform(configuration); + protected SchemaTransform from( + FileWriteSchemaTransformConfiguration configuration) { + return new FileWriteSchemaTransform(configuration, identifier()); } /** Returns the {@link TypedSchemaTransformProvider#identifier()} required for registration. */ @@ -99,11 +94,14 @@ public List outputCollectionNames() { * #inputCollectionNames()} tagged {@link Row}s into a {@link PCollectionRowTuple} of {@link * #outputCollectionNames()} tagged {@link Row}s. */ - static class FileWriteSchemaTransform extends SchemaTransform { + static class FileWriteSchemaTransform + extends SchemaTransform { final FileWriteSchemaTransformConfiguration configuration; - FileWriteSchemaTransform(FileWriteSchemaTransformConfiguration configuration) { + FileWriteSchemaTransform( + FileWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); validateConfiguration(configuration); this.configuration = configuration; } diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java index 5725ceff3a12..cfae5b953b35 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java @@ -90,7 +90,7 @@ public void runWriteAndReadTest( .setFilepattern(filePath + "*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); @@ -130,7 +130,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -186,7 +186,7 @@ public void testReadWithPCollectionOfFilepatterns() { .setFormat(getFormat()) .setSchema(stringSchema) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create a PCollection of filepatterns and feed into the read transform Schema patternSchema = getFilepatternSchema(); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java index d01e0051c7b8..5216616f6c1a 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java @@ -239,8 +239,8 @@ public void testWriteAndReadWithSchemaTransforms() { .setSchema(getStringSchemaFromBeamSchema(schema)) .build(); - SchemaTransform writeTransform = new FileWriteSchemaTransformProvider().from(writeConfig); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(readConfig); + SchemaTransform writeTransform = new FileWriteSchemaTransformProvider().from(writeConfig); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(readConfig); Schema filePatternSchema = getFilepatternSchema(); PCollection inputRows = writePipeline.apply(Create.of(rows).withRowSchema(schema)); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java index c8494446deda..52f9ef40d00f 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java @@ -60,7 +60,7 @@ public class FileWriteSchemaTransformProviderTest { @Test public void receivedUnexpectedInputTagsThrowsAnError() { - SchemaTransform transform = + SchemaTransform transform = PROVIDER.from(rowConfiguration(defaultConfiguration().setFormat(JSON).build())); PCollectionRowTuple empty = PCollectionRowTuple.empty(errorPipeline); IllegalArgumentException emptyInputError = diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java index e4bc63cd8d8e..25c35768dc0c 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java @@ -188,7 +188,8 @@ public void runWriteAndReadTest( .setFilepattern(filePath + "*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = + new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); List expectedRows = @@ -233,7 +234,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -299,7 +300,7 @@ public void testReadWithPCollectionOfFilepatterns() { .setFormat(getFormat()) .setSchema(jsonStringSchema) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create an PCollection of filepatterns and feed into the read transform Schema patternSchema = getFilepatternSchema(); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java index 4c5be258651f..61f2c114dc1f 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java @@ -98,7 +98,7 @@ public void testReadStrings() { .setFormat(getFormat()) .setFilepattern(filePath + "*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PCollection outputStrings = @@ -130,7 +130,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -192,7 +192,7 @@ public void testReadWithPCollectionOfFilepatterns() { // We will get filepatterns from the input PCollection, so don't set filepattern field here FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(getFormat()).build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create an PCollection of filepatterns and feed into the read transform Schema patternSchema = Schema.of(Field.of(FILEPATTERN_ROW_FIELD_NAME, FieldType.STRING)); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java index b1d6bba06ea9..08de27d05eaa 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java @@ -91,7 +91,7 @@ public void runWriteAndReadTest( .setFilepattern(folderPath + "/*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); @@ -130,7 +130,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -190,7 +190,7 @@ public void testReadWithPCollectionOfFilepatterns() { .setFormat(getFormat()) .setSchema(stringSchema) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create an PCollection of filepatterns and feed into the read transform Schema patternSchema = getFilepatternSchema(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java index af6ab5c71c8b..71c9968bd93a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java @@ -54,16 +54,11 @@ public class BigQueryExportReadSchemaTransformProvider "beam:schematransform:org.apache.beam:bigquery_export_read:v1"; private static final String OUTPUT_TAG = "OUTPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return BigQueryExportReadSchemaTransformConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(BigQueryExportReadSchemaTransformConfiguration configuration) { - return new BigQueryExportSchemaTransform(configuration); + protected SchemaTransform from( + BigQueryExportReadSchemaTransformConfiguration configuration) { + return new BigQueryExportSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -94,13 +89,16 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link * BigQueryExportReadSchemaTransformConfiguration}. */ - protected static class BigQueryExportSchemaTransform extends SchemaTransform { + protected static class BigQueryExportSchemaTransform + extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. */ private BigQueryServices testBigQueryServices = null; private final BigQueryExportReadSchemaTransformConfiguration configuration; - BigQueryExportSchemaTransform(BigQueryExportReadSchemaTransformConfiguration configuration) { + BigQueryExportSchemaTransform( + BigQueryExportReadSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java index 3212e2a30348..f5734581f4ea 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java @@ -62,16 +62,11 @@ public class BigQueryFileLoadsWriteSchemaTransformProvider "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1"; static final String INPUT_TAG = "INPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return BigQueryFileLoadsWriteSchemaTransformConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { - return new BigQueryWriteSchemaTransform(configuration); + protected SchemaTransform from( + BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + return new BigQueryWriteSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -102,13 +97,16 @@ public List outputCollectionNames() { * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link * BigQueryFileLoadsWriteSchemaTransformConfiguration}. */ - protected static class BigQueryWriteSchemaTransform extends SchemaTransform { + protected static class BigQueryWriteSchemaTransform + extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. */ private BigQueryServices testBigQueryServices = null; private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration; - BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + BigQueryWriteSchemaTransform( + BigQueryFileLoadsWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java index 8b8e8179ce7d..a4936aff2e53 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java @@ -65,13 +65,9 @@ public class BigQueryDirectReadSchemaTransformProvider private static final String OUTPUT_TAG = "OUTPUT_ROWS"; @Override - protected Class configurationClass() { - return BigQueryDirectReadSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(BigQueryDirectReadSchemaTransformConfiguration configuration) { - return new BigQueryDirectReadSchemaTransform(configuration); + protected SchemaTransform from( + BigQueryDirectReadSchemaTransformConfiguration configuration) { + return new BigQueryDirectReadSchemaTransform(configuration, identifier()); } @Override @@ -161,13 +157,15 @@ public abstract static class Builder { * BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link * BigQueryDirectReadSchemaTransformProvider}. */ - protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform { + protected static class BigQueryDirectReadSchemaTransform + extends SchemaTransform { private BigQueryServices testBigQueryServices = null; private final BigQueryDirectReadSchemaTransformConfiguration configuration; BigQueryDirectReadSchemaTransform( - BigQueryDirectReadSchemaTransformConfiguration configuration) { + BigQueryDirectReadSchemaTransformConfiguration configuration, String identifier) { // Validate configuration parameters before PTransform expansion + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 980d783ec43c..5d9fde19fb3d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -89,9 +89,9 @@ public class BigQueryStorageWriteApiSchemaTransformProvider protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; @Override - protected SchemaTransform from( + protected SchemaTransform from( BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { - return new BigQueryStorageWriteApiSchemaTransform(configuration); + return new BigQueryStorageWriteApiSchemaTransform(configuration, identifier()); } @Override @@ -289,13 +289,15 @@ public abstract static class Builder { * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link * BigQueryStorageWriteApiSchemaTransformProvider}. */ - protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform { + protected static class BigQueryStorageWriteApiSchemaTransform + extends SchemaTransform { private BigQueryServices testBigQueryServices = null; private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; BigQueryStorageWriteApiSchemaTransform( - BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + BigQueryStorageWriteApiSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java index f48a23559141..cd4e1f7c466f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java @@ -71,8 +71,9 @@ public class BigtableReadSchemaTransformProvider .build(); @Override - protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) { - return new BigtableReadSchemaTransform(configuration); + protected SchemaTransform from( + BigtableReadSchemaTransformConfiguration configuration) { + return new BigtableReadSchemaTransform(configuration, identifier()); } @Override @@ -128,10 +129,13 @@ public abstract static class Builder { * BigtableReadSchemaTransformConfiguration} and instantiated by {@link * BigtableReadSchemaTransformProvider}. */ - private static class BigtableReadSchemaTransform extends SchemaTransform { + private static class BigtableReadSchemaTransform + extends SchemaTransform { private final BigtableReadSchemaTransformConfiguration configuration; - BigtableReadSchemaTransform(BigtableReadSchemaTransformConfiguration configuration) { + BigtableReadSchemaTransform( + BigtableReadSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index cc480be6aa7e..0e58caa17242 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -61,8 +61,9 @@ public class BigtableWriteSchemaTransformProvider private static final String INPUT_TAG = "input"; @Override - protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) { - return new BigtableWriteSchemaTransform(configuration); + protected SchemaTransform from( + BigtableWriteSchemaTransformConfiguration configuration) { + return new BigtableWriteSchemaTransform(configuration, identifier()); } @Override @@ -120,10 +121,13 @@ public abstract static class Builder { * BigtableWriteSchemaTransformConfiguration} and instantiated by {@link * BigtableWriteSchemaTransformProvider}. */ - private static class BigtableWriteSchemaTransform extends SchemaTransform { + private static class BigtableWriteSchemaTransform + extends SchemaTransform { private final BigtableWriteSchemaTransformConfiguration configuration; - BigtableWriteSchemaTransform(BigtableWriteSchemaTransformConfiguration configuration) { + BigtableWriteSchemaTransform( + BigtableWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java index c1f6b2b31754..791e161a367e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java @@ -70,12 +70,8 @@ public class PubsubReadSchemaTransformProvider Schema.builder().addStringField("error").addNullableByteArrayField("row").build(); @Override - public Class configurationClass() { - return PubsubReadSchemaTransformConfiguration.class; - } - - @Override - public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) { + public SchemaTransform from( + PubsubReadSchemaTransformConfiguration configuration) { if (configuration.getSubscription() == null && configuration.getTopic() == null) { throw new IllegalArgumentException( "To read from Pubsub, a subscription name or a topic name must be provided"); @@ -122,7 +118,7 @@ public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration } PubsubReadSchemaTransform transform = - new PubsubReadSchemaTransform(configuration, payloadSchema, payloadMapper); + new PubsubReadSchemaTransform(identifier(), configuration, payloadSchema, payloadMapper); if (configuration.getClientFactory() != null) { transform.setClientFactory(configuration.getClientFactory()); @@ -134,7 +130,8 @@ public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration return transform; } - private static class PubsubReadSchemaTransform extends SchemaTransform implements Serializable { + private static class PubsubReadSchemaTransform + extends SchemaTransform implements Serializable { final Schema beamSchema; final SerializableFunction valueMapper; final PubsubReadSchemaTransformConfiguration configuration; @@ -142,9 +139,11 @@ private static class PubsubReadSchemaTransform extends SchemaTransform implement @Nullable Clock clock; PubsubReadSchemaTransform( + String identifier, PubsubReadSchemaTransformConfiguration configuration, Schema payloadSchema, SerializableFunction valueMapper) { + super(configuration, identifier); this.configuration = configuration; Schema outputSchema; List attributes = configuration.getAttributes(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java index 6187f6f79d3e..177ed2f5bb44 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java @@ -67,11 +67,6 @@ public class PubsubWriteSchemaTransformProvider public static final Set VALID_DATA_FORMATS = Sets.newHashSet(VALID_FORMATS_STR.split(",")); - @Override - public Class configurationClass() { - return PubsubWriteSchemaTransformConfiguration.class; - } - public static class ErrorFn extends DoFn { private final SerializableFunction valueMapper; private final @Nullable Set attributes; @@ -137,20 +132,24 @@ public void processElement(@Element Row row, MultiOutputReceiver receiver) throw } @Override - public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) { + public SchemaTransform from( + PubsubWriteSchemaTransformConfiguration configuration) { if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase())) { throw new IllegalArgumentException( String.format( "Format %s not supported. Only supported formats are %s", configuration.getFormat(), VALID_FORMATS_STR)); } - return new PubsubWriteSchemaTransform(configuration); + return new PubsubWriteSchemaTransform(configuration, identifier()); } - private static class PubsubWriteSchemaTransform extends SchemaTransform implements Serializable { + private static class PubsubWriteSchemaTransform + extends SchemaTransform implements Serializable { final PubsubWriteSchemaTransformConfiguration configuration; - PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration configuration) { + PubsubWriteSchemaTransform( + PubsubWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java index 8afe730f32ce..1792e76d73d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java @@ -85,12 +85,6 @@ public class PubsubLiteReadSchemaTransformProvider public static final TupleTag OUTPUT_TAG = new TupleTag() {}; public static final TupleTag ERROR_TAG = new TupleTag() {}; - @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return PubsubLiteReadSchemaTransformConfiguration.class; - } - public static class ErrorFn extends DoFn { private final SerializableFunction valueMapper; private final Counter errorCounter; @@ -192,8 +186,9 @@ public void finish(FinishBundleContext c) { } @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - PubsubLiteReadSchemaTransformConfiguration configuration) { + public @UnknownKeyFor @NonNull @Initialized SchemaTransform< + PubsubLiteReadSchemaTransformConfiguration> + from(PubsubLiteReadSchemaTransformConfiguration configuration) { if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( String.format( @@ -242,7 +237,8 @@ public void finish(FinishBundleContext c) { "To read from Pubsub Lite in JSON or AVRO format, you must provide a schema."); } } - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { String project = configuration.getProject(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java index 8ba8176035da..1952c24f3c52 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java @@ -80,12 +80,6 @@ public class PubsubLiteWriteSchemaTransformProvider private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class); - @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return PubsubLiteWriteSchemaTransformConfiguration.class; - } - public static class ErrorCounterFn extends DoFn { private final SerializableFunction toBytesFn; private final Counter errorCounter; @@ -172,8 +166,9 @@ public void finish() { } @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - PubsubLiteWriteSchemaTransformConfiguration configuration) { + public @UnknownKeyFor @NonNull @Initialized SchemaTransform< + PubsubLiteWriteSchemaTransformConfiguration> + from(PubsubLiteWriteSchemaTransformConfiguration configuration) { if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( @@ -184,7 +179,8 @@ public void finish() { + String.join(", ", SUPPORTED_FORMATS)); } - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { List attributesConfigValue = configuration.getAttributes(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index 6c8a2541a88b..42793dc9e495 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -55,21 +55,19 @@ public class SpannerWriteSchemaTransformProvider SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return SpannerWriteSchemaTransformConfiguration.class; + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + SpannerWriteSchemaTransformConfiguration> + from(SpannerWriteSchemaTransformConfiguration configuration) { + return new SpannerSchemaTransformWrite(configuration, identifier()); } - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - SpannerWriteSchemaTransformConfiguration configuration) { - return new SpannerSchemaTransformWrite(configuration); - } - - static class SpannerSchemaTransformWrite extends SchemaTransform implements Serializable { + static class SpannerSchemaTransformWrite + extends SchemaTransform implements Serializable { private final SpannerWriteSchemaTransformConfiguration configuration; - SpannerSchemaTransformWrite(SpannerWriteSchemaTransformConfiguration configuration) { + SpannerSchemaTransformWrite( + SpannerWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java index f3562e4cd917..d3035b1dea98 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java @@ -79,11 +79,6 @@ public class SpannerChangestreamsReadSchemaTransformProvider extends TypedSchemaTransformProvider< SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> { - @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return SpannerChangestreamsReadConfiguration.class; - } private static final Logger LOG = LoggerFactory.getLogger(SpannerChangestreamsReadSchemaTransformProvider.class); @@ -94,10 +89,9 @@ public class SpannerChangestreamsReadSchemaTransformProvider Schema.builder().addStringField("error").addNullableStringField("row").build(); @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration - configuration) { - return new SchemaTransform() { + public @UnknownKeyFor @NonNull @Initialized SchemaTransform + from(SpannerChangestreamsReadConfiguration configuration) { + return new SchemaTransform(configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { Pipeline p = input.getPipeline(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java index 81d3103f38bf..c2da6a02cfa9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java @@ -222,7 +222,7 @@ public void testRead() { .setInstanceId(instanceId) .setProjectId(projectId) .build(); - SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); + SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); PCollection rows = PCollectionRowTuple.empty(p).apply(transform).get("output"); PAssert.that(rows).containsInAnyOrder(expectedRows); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java index dd5a9abd5ac8..fe1eece009e0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java @@ -182,7 +182,7 @@ public void testReadRaw() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")) @@ -229,7 +229,7 @@ public void testReadAttributes() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")) @@ -260,7 +260,7 @@ public void testReadAvro() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")).containsInAnyOrder(ROWS); @@ -288,7 +288,7 @@ public void testReadAvroWithError() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")).empty(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 0139207235a0..44b2b937ab14 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -50,23 +50,20 @@ public class JdbcReadSchemaTransformProvider JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return JdbcReadSchemaTransformConfiguration.class; - } - - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcReadSchemaTransformConfiguration configuration) { + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + JdbcReadSchemaTransformConfiguration> + from(JdbcReadSchemaTransformConfiguration configuration) { configuration.validate(); - return new JdbcReadSchemaTransform(configuration); + return new JdbcReadSchemaTransform(configuration, identifier()); } - static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { + static class JdbcReadSchemaTransform extends SchemaTransform + implements Serializable { JdbcReadSchemaTransformConfiguration config; - public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { + public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config, String identifier) { + super(config, identifier); this.config = config; } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index a409b604b11f..fc4a45ad9af9 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -55,23 +55,21 @@ public class JdbcWriteSchemaTransformProvider JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return JdbcWriteSchemaTransformConfiguration.class; - } - - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcWriteSchemaTransformConfiguration configuration) { + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + JdbcWriteSchemaTransformConfiguration> + from(JdbcWriteSchemaTransformConfiguration configuration) { configuration.validate(); - return new JdbcWriteSchemaTransform(configuration); + return new JdbcWriteSchemaTransform(configuration, identifier()); } - static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { + static class JdbcWriteSchemaTransform + extends SchemaTransform implements Serializable { JdbcWriteSchemaTransformConfiguration config; - public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { + public JdbcWriteSchemaTransform( + JdbcWriteSchemaTransformConfiguration config, String identifier) { + super(config, identifier); this.config = config; } diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java index 9e030821e5ca..4b5d73153616 100644 --- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java @@ -57,13 +57,8 @@ public class JsonWriteTransformProvider private static final String WRITE_RESULTS = "output"; @Override - protected Class configurationClass() { - return JsonWriteConfiguration.class; - } - - @Override - protected SchemaTransform from(JsonWriteConfiguration configuration) { - return new JsonWriteTransform(configuration); + protected SchemaTransform from(JsonWriteConfiguration configuration) { + return new JsonWriteTransform(configuration, identifier()); } @Override @@ -110,11 +105,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for {@link JsonIO#write}. */ - protected static class JsonWriteTransform extends SchemaTransform { + protected static class JsonWriteTransform extends SchemaTransform { private final JsonWriteConfiguration configuration; - JsonWriteTransform(JsonWriteConfiguration configuration) { + JsonWriteTransform(JsonWriteConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 2776c388f7cc..d229e3d061a4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -89,16 +89,12 @@ public KafkaReadSchemaTransformProvider() { this.testTimeoutSecs = testTimeoutSecs; } - @Override - protected Class configurationClass() { - return KafkaReadSchemaTransformConfiguration.class; - } - @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) @Override - protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { + protected SchemaTransform from( + KafkaReadSchemaTransformConfiguration configuration) { configuration.validate(); final String inputSchema = configuration.getSchema(); @@ -122,7 +118,8 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl(); if (confluentSchemaRegUrl != null) { - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { final String confluentSchemaRegSubject = @@ -173,7 +170,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { valueMapper = AvroUtils.getAvroBytesToRowFunction(beamSchema); } - return new SchemaTransform() { + return new SchemaTransform(configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { KafkaIO.Read kafkaRead = diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 26f37b790ef8..3942e0d09946 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -73,14 +73,9 @@ public class KafkaWriteSchemaTransformProvider LoggerFactory.getLogger(KafkaWriteSchemaTransformProvider.class); @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return KafkaWriteSchemaTransformConfiguration.class; - } - - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - KafkaWriteSchemaTransformConfiguration configuration) { + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + KafkaWriteSchemaTransformConfiguration> + from(KafkaWriteSchemaTransformConfiguration configuration) { if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( "Format " @@ -89,13 +84,16 @@ public class KafkaWriteSchemaTransformProvider + "Supported formats are: " + String.join(", ", SUPPORTED_FORMATS)); } - return new KafkaWriteSchemaTransform(configuration); + return new KafkaWriteSchemaTransform(configuration, identifier()); } - static final class KafkaWriteSchemaTransform extends SchemaTransform implements Serializable { + static final class KafkaWriteSchemaTransform + extends SchemaTransform implements Serializable { final KafkaWriteSchemaTransformConfiguration configuration; - KafkaWriteSchemaTransform(KafkaWriteSchemaTransformConfiguration configuration) { + KafkaWriteSchemaTransform( + KafkaWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java index 9511a59d472c..c1044d950101 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java @@ -39,16 +39,11 @@ public class SingleStoreSchemaTransformReadProvider extends TypedSchemaTransformProvider { private static final String OUTPUT_TAG = "OUTPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return SingleStoreSchemaTransformReadConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) { - return new SingleStoreReadSchemaTransform(configuration); + protected SchemaTransform from( + SingleStoreSchemaTransformReadConfiguration configuration) { + return new SingleStoreReadSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -79,10 +74,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using * {@link SingleStoreSchemaTransformReadConfiguration}. */ - private static class SingleStoreReadSchemaTransform extends SchemaTransform { + private static class SingleStoreReadSchemaTransform + extends SchemaTransform { private final SingleStoreSchemaTransformReadConfiguration configuration; - SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) { + SingleStoreReadSchemaTransform( + SingleStoreSchemaTransformReadConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java index dafa10087a4a..f24db96eb1d8 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java @@ -43,16 +43,11 @@ public class SingleStoreSchemaTransformWriteProvider private static final String OUTPUT_TAG = "OUTPUT"; public static final String INPUT_TAG = "INPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return SingleStoreSchemaTransformWriteConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(SingleStoreSchemaTransformWriteConfiguration configuration) { - return new SingleStoreWriteSchemaTransform(configuration); + protected SchemaTransform from( + SingleStoreSchemaTransformWriteConfiguration configuration) { + return new SingleStoreWriteSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -83,10 +78,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for SingleStoreDB write jobs configured using * {@link SingleStoreSchemaTransformWriteConfiguration}. */ - private static class SingleStoreWriteSchemaTransform extends SchemaTransform { + private static class SingleStoreWriteSchemaTransform + extends SchemaTransform { private final SingleStoreSchemaTransformWriteConfiguration configuration; - SingleStoreWriteSchemaTransform(SingleStoreSchemaTransformWriteConfiguration configuration) { + SingleStoreWriteSchemaTransform( + SingleStoreSchemaTransformWriteConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java index e1a280f1594e..1d0eeec3f7d6 100644 --- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java +++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java @@ -136,7 +136,7 @@ private PipelineResult runWrite() { .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); + SchemaTransform schemaTransform = provider.from(configurationRow); Schema.Builder schemaBuilder = new Schema.Builder(); schemaBuilder.addField("id", Schema.FieldType.INT32); @@ -190,7 +190,7 @@ private PipelineResult runRead() { .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); + SchemaTransform schemaTransform = provider.from(configurationRow); PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineRead); String tag = provider.outputCollectionNames().get(0); @@ -221,7 +221,7 @@ private PipelineResult runReadWithPartitions() { .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); + SchemaTransform schemaTransform = provider.from(configurationRow); PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineReadWithPartitions); String tag = provider.outputCollectionNames().get(0); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index b2b010b1e434..26428bc532d5 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; @@ -128,7 +129,8 @@ public static ManagedTransform write(String sink) { } @AutoValue - public abstract static class ManagedTransform extends SchemaTransform { + public abstract static class ManagedTransform + extends PTransform { abstract String getIdentifier(); abstract @Nullable String getConfig(); @@ -186,7 +188,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = + SchemaTransform underlyingTransform = new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); return input.apply(underlyingTransform); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 1ee2b11a90ff..7c05a4c2c29e 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -79,7 +79,7 @@ public ManagedSchemaTransformProvider() {} @DefaultSchema(AutoValueSchema.class) @AutoValue @VisibleForTesting - abstract static class ManagedConfig { + public abstract static class ManagedConfig { public static Builder builder() { return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); } @@ -114,7 +114,7 @@ protected void validate() { } @Override - protected SchemaTransform from(ManagedConfig managedConfig) { + protected SchemaTransform from(ManagedConfig managedConfig) { managedConfig.validate(); SchemaTransformProvider schemaTransformProvider = Preconditions.checkNotNull( @@ -135,22 +135,27 @@ protected SchemaTransform from(ManagedConfig managedConfig) { e); } - return new ManagedSchemaTransform(transformConfig, schemaTransformProvider); + return new ManagedSchemaTransform( + managedConfig, identifier(), transformConfig, schemaTransformProvider); } - private static class ManagedSchemaTransform extends SchemaTransform { + private static class ManagedSchemaTransform extends SchemaTransform { private final Row transformConfig; private final SchemaTransformProvider underlyingTransformProvider; ManagedSchemaTransform( - Row transformConfig, SchemaTransformProvider underlyingTransformProvider) { - this.transformConfig = transformConfig; + ManagedConfig managedConfig, + String managedIdentifier, + Row underlyingTransformConfig, + SchemaTransformProvider underlyingTransformProvider) { + super(managedConfig, managedIdentifier); + this.transformConfig = underlyingTransformConfig; this.underlyingTransformProvider = underlyingTransformProvider; } @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - SchemaTransform underlyingTransform = underlyingTransformProvider.from(transformConfig); + SchemaTransform underlyingTransform = underlyingTransformProvider.from(transformConfig); return input.apply(underlyingTransform); } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java index 136d98d468d0..7a411875f288 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java @@ -61,10 +61,10 @@ public abstract static class Builder { } @Override - public SchemaTransform from(Config config) { + public SchemaTransform from(Config config) { String extraString = config.getExtraString(); Integer extraInteger = config.getExtraInteger(); - return new SchemaTransform() { + return new SchemaTransform(config, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { Schema schema = From e770eacca77d13d710adad0d32e06e864d8f7d02 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 12 Apr 2024 01:05:20 -0400 Subject: [PATCH 2/4] tests --- .../schemas/transforms/SchemaTransform.java | 9 ++- ...chemaTransformProviderTranslationTest.java | 78 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index ef6ebcd2a92d..bcd201a86b63 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -61,8 +62,14 @@ protected SchemaTransform(ConfigT configuration, String identifier) { Class typedClass = (Class) parameterizedType.getActualTypeArguments()[0]; try { + // Get initial row with values + Row row = SchemaRegistry.createDefault().getToRowFunction(typedClass).apply(configuration); + // Get sorted Schema and recreate the Row + Schema configurationSchema = SchemaRegistry.createDefault().getSchema(typedClass).sorted(); this.configurationRow = - SchemaRegistry.createDefault().getToRowFunction(typedClass).apply(configuration); + configurationSchema.getFields().stream() + .map(field -> row.getValue(field.getName())) + .collect(Row.toRow(configurationSchema)); } catch (NoSuchSchemaException e) { throw new RuntimeException("Unable to find schema for this SchemaTransform's config.", e); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java new file mode 100644 index 000000000000..06983de9e295 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms; + +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation.SchemaTransformTranslator; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.transforms.providers.FlattenTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.JavaExplodeTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.LoggingTransformProvider; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; + +public class SchemaTransformProviderTranslationTest { + public void translateAndRunChecks(SchemaTransformProvider provider, Row originalRow) { + SchemaTransform transform = provider.from(originalRow); + + SchemaTransformTranslator translator = new SchemaTransformTranslator(provider.identifier()); + Row rowFromTransform = translator.toConfigRow(transform); + + SchemaTransform transformFromRow = + translator.fromConfigRow(rowFromTransform, PipelineOptionsFactory.create()); + + assertEquals(originalRow, rowFromTransform); + assertEquals(originalRow, transformFromRow.getConfigurationRow()); + assertEquals( + provider.configurationSchema(), transformFromRow.getConfigurationRow().getSchema()); + assertEquals(provider.identifier(), transformFromRow.getIdentifier()); + } + + @Test + public void testReCreateJavaExplodeTransform() { + JavaExplodeTransformProvider provider = new JavaExplodeTransformProvider(); + + Row originalRow = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("crossProduct", true) + .withFieldValue("fields", Arrays.asList("a", "c")) + .build(); + + translateAndRunChecks(provider, originalRow); + } + + @Test + public void testReCreateFlattenTransform() { + FlattenTransformProvider provider = new FlattenTransformProvider(); + Row originalRow = Row.withSchema(provider.configurationSchema()).build(); + translateAndRunChecks(provider, originalRow); + } + + @Test + public void testReCreateLoggingTransform() { + LoggingTransformProvider provider = new LoggingTransformProvider(); + Row originalRow = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("level", "INFO") + .withFieldValue("prefix", "some_prefix") + .build(); + translateAndRunChecks(provider, originalRow); + } +} From 3184255be35e122c20335af8d3a9e987d8589d8d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 14 Apr 2024 19:26:40 -0400 Subject: [PATCH 3/4] fix debezium test --- .../apache/beam/sdk/schemas/transforms/SchemaTransform.java | 5 +++-- .../io/debezium/DebeziumReadSchemaTransformProvider.java | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index bcd201a86b63..7a0d6f82e9cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -61,11 +61,12 @@ protected SchemaTransform(ConfigT configuration, String identifier) { Class typedClass = (Class) parameterizedType.getActualTypeArguments()[0]; + SchemaRegistry registry = SchemaRegistry.createDefault(); try { // Get initial row with values - Row row = SchemaRegistry.createDefault().getToRowFunction(typedClass).apply(configuration); + Row row = registry.getToRowFunction(typedClass).apply(configuration); // Get sorted Schema and recreate the Row - Schema configurationSchema = SchemaRegistry.createDefault().getSchema(typedClass).sorted(); + Schema configurationSchema = registry.getSchema(typedClass).sorted(); this.configurationRow = configurationSchema.getFields().stream() .map(field -> row.getValue(field.getName())) diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index e4103b8b3150..9b1314a2c750 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -26,7 +26,9 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -169,6 +171,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return Collections.singletonList("output"); } + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class DebeziumReadSchemaTransformConfiguration { public abstract String getUsername(); From 90df729ca823af2e081d3aa37511eeff012c0db4 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 10 Jun 2024 11:35:23 -0400 Subject: [PATCH 4/4] checkpoint --- .../schemas/transforms/SchemaTransform.java | 44 +++++---- .../transforms/SchemaTransformProvider.java | 2 +- .../SchemaTransformProviderTranslation.java | 94 ++++++++----------- .../TypedSchemaTransformProvider.java | 4 +- .../construction/PTransformTranslation.java | 72 ++++++++++++++ .../util/construction/TransformUpgrader.java | 20 ---- ...chemaTransformProviderTranslationTest.java | 1 - .../TypedSchemaTransformProviderTest.java | 9 +- .../expansion/service/ExpansionService.java | 23 +++-- 9 files changed, 159 insertions(+), 110 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index a9f04ced463a..cf323c093432 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -21,6 +21,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import java.lang.reflect.ParameterizedType; + +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.schemas.NoSuchSchemaException; @@ -42,42 +44,46 @@ * compatibility guarantees and it should not be implemented outside of the Beam repository. */ @Internal -public abstract class SchemaTransform +public abstract class SchemaTransform extends PTransform { - private final Row configurationRow; - private final String identifier; + private @Nullable Row configurationRow; + private @Nullable String identifier; + private boolean registered = false; - @SuppressWarnings("unchecked") - protected SchemaTransform(ConfigT configuration, String identifier) { + public SchemaTransform register(Row configurationRow, String identifier) { + this.configurationRow = configurationRow; this.identifier = identifier; - @Nullable - ParameterizedType parameterizedType = (ParameterizedType) getClass().getGenericSuperclass(); - checkStateNotNull(parameterizedType, "Could not get the SchemaTransform's parameterized type."); - checkArgument( - parameterizedType.getActualTypeArguments().length == 1, - String.format( - "Expected one parameterized type, but got %s.", - parameterizedType.getActualTypeArguments().length)); + registered = true; - Class configType = (Class) parameterizedType.getActualTypeArguments()[0]; + return this; + } + public SchemaTransform register(ConfigT configuration, Class configClass, String identifier) { SchemaRegistry registry = SchemaRegistry.createDefault(); try { // Get initial row with values // sort lexicographically and convert field names to snake_case - this.configurationRow = registry.getToRowFunction(configType).apply(configuration) + Row configRow = registry + .getToRowFunction(configClass) + .apply(configuration) .sorted() .toSnakeCase(); + return register(configRow, identifier); } catch (NoSuchSchemaException e) { - throw new RuntimeException("Unable to find schema for this SchemaTransform's config.", e); + throw new RuntimeException( + String.format("Unable to find schema for this SchemaTransform's config type: %s", configClass), e); } } public Row getConfigurationRow() { - return configurationRow; + return Preconditions.checkNotNull(configurationRow, "Could not fetch SchemaTransform's configuration. " + + "Please store it using SchemaTransform::register."); } - public String getIdentifier() { - return identifier; + return Preconditions.checkNotNull(identifier, "Could not fetch SchemaTransform's identifier. " + + "Please store it using SchemaTransform::register."); + } + public boolean isRegistered() { + return registered; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java index 1945a7a993ea..9d0ad61b7a6c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java @@ -56,7 +56,7 @@ default String description() { * Produce a {@link SchemaTransform} from some transform-specific configuration object. Can throw * a {@link InvalidConfigurationException} or a {@link InvalidSchemaException}. */ - SchemaTransform from(Row configuration); + SchemaTransform from(Row configuration); /** Returns the input collection names of this transform. */ default List inputCollectionNames() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java index 555d3a244083..e2244e858b87 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java @@ -17,98 +17,86 @@ */ package org.apache.beam.sdk.schemas.transforms; +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; -import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; + +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.util.construction.BeamUrns; import org.apache.beam.sdk.util.construction.SdkComponents; -import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.checkerframework.checker.nullness.qual.Nullable; public class SchemaTransformProviderTranslation { public static class SchemaTransformTranslator - implements TransformPayloadTranslator> { - private final String identifier; - private SchemaTransformProvider provider; + implements TransformPayloadTranslator { + private final SchemaTransformProvider provider; - public SchemaTransformTranslator(String identifier) { - this.identifier = identifier; - try { - for (SchemaTransformProvider schemaTransformProvider : - ServiceLoader.load(SchemaTransformProvider.class)) { - if (schemaTransformProvider.identifier().equalsIgnoreCase(identifier)) { - if (this.provider != null) { - throw new IllegalArgumentException( - "Found multiple SchemaTransformProvider implementations with the same identifier " - + identifier); - } - this.provider = schemaTransformProvider; - } - } - if (this.provider == null) { - throw new IllegalArgumentException( - "Could not find SchemaTransformProvider implementation for identifier " + identifier); - } - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); - } + public SchemaTransformTranslator(SchemaTransformProvider provider) { + this.provider = provider; } @Override public String getUrn() { - return identifier; + return BeamUrns.getUrn(SCHEMA_TRANSFORM); } @Override @SuppressWarnings("argument") public @Nullable FunctionSpec translate( - AppliedPTransform> application, SdkComponents components) + AppliedPTransform application, SdkComponents components) throws IOException { - return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build(); + SchemaApi.Schema expansionSchema = + SchemaTranslation.schemaToProto(provider.configurationSchema(), true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(provider.configurationSchema()).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + ExternalTransforms.SchemaTransformPayload.newBuilder() + .setIdentifier(provider.identifier()) + .setConfigurationSchema(expansionSchema) + .setConfigurationRow(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); } @Override - public Row toConfigRow(SchemaTransform transform) { + public Row toConfigRow(SchemaTransform transform) { return transform.getConfigurationRow(); } @Override - public SchemaTransform fromConfigRow(Row configRow, PipelineOptions options) { + public SchemaTransform fromConfigRow(Row configRow, PipelineOptions options) { return provider.from(configRow); } } - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class SchemaTransformRegistrar implements TransformPayloadTranslatorRegistrar { - @Override - @SuppressWarnings({ - "rawtypes", - }) - public Map, ? extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - Map, SchemaTransformTranslator> translators = new HashMap<>(); - try { - for (SchemaTransformProvider schemaTransformProvider : - ServiceLoader.load(SchemaTransformProvider.class)) { - translators.put( - SchemaTransform.class, - new SchemaTransformTranslator(schemaTransformProvider.identifier())); - } - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); - } - - return translators; + private static Map cachedTranslators; + public static Map getDefaultTranslators() { + if (cachedTranslators != null) { + return cachedTranslators; + } + cachedTranslators = new HashMap<>(); + for (SchemaTransformProvider provider : ServiceLoader.load(SchemaTransformProvider.class)) { + cachedTranslators.put(provider.identifier(), new SchemaTransformTranslator(provider)); } + return cachedTranslators; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 1cb5deb010f2..d5c6c724c6f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -67,7 +67,7 @@ protected Class configurationClass() { * Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a * {@link InvalidSchemaException}. */ - protected abstract SchemaTransform from(ConfigT configuration); + protected abstract SchemaTransform from(ConfigT configuration); /** * List the dependencies needed for this transform. Jars from classpath are used by default when @@ -92,7 +92,7 @@ public Schema configurationSchema() { /** Produces a {@link SchemaTransform} from a Row configuration. */ @Override - public SchemaTransform from(Row configuration) { + public SchemaTransform from(Row configuration) { return from(configFromRow(configuration)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index 3d9142723737..7cf95bded30b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -46,6 +46,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; @@ -59,6 +61,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedSet; @@ -251,6 +254,7 @@ private static Collection> loadKnownTranslators() { (Comparator) ObjectsClassComparator.INSTANCE) .add(new RawPTransformTranslator()) .add(new KnownTransformPayloadTranslator()) + .add(new SchemaTransformPayloadTranslator()) .add(ParDoTranslator.create()) .add(ExternalTranslator.create()) .build(); @@ -581,6 +585,74 @@ public RunnerApi.PTransform translate( } } + /** + * Translates {@link SchemaTransform}s by populating the {@link FunctionSpec} with a + * {@link ExternalTransforms.SchemaTransformPayload} containing the transform's configuration + * {@link Schema} and {@link Row}. + */ + private static class SchemaTransformPayloadTranslator implements TransformTranslator { + @Override + public @Nullable String getUrn(SchemaTransform transform) { + return transform.getIdentifier(); + } + + @Override + public boolean canTranslate(PTransform transform) { + // Can translate only if the SchemaTransform implementation registers its + // configuration Row and identifier + if (transform instanceof SchemaTransform) { + return (((SchemaTransform) transform).isRegistered()); + } + return false; + } + + @Override + public RunnerApi.PTransform translate( + AppliedPTransform appliedPTransform, + List subtransforms, + SdkComponents components) throws IOException { + RunnerApi.PTransform.Builder transformBuilder = + translateAppliedPTransform(appliedPTransform, subtransforms, components); + + String identifier = ((SchemaTransform) appliedPTransform.getTransform()).getIdentifier(); + TransformPayloadTranslator payloadTranslator = SchemaTransformProviderTranslation.getDefaultTranslators().get(identifier); + + FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components); + Row configRow = payloadTranslator.toConfigRow(appliedPTransform.getTransform()); + + if (spec != null) { + transformBuilder.setSpec(spec); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY), + ByteString.copyFromUtf8(identifier)); + if (identifier.equals(MANAGED_TRANSFORM_URN)) { + String underlyingIdentifier = Preconditions.checkNotNull( + configRow.getString("transform_identifier"), + "Encountered a Managed Transform that has an empty \"transform_identifier\": %n%s", + configRow); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.MANAGED_UNDERLYING_TRANSFORM_URN_KEY), + ByteString.copyFromUtf8(underlyingIdentifier)); + } + } + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_KEY), + ByteString.copyFrom( + CoderUtils.encodeToByteArray(RowCoder.of(configRow.getSchema()), configRow))); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_SCHEMA_KEY), + ByteString.copyFrom( + SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray())); + + for (Entry annotation : + appliedPTransform.getTransform().getAnnotations().entrySet()) { + transformBuilder.putAnnotations( + annotation.getKey(), ByteString.copyFrom(annotation.getValue())); + } + + return transformBuilder.build(); + } + } /** * Translates an {@link AppliedPTransform} by: * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java index 0e99ef36ff92..a4e56bbe6c24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java @@ -454,26 +454,6 @@ public void close() throws Exception { return null; } - /** - * Like {@link #findUpgradeURN(PTransform)} but for {@link SchemaTransform}s - * - *

Finds a SchemaTransform by comparing the underlying URN. - * - * @param transform transform to lookup. - * @return a URN if discovered. Returns {@code null} otherwise. - */ - @SuppressWarnings({"rawtypes"}) - public static @Nullable String findUpgradeSchemaTransformURN(SchemaTransform transform) { - for (Entry, ? extends TransformPayloadTranslator> entry : - new SchemaTransformRegistrar().getTransformPayloadTranslators().entrySet()) { - if (entry.getValue().getUrn().equals(transform.getIdentifier())) { - return entry.getValue().getUrn(); - } - } - - return null; - } - /** * A utility method that converts an arbitrary serializable object into a byte array. * diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java index 06983de9e295..d4a9341b866c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java @@ -42,7 +42,6 @@ public void translateAndRunChecks(SchemaTransformProvider provider, Row original assertEquals(originalRow, transformFromRow.getConfigurationRow()); assertEquals( provider.configurationSchema(), transformFromRow.getConfigurationRow().getSchema()); - assertEquals(provider.identifier(), transformFromRow.getIdentifier()); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index 5d7f40b43114..abbf80a561c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -69,7 +69,7 @@ public String description() { @Override public SchemaTransform from(Configuration config) { - return new FakeSchemaTransform(config, identifier()); + return new FakeSchemaTransform(config); } @Override @@ -100,7 +100,7 @@ public String identifier() { @Override public SchemaTransform from(Configuration config) { - return new FakeSchemaTransform(config, identifier()); + return new FakeSchemaTransform(config); } } @@ -108,8 +108,8 @@ public static class FakeSchemaTransform extends SchemaTransform { public Configuration config; - public FakeSchemaTransform(Configuration config, String identifier) { - super(config, identifier); + public FakeSchemaTransform(Configuration config) { + super(config); this.config = config; } @@ -137,7 +137,6 @@ public void testGetProperties() { SchemaTransform transform = minimalProvider.from(inputConfig); assertEquals(inputConfig, transform.getConfigurationRow()); - assertEquals(minimalProvider.identifier(), transform.getIdentifier()); } @Test diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 770da14fa1cf..dd4bde6665b9 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -68,18 +68,12 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.construction.BeamUrns; -import org.apache.beam.sdk.util.construction.Environments; -import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.*; import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.beam.sdk.util.construction.RehydratedComponents; -import org.apache.beam.sdk.util.construction.SdkComponents; -import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PInput; @@ -139,11 +133,22 @@ public interface ExpansionServiceRegistrar { public static class ExternalTransformRegistrarLoader implements ExpansionService.ExpansionServiceRegistrar { + /** + * Map of known PTransform URNs (ie. transforms listed in a {@link TransformPayloadTranslatorRegistrar}) + * and their corresponding TransformProviders. + */ @Override public Map knownTransforms() { Map providers = new HashMap<>(); - // First check and register ExternalTransformBuilder in serviceloader style, converting + // First populate with SchemaTransform URNs and their default translator implementation. + // These can be overwritten below if a custom translator for a given URN is found. + Map defaultSchemaTransformTranslators = SchemaTransformProviderTranslation.getDefaultTranslators(); + for (Map.Entry entry: defaultSchemaTransformTranslators.entrySet()) { + providers.put(entry.getKey(), new TransformProviderForPayloadTranslator(entry.getValue())); + } + + // Then check and register ExternalTransformBuilder in serviceloader style, converting // to TransformProvider after validation. Map registeredBuilders = loadTransformBuilders(); for (Map.Entry registeredBuilder :