diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java index d9dfc2a90bd8..710e3be73d98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java @@ -75,13 +75,9 @@ public String description() { } @Override - public Class configurationClass() { - return GenerateSequenceConfiguration.class; - } - - @Override - public SchemaTransform from(GenerateSequenceConfiguration configuration) { - return new GenerateSequenceSchemaTransform(configuration); + public SchemaTransform from( + GenerateSequenceConfiguration configuration) { + return new GenerateSequenceSchemaTransform(configuration, identifier()); } @DefaultSchema(AutoValueSchema.class) @@ -163,10 +159,13 @@ public void validate() { } } - protected static class GenerateSequenceSchemaTransform extends SchemaTransform { + protected static class GenerateSequenceSchemaTransform + extends SchemaTransform { private final GenerateSequenceConfiguration configuration; - GenerateSequenceSchemaTransform(GenerateSequenceConfiguration configuration) { + GenerateSequenceSchemaTransform( + GenerateSequenceConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index 283720e09772..cf323c093432 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -17,9 +17,20 @@ */ package org.apache.beam.sdk.schemas.transforms; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.lang.reflect.ParameterizedType; + +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; /** * An abstraction representing schema capable and aware transforms. The interface is intended to be @@ -34,4 +45,45 @@ */ @Internal public abstract class SchemaTransform - extends PTransform {} + extends PTransform { + private @Nullable Row configurationRow; + private @Nullable String identifier; + private boolean registered = false; + + public SchemaTransform register(Row configurationRow, String identifier) { + this.configurationRow = configurationRow; + this.identifier = identifier; + registered = true; + + return this; + } + + public SchemaTransform register(ConfigT configuration, Class configClass, String identifier) { + SchemaRegistry registry = SchemaRegistry.createDefault(); + try { + // Get initial row with values + // sort lexicographically and convert field names to snake_case + Row configRow = registry + .getToRowFunction(configClass) + .apply(configuration) + .sorted() + .toSnakeCase(); + return register(configRow, identifier); + } catch (NoSuchSchemaException e) { + throw new RuntimeException( + String.format("Unable to find schema for this SchemaTransform's config type: %s", configClass), e); + } + } + + public Row getConfigurationRow() { + return Preconditions.checkNotNull(configurationRow, "Could not fetch SchemaTransform's configuration. " + + "Please store it using SchemaTransform::register."); + } + public String getIdentifier() { + return Preconditions.checkNotNull(identifier, "Could not fetch SchemaTransform's identifier. " + + "Please store it using SchemaTransform::register."); + } + public boolean isRegistered() { + return registered; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java new file mode 100644 index 000000000000..e2244e858b87 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslation.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class SchemaTransformProviderTranslation { + public static class SchemaTransformTranslator + implements TransformPayloadTranslator { + private final SchemaTransformProvider provider; + + public SchemaTransformTranslator(SchemaTransformProvider provider) { + this.provider = provider; + } + + @Override + public String getUrn() { + return BeamUrns.getUrn(SCHEMA_TRANSFORM); + } + + @Override + @SuppressWarnings("argument") + public @Nullable FunctionSpec translate( + AppliedPTransform application, SdkComponents components) + throws IOException { + SchemaApi.Schema expansionSchema = + SchemaTranslation.schemaToProto(provider.configurationSchema(), true); + Row configRow = toConfigRow(application.getTransform()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + RowCoder.of(provider.configurationSchema()).encode(configRow, os); + + return FunctionSpec.newBuilder() + .setUrn(getUrn()) + .setPayload( + ExternalTransforms.SchemaTransformPayload.newBuilder() + .setIdentifier(provider.identifier()) + .setConfigurationSchema(expansionSchema) + .setConfigurationRow(ByteString.copyFrom(os.toByteArray())) + .build() + .toByteString()) + .build(); + } + + @Override + public Row toConfigRow(SchemaTransform transform) { + return transform.getConfigurationRow(); + } + + @Override + public SchemaTransform fromConfigRow(Row configRow, PipelineOptions options) { + return provider.from(configRow); + } + } + + + private static Map cachedTranslators; + public static Map getDefaultTranslators() { + if (cachedTranslators != null) { + return cachedTranslators; + } + cachedTranslators = new HashMap<>(); + for (SchemaTransformProvider provider : ServiceLoader.load(SchemaTransformProvider.class)) { + cachedTranslators.put(provider.identifier(), new SchemaTransformTranslator(provider)); + } + return cachedTranslators; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java index baeccd1ac8cc..63bc52754c9d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/FlattenTransformProvider.java @@ -49,13 +49,8 @@ public class FlattenTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new SchemaTransform() { + protected SchemaTransform from(Configuration configuration) { + return new SchemaTransform(configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { return PCollectionRowTuple.of( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java index 48ce5e33d9fa..ee56c706f652 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaExplodeTransformProvider.java @@ -56,13 +56,8 @@ public class JavaExplodeTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new ExplodeTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new ExplodeTransform(configuration, identifier()); } @Override @@ -105,11 +100,11 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for Explode. */ - protected static class ExplodeTransform extends SchemaTransform { - + protected static class ExplodeTransform extends SchemaTransform { private final Configuration configuration; - ExplodeTransform(Configuration configuration) { + ExplodeTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java index 4ae8d2e41b30..f9277e35033c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaFilterTransformProvider.java @@ -54,13 +54,8 @@ public class JavaFilterTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new JavaFilterTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new JavaFilterTransform(configuration, identifier()); } @Override @@ -107,11 +102,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for Filter-java. */ - protected static class JavaFilterTransform extends SchemaTransform { + protected static class JavaFilterTransform extends SchemaTransform { private final Configuration configuration; - JavaFilterTransform(Configuration configuration) { + JavaFilterTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java index 2e2042aef05d..bbec95d408ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java @@ -56,13 +56,8 @@ public class JavaMapToFieldsTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new JavaMapToFieldsTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new JavaMapToFieldsTransform(configuration, identifier()); } @Override @@ -119,11 +114,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for MapToFields-java. */ - protected static class JavaMapToFieldsTransform extends SchemaTransform { + protected static class JavaMapToFieldsTransform extends SchemaTransform { private final Configuration configuration; - JavaMapToFieldsTransform(Configuration configuration) { + JavaMapToFieldsTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java index 25efaeae2a0e..6490db3006ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/LoggingTransformProvider.java @@ -63,13 +63,8 @@ public class LoggingTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - return new LoggingTransform(configuration); + protected SchemaTransform from(Configuration configuration) { + return new LoggingTransform(configuration, identifier()); } @Override @@ -134,13 +129,14 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for logging. */ - protected static class LoggingTransform extends SchemaTransform { + protected static class LoggingTransform extends SchemaTransform { private static final Logger LOG = LoggerFactory.getLogger(LoggingTransform.class); private final Configuration configuration; - LoggingTransform(Configuration configuration) { + LoggingTransform(Configuration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index 3d9142723737..7cf95bded30b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -46,6 +46,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; @@ -59,6 +61,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedSet; @@ -251,6 +254,7 @@ private static Collection> loadKnownTranslators() { (Comparator) ObjectsClassComparator.INSTANCE) .add(new RawPTransformTranslator()) .add(new KnownTransformPayloadTranslator()) + .add(new SchemaTransformPayloadTranslator()) .add(ParDoTranslator.create()) .add(ExternalTranslator.create()) .build(); @@ -581,6 +585,74 @@ public RunnerApi.PTransform translate( } } + /** + * Translates {@link SchemaTransform}s by populating the {@link FunctionSpec} with a + * {@link ExternalTransforms.SchemaTransformPayload} containing the transform's configuration + * {@link Schema} and {@link Row}. + */ + private static class SchemaTransformPayloadTranslator implements TransformTranslator { + @Override + public @Nullable String getUrn(SchemaTransform transform) { + return transform.getIdentifier(); + } + + @Override + public boolean canTranslate(PTransform transform) { + // Can translate only if the SchemaTransform implementation registers its + // configuration Row and identifier + if (transform instanceof SchemaTransform) { + return (((SchemaTransform) transform).isRegistered()); + } + return false; + } + + @Override + public RunnerApi.PTransform translate( + AppliedPTransform appliedPTransform, + List subtransforms, + SdkComponents components) throws IOException { + RunnerApi.PTransform.Builder transformBuilder = + translateAppliedPTransform(appliedPTransform, subtransforms, components); + + String identifier = ((SchemaTransform) appliedPTransform.getTransform()).getIdentifier(); + TransformPayloadTranslator payloadTranslator = SchemaTransformProviderTranslation.getDefaultTranslators().get(identifier); + + FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components); + Row configRow = payloadTranslator.toConfigRow(appliedPTransform.getTransform()); + + if (spec != null) { + transformBuilder.setSpec(spec); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY), + ByteString.copyFromUtf8(identifier)); + if (identifier.equals(MANAGED_TRANSFORM_URN)) { + String underlyingIdentifier = Preconditions.checkNotNull( + configRow.getString("transform_identifier"), + "Encountered a Managed Transform that has an empty \"transform_identifier\": %n%s", + configRow); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.MANAGED_UNDERLYING_TRANSFORM_URN_KEY), + ByteString.copyFromUtf8(underlyingIdentifier)); + } + } + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_KEY), + ByteString.copyFrom( + CoderUtils.encodeToByteArray(RowCoder.of(configRow.getSchema()), configRow))); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_SCHEMA_KEY), + ByteString.copyFrom( + SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray())); + + for (Entry annotation : + appliedPTransform.getTransform().getAnnotations().entrySet()) { + transformBuilder.putAnnotations( + annotation.getKey(), ByteString.copyFrom(annotation.getValue())); + } + + return transformBuilder.build(); + } + } /** * Translates an {@link AppliedPTransform} by: * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java index f88d55a1f5cf..a4e56bbe6c24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util.construction; import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation.SchemaTransformRegistrar; import com.fasterxml.jackson.core.Version; import java.io.ByteArrayInputStream; @@ -46,6 +47,7 @@ import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher; import org.apache.beam.sdk.util.ReleaseInfo; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java index dcff3dedb843..4e34cde27767 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProviderTest.java @@ -41,7 +41,7 @@ public class GenerateSequenceSchemaTransformProviderTest { public void testGenerateSequence() { GenerateSequenceConfiguration config = GenerateSequenceConfiguration.builder().setStart(0L).setEnd(10L).build(); - SchemaTransform sequence = new GenerateSequenceSchemaTransformProvider().from(config); + SchemaTransform sequence = new GenerateSequenceSchemaTransformProvider().from(config); List expected = new ArrayList<>(10); for (long i = 0L; i < 10L; i++) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java new file mode 100644 index 000000000000..d4a9341b866c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProviderTranslationTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms; + +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation.SchemaTransformTranslator; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.transforms.providers.FlattenTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.JavaExplodeTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.LoggingTransformProvider; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; + +public class SchemaTransformProviderTranslationTest { + public void translateAndRunChecks(SchemaTransformProvider provider, Row originalRow) { + SchemaTransform transform = provider.from(originalRow); + + SchemaTransformTranslator translator = new SchemaTransformTranslator(provider.identifier()); + Row rowFromTransform = translator.toConfigRow(transform); + + SchemaTransform transformFromRow = + translator.fromConfigRow(rowFromTransform, PipelineOptionsFactory.create()); + + assertEquals(originalRow, rowFromTransform); + assertEquals(originalRow, transformFromRow.getConfigurationRow()); + assertEquals( + provider.configurationSchema(), transformFromRow.getConfigurationRow().getSchema()); + } + + @Test + public void testReCreateJavaExplodeTransform() { + JavaExplodeTransformProvider provider = new JavaExplodeTransformProvider(); + + Row originalRow = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("crossProduct", true) + .withFieldValue("fields", Arrays.asList("a", "c")) + .build(); + + translateAndRunChecks(provider, originalRow); + } + + @Test + public void testReCreateFlattenTransform() { + FlattenTransformProvider provider = new FlattenTransformProvider(); + Row originalRow = Row.withSchema(provider.configurationSchema()).build(); + translateAndRunChecks(provider, originalRow); + } + + @Test + public void testReCreateLoggingTransform() { + LoggingTransformProvider provider = new LoggingTransformProvider(); + Row originalRow = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("level", "INFO") + .withFieldValue("prefix", "some_prefix") + .build(); + translateAndRunChecks(provider, originalRow); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index b1dc0911a927..abbf80a561c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -68,12 +68,7 @@ public String description() { } @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - public SchemaTransform from(Configuration config) { + public SchemaTransform from(Configuration config) { return new FakeSchemaTransform(config); } @@ -104,16 +99,17 @@ public String identifier() { } @Override - public SchemaTransform from(Configuration config) { + public SchemaTransform from(Configuration config) { return new FakeSchemaTransform(config); } } - public static class FakeSchemaTransform extends SchemaTransform { + public static class FakeSchemaTransform extends SchemaTransform { public Configuration config; public FakeSchemaTransform(Configuration config) { + super(config); this.config = config; } @@ -123,6 +119,26 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } + @Test + public void testInferConfigurationClass() { + assertEquals(Configuration.class, new FakeTypedSchemaIOProvider().configurationClass()); + assertEquals(Configuration.class, new FakeMinimalTypedProvider().configurationClass()); + } + + @Test + public void testGetProperties() { + SchemaTransformProvider minimalProvider = new FakeMinimalTypedProvider(); + Row inputConfig = + Row.withSchema(minimalProvider.configurationSchema()) + .withFieldValue("field1", "field1") + .withFieldValue("field2", Integer.valueOf(13)) + .build(); + + SchemaTransform transform = minimalProvider.from(inputConfig); + + assertEquals(inputConfig, transform.getConfigurationRow()); + } + @Test public void testFrom() { SchemaTransformProvider provider = new FakeTypedSchemaIOProvider(); diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 770da14fa1cf..dd4bde6665b9 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -68,18 +68,12 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProviderTranslation; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.construction.BeamUrns; -import org.apache.beam.sdk.util.construction.Environments; -import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.*; import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.beam.sdk.util.construction.RehydratedComponents; -import org.apache.beam.sdk.util.construction.SdkComponents; -import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PInput; @@ -139,11 +133,22 @@ public interface ExpansionServiceRegistrar { public static class ExternalTransformRegistrarLoader implements ExpansionService.ExpansionServiceRegistrar { + /** + * Map of known PTransform URNs (ie. transforms listed in a {@link TransformPayloadTranslatorRegistrar}) + * and their corresponding TransformProviders. + */ @Override public Map knownTransforms() { Map providers = new HashMap<>(); - // First check and register ExternalTransformBuilder in serviceloader style, converting + // First populate with SchemaTransform URNs and their default translator implementation. + // These can be overwritten below if a custom translator for a given URN is found. + Map defaultSchemaTransformTranslators = SchemaTransformProviderTranslation.getDefaultTranslators(); + for (Map.Entry entry: defaultSchemaTransformTranslators.entrySet()) { + providers.put(entry.getKey(), new TransformProviderForPayloadTranslator(entry.getValue())); + } + + // Then check and register ExternalTransformBuilder in serviceloader style, converting // to TransformProvider after validation. Map registeredBuilders = loadTransformBuilders(); for (Map.Entry registeredBuilder : diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java index d060d5916e9f..a220520625f7 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java @@ -53,22 +53,8 @@ public class WindowIntoTransformProvider protected static final String OUTPUT_ROWS_TAG = "output"; @Override - protected Class configurationClass() { - return Configuration.class; - } - - @Override - protected SchemaTransform from(Configuration configuration) { - try { - return new WindowIntoStrategy( - (WindowingStrategy) - WindowingStrategyTranslation.fromProto( - RunnerApi.WindowingStrategy.parseFrom( - configuration.getSerializedWindowingStrategy()), - null)); - } catch (IOException exn) { - throw new RuntimeException(exn); - } + protected SchemaTransform from(Configuration configuration) { + return new WindowIntoStrategy(configuration, identifier()); } @Override @@ -106,12 +92,22 @@ public abstract static class Builder { } } - private static class WindowIntoStrategy extends SchemaTransform { + private static class WindowIntoStrategy extends SchemaTransform { private final WindowingStrategy windowingStrategy; - WindowIntoStrategy(WindowingStrategy windowingStrategy) { - this.windowingStrategy = windowingStrategy; + WindowIntoStrategy(Configuration configuration, String identifier) { + super(configuration, identifier); + try { + this.windowingStrategy = + (WindowingStrategy) + WindowingStrategyTranslation.fromProto( + RunnerApi.WindowingStrategy.parseFrom( + configuration.getSerializedWindowingStrategy()), + null); + } catch (IOException exn) { + throw new RuntimeException(exn); + } } @Override diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java index 294cb016c5e0..a8e74c0c467a 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java @@ -110,14 +110,9 @@ public static class TestSchemaTransformProvider extends TypedSchemaTransformProvider { @Override - protected Class configurationClass() { - return TestSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(TestSchemaTransformConfiguration configuration) { - return new TestSchemaTransform( - configuration.str1, configuration.str2, configuration.int1, configuration.int2); + protected SchemaTransform from( + TestSchemaTransformConfiguration configuration) { + return new TestSchemaTransform(configuration, identifier()); } @Override @@ -156,18 +151,20 @@ public void processElement(@Element String element, OutputReceiver recei } } - public static class TestSchemaTransform extends SchemaTransform { + public static class TestSchemaTransform + extends SchemaTransform { private String str1; private String str2; private Integer int1; private Integer int2; - public TestSchemaTransform(String str1, String str2, Integer int1, Integer int2) { - this.str1 = str1; - this.str2 = str2; - this.int1 = int1; - this.int2 = int2; + public TestSchemaTransform(TestSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); + this.str1 = configuration.str1; + this.str2 = configuration.str2; + this.int1 = configuration.int1; + this.int2 = configuration.int2; } @Override @@ -208,14 +205,9 @@ public static class TestSchemaTransformProviderMultiInputMultiOutput extends TypedSchemaTransformProvider { @Override - protected Class configurationClass() { - return TestSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(TestSchemaTransformConfiguration configuration) { - return new TestSchemaTransformMultiInputOutput( - configuration.str1, configuration.str2, configuration.int1, configuration.int2); + protected SchemaTransform from( + TestSchemaTransformConfiguration configuration) { + return new TestSchemaTransformMultiInputOutput(configuration, identifier()); } @Override @@ -234,7 +226,8 @@ public List outputCollectionNames() { } } - public static class TestSchemaTransformMultiInputOutput extends SchemaTransform { + public static class TestSchemaTransformMultiInputOutput + extends SchemaTransform { private String str1; private String str2; @@ -242,11 +235,12 @@ public static class TestSchemaTransformMultiInputOutput extends SchemaTransform private Integer int2; public TestSchemaTransformMultiInputOutput( - String str1, String str2, Integer int1, Integer int2) { - this.str1 = str1; - this.str2 = str2; - this.int1 = int1; - this.int2 = int2; + TestSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); + this.str1 = configuration.str1; + this.str2 = configuration.str2; + this.int1 = configuration.int1; + this.int2 = configuration.int2; } @Override @@ -454,6 +448,7 @@ public void testSchematransformEquivalentConfigSchema() throws CoderException { assertEquals(transform.int2, equivalentTransform.int2); assertEquals(transform.str1, equivalentTransform.str1); assertEquals(transform.str2, equivalentTransform.str2); + assertEquals(transform.getConfigurationRow(), equivalentTransform.getConfigurationRow()); } private RunnerApi.FunctionSpec createSpec(String identifier, Row configRow) { diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java index f032da0799d8..08cf2585afcb 100644 --- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java +++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java @@ -117,8 +117,8 @@ public Schema configurationSchema() { } @Override - public SchemaTransform from(Row configuration) { - return new SqlSchemaTransform(configuration); + public SchemaTransform from(Row configuration) { + return new SqlSchemaTransform(configuration, identifier()); } @Override @@ -153,10 +153,11 @@ public PDone expand(PCollection input) { } } - static class SqlSchemaTransform extends SchemaTransform { + static class SqlSchemaTransform extends SchemaTransform { final Row config; - public SqlSchemaTransform(Row config) { + public SqlSchemaTransform(Row config, String identifier) { + super(config, identifier); this.config = config; } diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java index f4d54c408cf4..6ae0b84106a8 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java @@ -59,13 +59,8 @@ public class CsvWriteTransformProvider private static final String WRITE_RESULTS = "output"; @Override - protected Class configurationClass() { - return CsvWriteConfiguration.class; - } - - @Override - protected SchemaTransform from(CsvWriteConfiguration configuration) { - return new CsvWriteTransform(configuration); + protected SchemaTransform from(CsvWriteConfiguration configuration) { + return new CsvWriteTransform(configuration, identifier()); } @Override @@ -119,11 +114,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for {@link CsvIO#write}. */ - protected static class CsvWriteTransform extends SchemaTransform { + protected static class CsvWriteTransform extends SchemaTransform { private final CsvWriteConfiguration configuration; - CsvWriteTransform(CsvWriteConfiguration configuration) { + CsvWriteTransform(CsvWriteConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 9f227708e5e6..9b1314a2c750 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -26,7 +26,9 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -75,16 +77,11 @@ protected DebeziumReadSchemaTransformProvider( } @Override - protected @NonNull @Initialized Class - configurationClass() { - return DebeziumReadSchemaTransformConfiguration.class; - } - - @Override - protected @NonNull @Initialized SchemaTransform from( + protected @NonNull @Initialized SchemaTransform from( DebeziumReadSchemaTransformConfiguration configuration) { // TODO(pabloem): Validate configuration parameters to ensure formatting is correct. - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // TODO(pabloem): Test this behavior @@ -174,6 +171,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return Collections.singletonList("output"); } + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class DebeziumReadSchemaTransformConfiguration { public abstract String getUsername(); diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java index 2ea3879adbcf..f2b835a3b796 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java @@ -66,13 +66,9 @@ public class FileReadSchemaTransformProvider static final String FILEPATTERN_ROW_FIELD_NAME = "filepattern"; @Override - protected Class configurationClass() { - return FileReadSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) { - return new FileReadSchemaTransform(configuration); + protected SchemaTransform from( + FileReadSchemaTransformConfiguration configuration) { + return new FileReadSchemaTransform(configuration, identifier()); } @Override @@ -91,11 +87,13 @@ public List outputCollectionNames() { } @VisibleForTesting - static class FileReadSchemaTransform extends SchemaTransform { + static class FileReadSchemaTransform + extends SchemaTransform { private FileReadSchemaTransformConfiguration configuration; private boolean useInputPCollection; - FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) { + FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; useInputPCollection = Strings.isNullOrEmpty(configuration.getFilepattern()); } diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java index f3c76897c0a3..0c92006dffdc 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java @@ -64,16 +64,11 @@ public class FileWriteSchemaTransformProvider static final TupleTag ERROR_TAG = new TupleTag() {}; static final TupleTag RESULT_TAG = new TupleTag() {}; - /** Provides the required {@link TypedSchemaTransformProvider#configurationClass()}. */ - @Override - protected Class configurationClass() { - return FileWriteSchemaTransformConfiguration.class; - } - /** Builds a {@link SchemaTransform} from a {@link FileWriteSchemaTransformConfiguration}. */ @Override - protected SchemaTransform from(FileWriteSchemaTransformConfiguration configuration) { - return new FileWriteSchemaTransform(configuration); + protected SchemaTransform from( + FileWriteSchemaTransformConfiguration configuration) { + return new FileWriteSchemaTransform(configuration, identifier()); } /** Returns the {@link TypedSchemaTransformProvider#identifier()} required for registration. */ @@ -99,11 +94,14 @@ public List outputCollectionNames() { * #inputCollectionNames()} tagged {@link Row}s into a {@link PCollectionRowTuple} of {@link * #outputCollectionNames()} tagged {@link Row}s. */ - static class FileWriteSchemaTransform extends SchemaTransform { + static class FileWriteSchemaTransform + extends SchemaTransform { final FileWriteSchemaTransformConfiguration configuration; - FileWriteSchemaTransform(FileWriteSchemaTransformConfiguration configuration) { + FileWriteSchemaTransform( + FileWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); validateConfiguration(configuration); this.configuration = configuration; } diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java index 5725ceff3a12..cfae5b953b35 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java @@ -90,7 +90,7 @@ public void runWriteAndReadTest( .setFilepattern(filePath + "*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); @@ -130,7 +130,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -186,7 +186,7 @@ public void testReadWithPCollectionOfFilepatterns() { .setFormat(getFormat()) .setSchema(stringSchema) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create a PCollection of filepatterns and feed into the read transform Schema patternSchema = getFilepatternSchema(); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java index d01e0051c7b8..5216616f6c1a 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java @@ -239,8 +239,8 @@ public void testWriteAndReadWithSchemaTransforms() { .setSchema(getStringSchemaFromBeamSchema(schema)) .build(); - SchemaTransform writeTransform = new FileWriteSchemaTransformProvider().from(writeConfig); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(readConfig); + SchemaTransform writeTransform = new FileWriteSchemaTransformProvider().from(writeConfig); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(readConfig); Schema filePatternSchema = getFilepatternSchema(); PCollection inputRows = writePipeline.apply(Create.of(rows).withRowSchema(schema)); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java index e733969eb3d1..fc577dfd411b 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java @@ -52,7 +52,7 @@ public class FileWriteSchemaTransformProviderTest { @Test public void receivedUnexpectedInputTagsThrowsAnError() { - SchemaTransform transform = PROVIDER.from(defaultConfiguration().setFormat(JSON).build()); + SchemaTransform transform = PROVIDER.from(defaultConfiguration().setFormat(JSON).build()); PCollectionRowTuple empty = PCollectionRowTuple.empty(errorPipeline); IllegalArgumentException emptyInputError = assertThrows(IllegalArgumentException.class, () -> empty.apply(transform)); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java index e4bc63cd8d8e..25c35768dc0c 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java @@ -188,7 +188,8 @@ public void runWriteAndReadTest( .setFilepattern(filePath + "*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = + new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); List expectedRows = @@ -233,7 +234,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -299,7 +300,7 @@ public void testReadWithPCollectionOfFilepatterns() { .setFormat(getFormat()) .setSchema(jsonStringSchema) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create an PCollection of filepatterns and feed into the read transform Schema patternSchema = getFilepatternSchema(); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java index 4c5be258651f..61f2c114dc1f 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java @@ -98,7 +98,7 @@ public void testReadStrings() { .setFormat(getFormat()) .setFilepattern(filePath + "*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PCollection outputStrings = @@ -130,7 +130,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -192,7 +192,7 @@ public void testReadWithPCollectionOfFilepatterns() { // We will get filepatterns from the input PCollection, so don't set filepattern field here FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(getFormat()).build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create an PCollection of filepatterns and feed into the read transform Schema patternSchema = Schema.of(Field.of(FILEPATTERN_ROW_FIELD_NAME, FieldType.STRING)); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java index b1d6bba06ea9..08de27d05eaa 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java @@ -91,7 +91,7 @@ public void runWriteAndReadTest( .setFilepattern(folderPath + "/*") .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); @@ -130,7 +130,7 @@ public void testStreamingRead() { .setPollIntervalMillis(100L) .setTerminateAfterSecondsSinceNewOutput(3L) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); @@ -190,7 +190,7 @@ public void testReadWithPCollectionOfFilepatterns() { .setFormat(getFormat()) .setSchema(stringSchema) .build(); - SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); + SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); // Create an PCollection of filepatterns and feed into the read transform Schema patternSchema = getFilepatternSchema(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java index af6ab5c71c8b..71c9968bd93a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java @@ -54,16 +54,11 @@ public class BigQueryExportReadSchemaTransformProvider "beam:schematransform:org.apache.beam:bigquery_export_read:v1"; private static final String OUTPUT_TAG = "OUTPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return BigQueryExportReadSchemaTransformConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(BigQueryExportReadSchemaTransformConfiguration configuration) { - return new BigQueryExportSchemaTransform(configuration); + protected SchemaTransform from( + BigQueryExportReadSchemaTransformConfiguration configuration) { + return new BigQueryExportSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -94,13 +89,16 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link * BigQueryExportReadSchemaTransformConfiguration}. */ - protected static class BigQueryExportSchemaTransform extends SchemaTransform { + protected static class BigQueryExportSchemaTransform + extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. */ private BigQueryServices testBigQueryServices = null; private final BigQueryExportReadSchemaTransformConfiguration configuration; - BigQueryExportSchemaTransform(BigQueryExportReadSchemaTransformConfiguration configuration) { + BigQueryExportSchemaTransform( + BigQueryExportReadSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java index 3212e2a30348..f5734581f4ea 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java @@ -62,16 +62,11 @@ public class BigQueryFileLoadsWriteSchemaTransformProvider "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1"; static final String INPUT_TAG = "INPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return BigQueryFileLoadsWriteSchemaTransformConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { - return new BigQueryWriteSchemaTransform(configuration); + protected SchemaTransform from( + BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + return new BigQueryWriteSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -102,13 +97,16 @@ public List outputCollectionNames() { * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link * BigQueryFileLoadsWriteSchemaTransformConfiguration}. */ - protected static class BigQueryWriteSchemaTransform extends SchemaTransform { + protected static class BigQueryWriteSchemaTransform + extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. */ private BigQueryServices testBigQueryServices = null; private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration; - BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + BigQueryWriteSchemaTransform( + BigQueryFileLoadsWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java index 8b8e8179ce7d..a4936aff2e53 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java @@ -65,13 +65,9 @@ public class BigQueryDirectReadSchemaTransformProvider private static final String OUTPUT_TAG = "OUTPUT_ROWS"; @Override - protected Class configurationClass() { - return BigQueryDirectReadSchemaTransformConfiguration.class; - } - - @Override - protected SchemaTransform from(BigQueryDirectReadSchemaTransformConfiguration configuration) { - return new BigQueryDirectReadSchemaTransform(configuration); + protected SchemaTransform from( + BigQueryDirectReadSchemaTransformConfiguration configuration) { + return new BigQueryDirectReadSchemaTransform(configuration, identifier()); } @Override @@ -161,13 +157,15 @@ public abstract static class Builder { * BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link * BigQueryDirectReadSchemaTransformProvider}. */ - protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform { + protected static class BigQueryDirectReadSchemaTransform + extends SchemaTransform { private BigQueryServices testBigQueryServices = null; private final BigQueryDirectReadSchemaTransformConfiguration configuration; BigQueryDirectReadSchemaTransform( - BigQueryDirectReadSchemaTransformConfiguration configuration) { + BigQueryDirectReadSchemaTransformConfiguration configuration, String identifier) { // Validate configuration parameters before PTransform expansion + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 980d783ec43c..5d9fde19fb3d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -89,9 +89,9 @@ public class BigQueryStorageWriteApiSchemaTransformProvider protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; @Override - protected SchemaTransform from( + protected SchemaTransform from( BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { - return new BigQueryStorageWriteApiSchemaTransform(configuration); + return new BigQueryStorageWriteApiSchemaTransform(configuration, identifier()); } @Override @@ -289,13 +289,15 @@ public abstract static class Builder { * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link * BigQueryStorageWriteApiSchemaTransformProvider}. */ - protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform { + protected static class BigQueryStorageWriteApiSchemaTransform + extends SchemaTransform { private BigQueryServices testBigQueryServices = null; private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; BigQueryStorageWriteApiSchemaTransform( - BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + BigQueryStorageWriteApiSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java index f48a23559141..cd4e1f7c466f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java @@ -71,8 +71,9 @@ public class BigtableReadSchemaTransformProvider .build(); @Override - protected SchemaTransform from(BigtableReadSchemaTransformConfiguration configuration) { - return new BigtableReadSchemaTransform(configuration); + protected SchemaTransform from( + BigtableReadSchemaTransformConfiguration configuration) { + return new BigtableReadSchemaTransform(configuration, identifier()); } @Override @@ -128,10 +129,13 @@ public abstract static class Builder { * BigtableReadSchemaTransformConfiguration} and instantiated by {@link * BigtableReadSchemaTransformProvider}. */ - private static class BigtableReadSchemaTransform extends SchemaTransform { + private static class BigtableReadSchemaTransform + extends SchemaTransform { private final BigtableReadSchemaTransformConfiguration configuration; - BigtableReadSchemaTransform(BigtableReadSchemaTransformConfiguration configuration) { + BigtableReadSchemaTransform( + BigtableReadSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index cc480be6aa7e..0e58caa17242 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -61,8 +61,9 @@ public class BigtableWriteSchemaTransformProvider private static final String INPUT_TAG = "input"; @Override - protected SchemaTransform from(BigtableWriteSchemaTransformConfiguration configuration) { - return new BigtableWriteSchemaTransform(configuration); + protected SchemaTransform from( + BigtableWriteSchemaTransformConfiguration configuration) { + return new BigtableWriteSchemaTransform(configuration, identifier()); } @Override @@ -120,10 +121,13 @@ public abstract static class Builder { * BigtableWriteSchemaTransformConfiguration} and instantiated by {@link * BigtableWriteSchemaTransformProvider}. */ - private static class BigtableWriteSchemaTransform extends SchemaTransform { + private static class BigtableWriteSchemaTransform + extends SchemaTransform { private final BigtableWriteSchemaTransformConfiguration configuration; - BigtableWriteSchemaTransform(BigtableWriteSchemaTransformConfiguration configuration) { + BigtableWriteSchemaTransform( + BigtableWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java index c1f6b2b31754..791e161a367e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java @@ -70,12 +70,8 @@ public class PubsubReadSchemaTransformProvider Schema.builder().addStringField("error").addNullableByteArrayField("row").build(); @Override - public Class configurationClass() { - return PubsubReadSchemaTransformConfiguration.class; - } - - @Override - public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) { + public SchemaTransform from( + PubsubReadSchemaTransformConfiguration configuration) { if (configuration.getSubscription() == null && configuration.getTopic() == null) { throw new IllegalArgumentException( "To read from Pubsub, a subscription name or a topic name must be provided"); @@ -122,7 +118,7 @@ public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration } PubsubReadSchemaTransform transform = - new PubsubReadSchemaTransform(configuration, payloadSchema, payloadMapper); + new PubsubReadSchemaTransform(identifier(), configuration, payloadSchema, payloadMapper); if (configuration.getClientFactory() != null) { transform.setClientFactory(configuration.getClientFactory()); @@ -134,7 +130,8 @@ public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration return transform; } - private static class PubsubReadSchemaTransform extends SchemaTransform implements Serializable { + private static class PubsubReadSchemaTransform + extends SchemaTransform implements Serializable { final Schema beamSchema; final SerializableFunction valueMapper; final PubsubReadSchemaTransformConfiguration configuration; @@ -142,9 +139,11 @@ private static class PubsubReadSchemaTransform extends SchemaTransform implement @Nullable Clock clock; PubsubReadSchemaTransform( + String identifier, PubsubReadSchemaTransformConfiguration configuration, Schema payloadSchema, SerializableFunction valueMapper) { + super(configuration, identifier); this.configuration = configuration; Schema outputSchema; List attributes = configuration.getAttributes(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java index 6187f6f79d3e..177ed2f5bb44 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java @@ -67,11 +67,6 @@ public class PubsubWriteSchemaTransformProvider public static final Set VALID_DATA_FORMATS = Sets.newHashSet(VALID_FORMATS_STR.split(",")); - @Override - public Class configurationClass() { - return PubsubWriteSchemaTransformConfiguration.class; - } - public static class ErrorFn extends DoFn { private final SerializableFunction valueMapper; private final @Nullable Set attributes; @@ -137,20 +132,24 @@ public void processElement(@Element Row row, MultiOutputReceiver receiver) throw } @Override - public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) { + public SchemaTransform from( + PubsubWriteSchemaTransformConfiguration configuration) { if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase())) { throw new IllegalArgumentException( String.format( "Format %s not supported. Only supported formats are %s", configuration.getFormat(), VALID_FORMATS_STR)); } - return new PubsubWriteSchemaTransform(configuration); + return new PubsubWriteSchemaTransform(configuration, identifier()); } - private static class PubsubWriteSchemaTransform extends SchemaTransform implements Serializable { + private static class PubsubWriteSchemaTransform + extends SchemaTransform implements Serializable { final PubsubWriteSchemaTransformConfiguration configuration; - PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration configuration) { + PubsubWriteSchemaTransform( + PubsubWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java index 8afe730f32ce..1792e76d73d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java @@ -85,12 +85,6 @@ public class PubsubLiteReadSchemaTransformProvider public static final TupleTag OUTPUT_TAG = new TupleTag() {}; public static final TupleTag ERROR_TAG = new TupleTag() {}; - @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return PubsubLiteReadSchemaTransformConfiguration.class; - } - public static class ErrorFn extends DoFn { private final SerializableFunction valueMapper; private final Counter errorCounter; @@ -192,8 +186,9 @@ public void finish(FinishBundleContext c) { } @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - PubsubLiteReadSchemaTransformConfiguration configuration) { + public @UnknownKeyFor @NonNull @Initialized SchemaTransform< + PubsubLiteReadSchemaTransformConfiguration> + from(PubsubLiteReadSchemaTransformConfiguration configuration) { if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( String.format( @@ -242,7 +237,8 @@ public void finish(FinishBundleContext c) { "To read from Pubsub Lite in JSON or AVRO format, you must provide a schema."); } } - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { String project = configuration.getProject(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java index 8ba8176035da..1952c24f3c52 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java @@ -80,12 +80,6 @@ public class PubsubLiteWriteSchemaTransformProvider private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class); - @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return PubsubLiteWriteSchemaTransformConfiguration.class; - } - public static class ErrorCounterFn extends DoFn { private final SerializableFunction toBytesFn; private final Counter errorCounter; @@ -172,8 +166,9 @@ public void finish() { } @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - PubsubLiteWriteSchemaTransformConfiguration configuration) { + public @UnknownKeyFor @NonNull @Initialized SchemaTransform< + PubsubLiteWriteSchemaTransformConfiguration> + from(PubsubLiteWriteSchemaTransformConfiguration configuration) { if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( @@ -184,7 +179,8 @@ public void finish() { + String.join(", ", SUPPORTED_FORMATS)); } - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { List attributesConfigValue = configuration.getAttributes(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index 6c8a2541a88b..42793dc9e495 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -55,21 +55,19 @@ public class SpannerWriteSchemaTransformProvider SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return SpannerWriteSchemaTransformConfiguration.class; + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + SpannerWriteSchemaTransformConfiguration> + from(SpannerWriteSchemaTransformConfiguration configuration) { + return new SpannerSchemaTransformWrite(configuration, identifier()); } - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - SpannerWriteSchemaTransformConfiguration configuration) { - return new SpannerSchemaTransformWrite(configuration); - } - - static class SpannerSchemaTransformWrite extends SchemaTransform implements Serializable { + static class SpannerSchemaTransformWrite + extends SchemaTransform implements Serializable { private final SpannerWriteSchemaTransformConfiguration configuration; - SpannerSchemaTransformWrite(SpannerWriteSchemaTransformConfiguration configuration) { + SpannerSchemaTransformWrite( + SpannerWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java index f3562e4cd917..d3035b1dea98 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java @@ -79,11 +79,6 @@ public class SpannerChangestreamsReadSchemaTransformProvider extends TypedSchemaTransformProvider< SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> { - @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return SpannerChangestreamsReadConfiguration.class; - } private static final Logger LOG = LoggerFactory.getLogger(SpannerChangestreamsReadSchemaTransformProvider.class); @@ -94,10 +89,9 @@ public class SpannerChangestreamsReadSchemaTransformProvider Schema.builder().addStringField("error").addNullableStringField("row").build(); @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration - configuration) { - return new SchemaTransform() { + public @UnknownKeyFor @NonNull @Initialized SchemaTransform + from(SpannerChangestreamsReadConfiguration configuration) { + return new SchemaTransform(configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { Pipeline p = input.getPipeline(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java index 81d3103f38bf..c2da6a02cfa9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java @@ -222,7 +222,7 @@ public void testRead() { .setInstanceId(instanceId) .setProjectId(projectId) .build(); - SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); + SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); PCollection rows = PCollectionRowTuple.empty(p).apply(transform).get("output"); PAssert.that(rows).containsInAnyOrder(expectedRows); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java index dd5a9abd5ac8..fe1eece009e0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java @@ -182,7 +182,7 @@ public void testReadRaw() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")) @@ -229,7 +229,7 @@ public void testReadAttributes() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")) @@ -260,7 +260,7 @@ public void testReadAvro() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")).containsInAnyOrder(ROWS); @@ -288,7 +288,7 @@ public void testReadAvroWithError() throws IOException { .setClientFactory(clientFactory) .setClock(CLOCK) .build(); - SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get("output")).empty(); diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index 0139207235a0..44b2b937ab14 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -50,23 +50,20 @@ public class JdbcReadSchemaTransformProvider JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return JdbcReadSchemaTransformConfiguration.class; - } - - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcReadSchemaTransformConfiguration configuration) { + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + JdbcReadSchemaTransformConfiguration> + from(JdbcReadSchemaTransformConfiguration configuration) { configuration.validate(); - return new JdbcReadSchemaTransform(configuration); + return new JdbcReadSchemaTransform(configuration, identifier()); } - static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { + static class JdbcReadSchemaTransform extends SchemaTransform + implements Serializable { JdbcReadSchemaTransformConfiguration config; - public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) { + public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config, String identifier) { + super(config, identifier); this.config = config; } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index a409b604b11f..fc4a45ad9af9 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -55,23 +55,21 @@ public class JdbcWriteSchemaTransformProvider JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return JdbcWriteSchemaTransformConfiguration.class; - } - - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - JdbcWriteSchemaTransformConfiguration configuration) { + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + JdbcWriteSchemaTransformConfiguration> + from(JdbcWriteSchemaTransformConfiguration configuration) { configuration.validate(); - return new JdbcWriteSchemaTransform(configuration); + return new JdbcWriteSchemaTransform(configuration, identifier()); } - static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { + static class JdbcWriteSchemaTransform + extends SchemaTransform implements Serializable { JdbcWriteSchemaTransformConfiguration config; - public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) { + public JdbcWriteSchemaTransform( + JdbcWriteSchemaTransformConfiguration config, String identifier) { + super(config, identifier); this.config = config; } diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java index 9e030821e5ca..4b5d73153616 100644 --- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java @@ -57,13 +57,8 @@ public class JsonWriteTransformProvider private static final String WRITE_RESULTS = "output"; @Override - protected Class configurationClass() { - return JsonWriteConfiguration.class; - } - - @Override - protected SchemaTransform from(JsonWriteConfiguration configuration) { - return new JsonWriteTransform(configuration); + protected SchemaTransform from(JsonWriteConfiguration configuration) { + return new JsonWriteTransform(configuration, identifier()); } @Override @@ -110,11 +105,12 @@ public abstract static class Builder { } /** A {@link SchemaTransform} for {@link JsonIO#write}. */ - protected static class JsonWriteTransform extends SchemaTransform { + protected static class JsonWriteTransform extends SchemaTransform { private final JsonWriteConfiguration configuration; - JsonWriteTransform(JsonWriteConfiguration configuration) { + JsonWriteTransform(JsonWriteConfiguration configuration, String identifier) { + super(configuration, identifier); configuration.validate(); this.configuration = configuration; } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 13240ea9dc40..e0feb704e010 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -89,16 +89,12 @@ public KafkaReadSchemaTransformProvider() { this.testTimeoutSecs = testTimeoutSecs; } - @Override - protected Class configurationClass() { - return KafkaReadSchemaTransformConfiguration.class; - } - @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) @Override - protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { + protected SchemaTransform from( + KafkaReadSchemaTransformConfiguration configuration) { configuration.validate(); final String inputSchema = configuration.getSchema(); @@ -122,7 +118,8 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl(); if (confluentSchemaRegUrl != null) { - return new SchemaTransform() { + return new SchemaTransform( + configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { final String confluentSchemaRegSubject = @@ -172,7 +169,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { valueMapper = AvroUtils.getAvroBytesToRowFunction(beamSchema); } - return new SchemaTransform() { + return new SchemaTransform(configuration, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { KafkaIO.Read kafkaRead = diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 26f37b790ef8..3942e0d09946 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -73,14 +73,9 @@ public class KafkaWriteSchemaTransformProvider LoggerFactory.getLogger(KafkaWriteSchemaTransformProvider.class); @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { - return KafkaWriteSchemaTransformConfiguration.class; - } - - @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - KafkaWriteSchemaTransformConfiguration configuration) { + protected @UnknownKeyFor @NonNull @Initialized SchemaTransform< + KafkaWriteSchemaTransformConfiguration> + from(KafkaWriteSchemaTransformConfiguration configuration) { if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( "Format " @@ -89,13 +84,16 @@ public class KafkaWriteSchemaTransformProvider + "Supported formats are: " + String.join(", ", SUPPORTED_FORMATS)); } - return new KafkaWriteSchemaTransform(configuration); + return new KafkaWriteSchemaTransform(configuration, identifier()); } - static final class KafkaWriteSchemaTransform extends SchemaTransform implements Serializable { + static final class KafkaWriteSchemaTransform + extends SchemaTransform implements Serializable { final KafkaWriteSchemaTransformConfiguration configuration; - KafkaWriteSchemaTransform(KafkaWriteSchemaTransformConfiguration configuration) { + KafkaWriteSchemaTransform( + KafkaWriteSchemaTransformConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java index 9511a59d472c..c1044d950101 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java @@ -39,16 +39,11 @@ public class SingleStoreSchemaTransformReadProvider extends TypedSchemaTransformProvider { private static final String OUTPUT_TAG = "OUTPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return SingleStoreSchemaTransformReadConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(SingleStoreSchemaTransformReadConfiguration configuration) { - return new SingleStoreReadSchemaTransform(configuration); + protected SchemaTransform from( + SingleStoreSchemaTransformReadConfiguration configuration) { + return new SingleStoreReadSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -79,10 +74,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using * {@link SingleStoreSchemaTransformReadConfiguration}. */ - private static class SingleStoreReadSchemaTransform extends SchemaTransform { + private static class SingleStoreReadSchemaTransform + extends SchemaTransform { private final SingleStoreSchemaTransformReadConfiguration configuration; - SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) { + SingleStoreReadSchemaTransform( + SingleStoreSchemaTransformReadConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java index dafa10087a4a..f24db96eb1d8 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java @@ -43,16 +43,11 @@ public class SingleStoreSchemaTransformWriteProvider private static final String OUTPUT_TAG = "OUTPUT"; public static final String INPUT_TAG = "INPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class configurationClass() { - return SingleStoreSchemaTransformWriteConfiguration.class; - } - /** Returns the expected {@link SchemaTransform} of the configuration. */ @Override - protected SchemaTransform from(SingleStoreSchemaTransformWriteConfiguration configuration) { - return new SingleStoreWriteSchemaTransform(configuration); + protected SchemaTransform from( + SingleStoreSchemaTransformWriteConfiguration configuration) { + return new SingleStoreWriteSchemaTransform(configuration, identifier()); } /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @@ -83,10 +78,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for SingleStoreDB write jobs configured using * {@link SingleStoreSchemaTransformWriteConfiguration}. */ - private static class SingleStoreWriteSchemaTransform extends SchemaTransform { + private static class SingleStoreWriteSchemaTransform + extends SchemaTransform { private final SingleStoreSchemaTransformWriteConfiguration configuration; - SingleStoreWriteSchemaTransform(SingleStoreSchemaTransformWriteConfiguration configuration) { + SingleStoreWriteSchemaTransform( + SingleStoreSchemaTransformWriteConfiguration configuration, String identifier) { + super(configuration, identifier); this.configuration = configuration; } diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java index e1a280f1594e..1d0eeec3f7d6 100644 --- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java +++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java @@ -136,7 +136,7 @@ private PipelineResult runWrite() { .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); + SchemaTransform schemaTransform = provider.from(configurationRow); Schema.Builder schemaBuilder = new Schema.Builder(); schemaBuilder.addField("id", Schema.FieldType.INT32); @@ -190,7 +190,7 @@ private PipelineResult runRead() { .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); + SchemaTransform schemaTransform = provider.from(configurationRow); PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineRead); String tag = provider.outputCollectionNames().get(0); @@ -221,7 +221,7 @@ private PipelineResult runReadWithPartitions() { .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); + SchemaTransform schemaTransform = provider.from(configurationRow); PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineReadWithPartitions); String tag = provider.outputCollectionNames().get(0); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index da4a0853fb39..bcb8f7e9b528 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -191,7 +191,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = + SchemaTransform underlyingTransform = new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); return input.apply(underlyingTransform); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 0702137cffd3..fc0df4cbc66f 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -81,7 +81,7 @@ public ManagedSchemaTransformProvider() { @DefaultSchema(AutoValueSchema.class) @AutoValue @VisibleForTesting - abstract static class ManagedConfig { + public abstract static class ManagedConfig { public static Builder builder() { return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); } @@ -136,7 +136,7 @@ protected void validate() { } @Override - protected SchemaTransform from(ManagedConfig managedConfig) { + protected SchemaTransform from(ManagedConfig managedConfig) { managedConfig.validate(); SchemaTransformProvider schemaTransformProvider = Preconditions.checkNotNull( @@ -147,16 +147,19 @@ protected SchemaTransform from(ManagedConfig managedConfig) { + "the specified transform not being supported.", managedConfig.getTransformIdentifier()); - return new ManagedSchemaTransform(managedConfig, schemaTransformProvider); + return new ManagedSchemaTransform(managedConfig, identifier(), schemaTransformProvider); } - static class ManagedSchemaTransform extends SchemaTransform { + static class ManagedSchemaTransform extends SchemaTransform { private final ManagedConfig managedConfig; private final Row underlyingTransformConfig; private final SchemaTransformProvider underlyingTransformProvider; ManagedSchemaTransform( - ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) { + ManagedConfig managedConfig, + String managedIdentifier, + SchemaTransformProvider underlyingTransformProvider) { + super(managedConfig, managedIdentifier); // parse config before expansion to check if it matches underlying transform's config schema Schema transformConfigSchema = underlyingTransformProvider.configurationSchema(); Row underlyingTransformConfig; @@ -185,20 +188,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { public ManagedConfig getManagedConfig() { return this.managedConfig; } - - Row getConfigurationRow() { - try { - // To stay consistent with our SchemaTransform configuration naming conventions, - // we sort lexicographically and convert field names to snake_case - return SchemaRegistry.createDefault() - .getToRowFunction(ManagedConfig.class) - .apply(managedConfig) - .sorted() - .toSnakeCase(); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } } @VisibleForTesting diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/testing/TestSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/testing/TestSchemaTransformProvider.java index 2771ecd01643..d214d1965eea 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/testing/TestSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/testing/TestSchemaTransformProvider.java @@ -63,10 +63,10 @@ public abstract static class Builder { } @Override - public SchemaTransform from(Config config) { + public SchemaTransform from(Config config) { String extraString = config.getExtraString(); Integer extraInteger = config.getExtraInteger(); - return new SchemaTransform() { + return new SchemaTransform(config, identifier()) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { Schema schema =