diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java
index 283720e09772..8b7724341075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java
@@ -20,6 +20,9 @@
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * An abstraction representing schema capable and aware transforms. The interface is intended to be
@@ -33,5 +36,39 @@
  * compatibility guarantees and it should not be implemented outside of the Beam repository.
  */
 @Internal
-public abstract class SchemaTransform
-    extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {}
+public abstract class SchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+  private @Nullable Row configurationRow;
+  private @Nullable String identifier;
+  private boolean registered = false;
+
+  /**
+   * Stores the transform's identifier and configuration {@link Row} used to build this instance.
+   * Doing so allows this transform to be translated from/to proto using {@link
+   * org.apache.beam.sdk.util.construction.PTransformTranslation.SchemaTransformTranslator}.
+   */
+  public SchemaTransform register(Row configurationRow, String identifier) {
+    this.configurationRow = configurationRow;
+    this.identifier = identifier;
+    registered = true;
+
+    return this;
+  }
+
+  public Row getConfigurationRow() {
+    return Preconditions.checkNotNull(
+        configurationRow,
+        "Could not fetch Row configuration for %s. Please store it using .register().",
+        getClass());
+  }
+
+  public String getIdentifier() {
+    return Preconditions.checkNotNull(
+        identifier,
+        "Could not fetch identifier for %s. Please store it using .register().",
+        getClass());
+  }
+
+  public boolean isRegistered() {
+    return registered;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslation.java
index 15553411f4c1..ec9d01a95881 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslation.java
@@ -23,6 +23,9 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.model.pipeline.v1.SchemaApi;
 import org.apache.beam.sdk.coders.RowCoder;
@@ -36,13 +39,24 @@
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
- * A {@link TransformPayloadTranslator} implementation that translates between a Java {@link
- * SchemaTransform} and a protobuf payload for that transform.
+ * A default {@link TransformPayloadTranslator} implementation for {@link SchemaTransform}s.
+ *
+ * <p>Note: This is only eligible for registered SchemaTransform instances (using {@link
+ * SchemaTransform#register(Row, String)} or {@link TypedSchemaTransformProvider#register(Object,
+ * SchemaTransform)}).
  */
 public class SchemaTransformTranslation {
-  public abstract static class SchemaTransformPayloadTranslator<T extends SchemaTransform>
-      implements TransformPayloadTranslator<T> {
-    public abstract SchemaTransformProvider provider();
+  public static class SchemaTransformPayloadTranslator
+      implements TransformPayloadTranslator<SchemaTransform> {
+    private final SchemaTransformProvider provider;
+
+    public String identifier() {
+      return provider.identifier();
+    }
+
+    public SchemaTransformPayloadTranslator(SchemaTransformProvider provider) {
+      this.provider = provider;
+    }
 
     @Override
     public String getUrn() {
@@ -52,18 +66,19 @@ public String getUrn() {
     @Override
     @SuppressWarnings("argument")
     public @Nullable FunctionSpec translate(
-        AppliedPTransform<?, ?, T> application, SdkComponents components) throws IOException {
+        AppliedPTransform<?, ?, SchemaTransform> application, SdkComponents components)
+        throws IOException {
       SchemaApi.Schema expansionSchema =
-          SchemaTranslation.schemaToProto(provider().configurationSchema(), true);
+          SchemaTranslation.schemaToProto(provider.configurationSchema(), true);
       Row configRow = toConfigRow(application.getTransform());
       ByteArrayOutputStream os = new ByteArrayOutputStream();
-      RowCoder.of(provider().configurationSchema()).encode(configRow, os);
+      RowCoder.of(provider.configurationSchema()).encode(configRow, os);
 
       return FunctionSpec.newBuilder()
           .setUrn(getUrn())
           .setPayload(
               ExternalTransforms.SchemaTransformPayload.newBuilder()
-                  .setIdentifier(provider().identifier())
+                  .setIdentifier(provider.identifier())
                   .setConfigurationSchema(expansionSchema)
                   .setConfigurationRow(ByteString.copyFrom(os.toByteArray()))
                   .build()
@@ -72,8 +87,21 @@ public String getUrn() {
     }
 
     @Override
-    public T fromConfigRow(Row configRow, PipelineOptions options) {
-      return (T) provider().from(configRow);
+    public Row toConfigRow(SchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+
+    @Override
+    public SchemaTransform fromConfigRow(Row configRow, PipelineOptions options) {
+      return provider.from(configRow);
+    }
+  }
+
+  public static Map<String, SchemaTransformPayloadTranslator> getDefaultTranslators() {
+    Map<String, SchemaTransformPayloadTranslator> translators = new HashMap<>();
+    for (SchemaTransformProvider provider : ServiceLoader.load(SchemaTransformProvider.class)) {
+      translators.put(provider.identifier(), new SchemaTransformPayloadTranslator(provider));
     }
+    return translators;
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index d9b49dd3ca27..b3f1fc00aa53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -74,6 +74,28 @@ protected Class<ConfigT> configurationClass() {
     return (Class<ConfigT>) parameterizedType.getActualTypeArguments()[0];
   }
 
+  /** Like {@link SchemaTransform#register(Row, String)}, but with a configuration POJO.
*/ + protected SchemaTransform register(ConfigT configuration, SchemaTransform transform) { + SchemaRegistry registry = SchemaRegistry.createDefault(); + try { + // Get initial row with values + // then sort lexicographically and convert to snake_case + Row configRow = + registry + .getToRowFunction(configurationClass()) + .apply(configuration) + .sorted() + .toSnakeCase(); + return transform.register(configRow, identifier()); + } catch (NoSuchSchemaException e) { + throw new RuntimeException( + String.format( + "Unable to find schema for this SchemaTransform's config type: %s", + configurationClass()), + e); + } + } + /** * Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a * {@link InvalidSchemaException}. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index 3d9142723737..12ae717456fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -46,6 +46,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; @@ -59,6 +61,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedSet; @@ -253,6 +256,7 @@ private static Collection> loadKnownTranslators() { .add(new KnownTransformPayloadTranslator()) .add(ParDoTranslator.create()) .add(ExternalTranslator.create()) + .add(new SchemaTransformTranslator()) .build(); } @@ -581,6 +585,86 @@ public RunnerApi.PTransform translate( } } + /** + * Translates {@link SchemaTransform}s by populating the {@link FunctionSpec} with a {@link + * ExternalTransforms.SchemaTransformPayload} containing the transform's configuration {@link + * Schema} and {@link Row}. + * + *
<p>
This can be used as a default translator for SchemaTransforms. If further customization is + * needed, you can develop a {@link TransformPayloadTranslator} implementation and include it in a + * {@link TransformPayloadTranslatorRegistrar}, which will be picked up by {@link + * KnownTransformPayloadTranslator}. + * + *
<p>
Note: This default translator is only eligible for registered SchemaTransform instances + * (using {@link SchemaTransform#register(Row, String)} or {@link + * org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider#register(Object, + * SchemaTransform)}). + */ + private static class SchemaTransformTranslator implements TransformTranslator { + @Override + public @Nullable String getUrn(SchemaTransform transform) { + return transform.getIdentifier(); + } + + @Override + public boolean canTranslate(PTransform transform) { + // Can translate only if the SchemaTransform's configuration Row and identifier are accessible + if (transform instanceof SchemaTransform) { + return (((SchemaTransform) transform).isRegistered()); + } + return false; + } + + @Override + public RunnerApi.PTransform translate( + AppliedPTransform appliedPTransform, + List> subtransforms, + SdkComponents components) + throws IOException { + RunnerApi.PTransform.Builder transformBuilder = + translateAppliedPTransform(appliedPTransform, subtransforms, components); + + String identifier = ((SchemaTransform) appliedPTransform.getTransform()).getIdentifier(); + TransformPayloadTranslator payloadTranslator = + SchemaTransformTranslation.getDefaultTranslators().get(identifier); + + FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components); + Row configRow = payloadTranslator.toConfigRow(appliedPTransform.getTransform()); + + if (spec != null) { + transformBuilder.setSpec(spec); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY), + ByteString.copyFromUtf8(identifier)); + if (identifier.equals(MANAGED_TRANSFORM_URN)) { + String underlyingIdentifier = + Preconditions.checkNotNull( + configRow.getString("transform_identifier"), + "Encountered a Managed Transform that has an empty \"transform_identifier\": %n%s", + configRow); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.MANAGED_UNDERLYING_TRANSFORM_URN_KEY), + ByteString.copyFromUtf8(underlyingIdentifier)); + } + } + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_KEY), + ByteString.copyFrom( + CoderUtils.encodeToByteArray(RowCoder.of(configRow.getSchema()), configRow))); + transformBuilder.putAnnotations( + BeamUrns.getConstant(Annotations.Enum.CONFIG_ROW_SCHEMA_KEY), + ByteString.copyFrom( + SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray())); + + for (Entry annotation : + appliedPTransform.getTransform().getAnnotations().entrySet()) { + transformBuilder.putAnnotations( + annotation.getKey(), ByteString.copyFrom(annotation.getValue())); + } + + return transformBuilder.build(); + } + } /** * Translates an {@link AppliedPTransform} by: * diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslationTest.java new file mode 100644 index 000000000000..aa4cb63706e3 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslationTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; +import org.junit.Test; + +/** Base class for standard {@link SchemaTransform} translation tests. */ +public abstract class SchemaTransformTranslationTest { + protected abstract SchemaTransformProvider provider(); + + protected abstract Row configurationRow(); + + /** Input used for this SchemaTransform. Used to build a pipeline to test proto translation. 
*/ + protected PCollectionRowTuple input(Pipeline p) { + return PCollectionRowTuple.empty(p); + }; + + @Test + public void testRecreateTransformFromRow() { + SchemaTransformProvider provider = provider(); + SchemaTransformPayloadTranslator translator = new SchemaTransformPayloadTranslator(provider); + SchemaTransform originalTransform = provider.from(configurationRow()); + + Row translatedConfigRow = translator.toConfigRow(originalTransform); + SchemaTransform translatedTransform = + translator.fromConfigRow(translatedConfigRow, PipelineOptionsFactory.create()); + + assertEquals(configurationRow(), translatedTransform.getConfigurationRow()); + } + + @Test + public void testTransformProtoTranslation() throws InvalidProtocolBufferException, IOException { + SchemaTransformProvider provider = provider(); + Row configurationRow = configurationRow(); + + // Infer if it's a read or write SchemaTransform and build pipeline accordingly + Pipeline p = Pipeline.create(); + SchemaTransform schemaTransform = provider.from(configurationRow); + input(p).apply(schemaTransform); + + // Then translate the pipeline to a proto and extract the SchemaTransform's proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List schemaTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(provider.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, schemaTransformProto.size()); + RunnerApi.FunctionSpec spec = schemaTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + ExternalTransforms.SchemaTransformPayload payload = + ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema translatedSchema = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(provider.configurationSchema(), translatedSchema); + Row translatedConfigRow = + RowCoder.of(translatedSchema).decode(payload.getConfigurationRow().newInput()); + + assertEquals(configurationRow, translatedConfigRow); + + // Use the information in the proto to recreate the transform + SchemaTransform translatedTransform = + new SchemaTransformPayloadTranslator(provider) + .fromConfigRow(translatedConfigRow, PipelineOptionsFactory.create()); + + assertEquals(configurationRow, translatedTransform.getConfigurationRow()); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index 2eef0e30f805..319b72900436 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -29,8 +29,10 @@ import org.apache.beam.sdk.testing.UsesSchema; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -38,6 +40,7 @@ 
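A concrete IO can exercise proto translation by extending the base test class introduced above, supplying only its provider and a valid configuration Row. A minimal sketch, assuming a hypothetical provider with a single "topic" configuration field (the Iceberg and Kafka subclasses later in this diff follow the same pattern; imports are elided as in the surrounding snippets):

public class MyIOSchemaTransformTranslationTest extends SchemaTransformTranslationTest {
  static final MyIOSchemaTransformProvider PROVIDER = new MyIOSchemaTransformProvider();

  @Override
  protected SchemaTransformProvider provider() {
    return PROVIDER;
  }

  @Override
  protected Row configurationRow() {
    // Field names must match the provider's configuration schema
    // (sorted lexicographically, in snake_case).
    return Row.withSchema(PROVIDER.configurationSchema())
        .withFieldValue("topic", "my_topic")
        .build();
  }
}

Write-style transforms would additionally override input(Pipeline) to supply a non-empty PCollectionRowTuple, as the Iceberg write subclass does below.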
@RunWith(JUnit4.class) @Category(UsesSchema.class) public class TypedSchemaTransformProviderTest { + @Rule public ExpectedException thrown = ExpectedException.none(); /** flat schema to select from. */ @DefaultSchema(AutoValueSchema.class) @@ -105,7 +108,7 @@ public String identifier() { @Override public SchemaTransform from(Configuration config) { - return new FakeSchemaTransform(config); + return register(config, new FakeSchemaTransform(config)); } } @@ -123,6 +126,38 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } + @Test + public void testInferConfigurationClass() { + assertEquals(Configuration.class, new FakeTypedSchemaIOProvider().configurationClass()); + assertEquals(Configuration.class, new FakeMinimalTypedProvider().configurationClass()); + } + + @Test + public void testGetConfigurationRow() { + FakeMinimalTypedProvider minimalProvider = new FakeMinimalTypedProvider(); + Configuration inputConfig = Configuration.create("field1", 13); + SchemaTransform transform = minimalProvider.from(inputConfig); + + Row expectedConfig = + Row.withSchema(minimalProvider.configurationSchema()) + .withFieldValue("string_field", "field1") + .withFieldValue("integer_field", 13) + .build(); + + assertEquals(expectedConfig, transform.getConfigurationRow()); + assertEquals(minimalProvider.identifier(), transform.getIdentifier()); + + // FakeTypedSchemaIOProvider doesn't register its schematransform. + // Check that a helpful error message is returned. + FakeTypedSchemaIOProvider fakeProvider = new FakeTypedSchemaIOProvider(); + SchemaTransform unregisteredTransform = fakeProvider.from(inputConfig); + thrown.expect(NullPointerException.class); + thrown.expectMessage("Could not fetch Row configuration"); + thrown.expectMessage("FakeSchemaTransform"); + thrown.expectMessage("Please store it using .register()"); + unregisteredTransform.getConfigurationRow(); + } + @Test public void testFrom() { SchemaTransformProvider provider = new FakeTypedSchemaIOProvider(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java index a54383b46e7c..cce196676def 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java @@ -41,7 +41,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -178,7 +178,7 @@ public Schema configurationSchema() { @Override public SchemaTransform from(Row configuration) { - return new TestSchemaTransform(); + return new TestSchemaTransform().register(configuration, identifier()); } } @@ -190,19 +190,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - static class TestSchemaTransformTranslator - extends SchemaTransformPayloadTranslator { - @Override - public SchemaTransformProvider provider() { - return new TestSchemaTransformProvider(); - } - - @Override - public Row toConfigRow(TestSchemaTransform transform) { 
- return Row.withSchema(Schema.builder().build()).build(); - } - } - @AutoService(TransformPayloadTranslatorRegistrar.class) public static class TestSchemaTransformPayloadTranslatorRegistrar implements TransformPayloadTranslatorRegistrar { @@ -213,7 +200,10 @@ public static class TestSchemaTransformPayloadTranslatorRegistrar public Map, ? extends TransformPayloadTranslator> getTransformPayloadTranslators() { return ImmutableMap., TransformPayloadTranslator>builder() - .put(TestSchemaTransform.class, new TestSchemaTransformTranslator()) + .put( + TestSchemaTransform.class, + new SchemaTransformTranslation.SchemaTransformPayloadTranslator( + new TestSchemaTransformProvider())) .build(); } } @@ -371,7 +361,9 @@ public void testTransformUpgradeSchemaTransform() throws Exception { Pipeline pipeline = Pipeline.create(); // Build the pipeline - PCollectionRowTuple.empty(pipeline).apply(new TestSchemaTransform()); + TestSchemaTransformProvider provider = new TestSchemaTransformProvider(); + PCollectionRowTuple.empty(pipeline) + .apply(provider.from(Row.nullRow(provider.configurationSchema()))); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false); ExternalTranslationOptions options = diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 770da14fa1cf..32b51253fc2d 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -68,6 +68,7 @@ import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -139,11 +140,23 @@ public interface ExpansionServiceRegistrar { public static class ExternalTransformRegistrarLoader implements ExpansionService.ExpansionServiceRegistrar { + /** + * Map of known PTransform URNs (ie. transforms listed in a {@link + * org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar}) and their + * corresponding TransformProviders. + */ @Override public Map knownTransforms() { Map providers = new HashMap<>(); - // First check and register ExternalTransformBuilder in serviceloader style, converting + // First populate with SchemaTransform URNs and their default translator implementation. + // These can be overwritten below if a custom translator is found. + for (Map.Entry entry : + SchemaTransformTranslation.getDefaultTranslators().entrySet()) { + providers.put(entry.getKey(), new TransformProviderForPayloadTranslator(entry.getValue())); + } + + // Then check and register ExternalTransformBuilder in serviceloader style, converting // to TransformProvider after validation. 
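With the defaults populated above, an IO author no longer hand-writes a translator or registrar: any provider discoverable via ServiceLoader automatically gets a default payload translator keyed by its identifier. A sketch of the provider side, using hypothetical MyIO* names (this mirrors the Kafka and Iceberg provider changes later in this diff):

@AutoService(SchemaTransformProvider.class)
public class MyIOSchemaTransformProvider extends TypedSchemaTransformProvider<MyIOConfiguration> {
  @Override
  public String identifier() {
    return "beam:schematransform:org.apache.beam:myio:v1";
  }

  @Override
  protected SchemaTransform from(MyIOConfiguration configuration) {
    // register() stores the configuration Row and identifier, which makes the
    // transform eligible for the default SchemaTransformPayloadTranslator.
    return register(configuration, new MyIOSchemaTransform(configuration));
  }
}

The expansion service then resolves it by URN, e.g. SchemaTransformTranslation.getDefaultTranslators().get("beam:schematransform:org.apache.beam:myio:v1").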
Map registeredBuilders = loadTransformBuilders(); for (Map.Entry registeredBuilder : @@ -179,7 +192,7 @@ public Map knownTransforms() { continue; } else if (urn.equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) && translator instanceof SchemaTransformPayloadTranslator) { - urn = ((SchemaTransformPayloadTranslator) translator).provider().identifier(); + urn = ((SchemaTransformPayloadTranslator) translator).identifier(); } } catch (Exception e) { LOG.info( @@ -245,7 +258,7 @@ private static class TransformProviderForPayloadTranslator< InputT extends PInput, OutputT extends POutput> implements TransformProvider { - private final TransformPayloadTranslator> payloadTranslator; + private final TransformPayloadTranslator payloadTranslator; // Returns true if the underlying transform represented by this is a schema-aware transform. private boolean isSchemaTransform() { diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 9f227708e5e6..0796f329adbc 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -26,7 +26,9 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -174,6 +176,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return Collections.singletonList("output"); } + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class DebeziumReadSchemaTransformConfiguration { public abstract String getUsername(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index fb32e18d9374..8478f2bb3ee0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -24,8 +24,6 @@ import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config; import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; -import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; @@ -48,7 +46,8 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv @Override protected SchemaTransform from(Config configuration) { configuration.validate(); - return new IcebergReadSchemaTransform(configuration); + + return register(configuration, new IcebergReadSchemaTransform(configuration)); } @Override @@ -93,20 +92,6 @@ static class IcebergReadSchemaTransform extends 
SchemaTransform { this.configuration = configuration; } - Row getConfigurationRow() { - try { - // To stay consistent with our SchemaTransform configuration naming conventions, - // we sort lexicographically and convert field names to snake_case - return SchemaRegistry.createDefault() - .getToRowFunction(Config.class) - .apply(configuration) - .sorted() - .toSnakeCase(); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java deleted file mode 100644 index c33f7d6261e8..000000000000 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslation.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.iceberg; - -import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform; -import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform; -import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; - -import com.google.auto.service.AutoService; -import java.util.Map; -import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; - -@SuppressWarnings({"rawtypes", "nullness"}) -public class IcebergSchemaTransformTranslation { - static class IcebergReadSchemaTransformTranslator - extends SchemaTransformPayloadTranslator { - @Override - public SchemaTransformProvider provider() { - return new IcebergReadSchemaTransformProvider(); - } - - @Override - public Row toConfigRow(IcebergReadSchemaTransform transform) { - return transform.getConfigurationRow(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { - @Override - @SuppressWarnings({ - "rawtypes", - }) - public Map, ? 
extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return ImmutableMap., TransformPayloadTranslator>builder() - .put(IcebergReadSchemaTransform.class, new IcebergReadSchemaTransformTranslator()) - .build(); - } - } - - static class IcebergWriteSchemaTransformTranslator - extends SchemaTransformPayloadTranslator { - @Override - public SchemaTransformProvider provider() { - return new IcebergWriteSchemaTransformProvider(); - } - - @Override - public Row toConfigRow(IcebergWriteSchemaTransform transform) { - return transform.getConfigurationRow(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { - @Override - @SuppressWarnings({ - "rawtypes", - }) - public Map, ? extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return ImmutableMap., TransformPayloadTranslator>builder() - .put(IcebergWriteSchemaTransform.class, new IcebergWriteSchemaTransformTranslator()) - .build(); - } - } -} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index b490693a9adb..179e0b1254d5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -24,9 +24,7 @@ import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config; import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; @@ -65,7 +63,7 @@ public String description() { @Override protected SchemaTransform from(Config configuration) { configuration.validate(); - return new IcebergWriteSchemaTransform(configuration); + return register(configuration, new IcebergWriteSchemaTransform(configuration)); } @Override @@ -117,20 +115,6 @@ static class IcebergWriteSchemaTransform extends SchemaTransform { this.configuration = configuration; } - Row getConfigurationRow() { - try { - // To stay consistent with our SchemaTransform configuration naming conventions, - // we sort lexicographically and convert field names to snake_case - return SchemaRegistry.createDefault() - .getToRowFunction(Config.class) - .apply(configuration) - .sorted() - .toSnakeCase(); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java index fb4c98cb0bdf..3498267bc445 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java @@ -17,230 +17,97 @@ */ package 
org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; -import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.IcebergReadSchemaTransform; -import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG; -import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.IcebergWriteSchemaTransform; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; -import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; -import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslationTest; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.construction.BeamUrns; -import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +@RunWith(Enclosed.class) public class IcebergSchemaTransformTranslationTest { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - @Rule - public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); - - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - static final IcebergWriteSchemaTransformProvider WRITE_PROVIDER = - new IcebergWriteSchemaTransformProvider(); - static final IcebergReadSchemaTransformProvider READ_PROVIDER = - new IcebergReadSchemaTransformProvider(); - - @Test - public void testReCreateWriteTransformFromRow() { - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); - Row transformConfigRow = - Row.withSchema(WRITE_PROVIDER.configurationSchema()) - .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) - .build(); - IcebergWriteSchemaTransform writeTransform = - (IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); - - IcebergSchemaTransformTranslation.IcebergWriteSchemaTransformTranslator translator = - new IcebergSchemaTransformTranslation.IcebergWriteSchemaTransformTranslator(); - Row row = translator.toConfigRow(writeTransform); - - IcebergWriteSchemaTransform writeTransformFromRow = - translator.fromConfigRow(row, PipelineOptionsFactory.create()); - - 
assertEquals(transformConfigRow, writeTransformFromRow.getConfigurationRow()); + public static class ReadTranslationTest extends SchemaTransformTranslationTest { + @Rule + public transient TestDataWarehouse warehouse = + new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + String tableIdentifier; + + @Before + public void setup() { + tableIdentifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); + warehouse.createTable(TableIdentifier.parse(tableIdentifier), TestFixtures.SCHEMA); + } + + static final IcebergReadSchemaTransformProvider READ_PROVIDER = + new IcebergReadSchemaTransformProvider(); + + @Override + protected SchemaTransformProvider provider() { + return READ_PROVIDER; + } + + @Override + protected Row configurationRow() { + Row catalogConfigRow = + Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) + .withFieldValue("catalog_name", "test_read") + .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .withFieldValue("warehouse_location", warehouse.location) + .build(); + return Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("table", tableIdentifier) + .withFieldValue("catalog_config", catalogConfigRow) + .build(); + } } - @Test - public void testWriteTransformProtoTranslation() - throws InvalidProtocolBufferException, IOException { - // First build a pipeline - Pipeline p = Pipeline.create(); - Schema inputSchema = Schema.builder().addStringField("str").build(); - PCollection input = - p.apply( - Create.of( - Collections.singletonList(Row.withSchema(inputSchema).addValue("a").build()))) - .setRowSchema(inputSchema); - - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_catalog") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("catalog_implementation", "test_implementation") - .withFieldValue("warehouse_location", warehouse.location) - .build(); - Row transformConfigRow = - Row.withSchema(WRITE_PROVIDER.configurationSchema()) - .withFieldValue("table", "test_identifier") - .withFieldValue("catalog_config", catalogConfigRow) - .build(); - - IcebergWriteSchemaTransform writeTransform = - (IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); - PCollectionRowTuple.of(INPUT_TAG, input).apply(writeTransform); - - // Then translate the pipeline to a proto and extract IcebergWriteSchemaTransform proto - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); - List writeTransformProto = - pipelineProto.getComponents().getTransformsMap().values().stream() - .filter( - tr -> { - RunnerApi.FunctionSpec spec = tr.getSpec(); - try { - return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) - && SchemaTransformPayload.parseFrom(spec.getPayload()) - .getIdentifier() - .equals(WRITE_PROVIDER.identifier()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toList()); - assertEquals(1, writeTransformProto.size()); - RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); - - // Check that the proto contains correct values - SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); - Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); - assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); - Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); - - 
assertEquals(transformConfigRow, rowFromSpec); - - // Use the information in the proto to recreate the IcebergWriteSchemaTransform - IcebergSchemaTransformTranslation.IcebergWriteSchemaTransformTranslator translator = - new IcebergSchemaTransformTranslation.IcebergWriteSchemaTransformTranslator(); - IcebergWriteSchemaTransform writeTransformFromSpec = - translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); - - assertEquals(transformConfigRow, writeTransformFromSpec.getConfigurationRow()); - } - - @Test - public void testReCreateReadTransformFromRow() { - // setting a subset of fields here. - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); - Row transformConfigRow = - Row.withSchema(READ_PROVIDER.configurationSchema()) - .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) - .build(); - - IcebergReadSchemaTransform readTransform = - (IcebergReadSchemaTransform) READ_PROVIDER.from(transformConfigRow); - - IcebergSchemaTransformTranslation.IcebergReadSchemaTransformTranslator translator = - new IcebergSchemaTransformTranslation.IcebergReadSchemaTransformTranslator(); - Row row = translator.toConfigRow(readTransform); - - IcebergReadSchemaTransform readTransformFromRow = - translator.fromConfigRow(row, PipelineOptionsFactory.create()); - - assertEquals(transformConfigRow, readTransformFromRow.getConfigurationRow()); - } - - @Test - public void testReadTransformProtoTranslation() - throws InvalidProtocolBufferException, IOException { - // First build a pipeline - Pipeline p = Pipeline.create(); - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_catalog") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", warehouse.location) - .build(); - String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); - warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA); - - Row transformConfigRow = - Row.withSchema(READ_PROVIDER.configurationSchema()) - .withFieldValue("table", identifier) - .withFieldValue("catalog_config", catalogConfigRow) - .build(); - - IcebergReadSchemaTransform readTransform = - (IcebergReadSchemaTransform) READ_PROVIDER.from(transformConfigRow); - - PCollectionRowTuple.empty(p).apply(readTransform); - - // Then translate the pipeline to a proto and extract IcebergReadSchemaTransform proto - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); - List readTransformProto = - pipelineProto.getComponents().getTransformsMap().values().stream() - .filter( - tr -> { - RunnerApi.FunctionSpec spec = tr.getSpec(); - try { - return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) - && SchemaTransformPayload.parseFrom(spec.getPayload()) - .getIdentifier() - .equals(READ_PROVIDER.identifier()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toList()); - assertEquals(1, readTransformProto.size()); - RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec(); - - // Check that the proto contains correct values - SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); - Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); - assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); - Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); - assertEquals(transformConfigRow, rowFromSpec); - - // Use the information in the proto to recreate the IcebergReadSchemaTransform - IcebergSchemaTransformTranslation.IcebergReadSchemaTransformTranslator translator = - new IcebergSchemaTransformTranslation.IcebergReadSchemaTransformTranslator(); - IcebergReadSchemaTransform readTransformFromSpec = - translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); - - assertEquals(transformConfigRow, readTransformFromSpec.getConfigurationRow()); + public static class WriteTranslationTest extends SchemaTransformTranslationTest { + static final IcebergWriteSchemaTransformProvider WRITE_PROVIDER = + new IcebergWriteSchemaTransformProvider(); + + @Override + protected SchemaTransformProvider provider() { + return WRITE_PROVIDER; + } + + @Override + protected Row configurationRow() { + Row catalogConfigRow = + Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) + .withFieldValue("catalog_name", "test_write") + .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .withFieldValue("warehouse_location", "warehouse.location") + .build(); + return Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("table", "test_identifier") + .withFieldValue("catalog_config", catalogConfigRow) + .build(); + } + + @Override + protected PCollectionRowTuple input(Pipeline p) { + Schema inputSchema = Schema.builder().addStringField("str").build(); + PCollection inputRows = + p.apply( + Create.of( + Collections.singletonList(Row.withSchema(inputSchema).addValue("a").build()))) + .setRowSchema(inputSchema); + return PCollectionRowTuple.of("input", inputRows); + } } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index b2eeb1a54d1d..0fb0a16f968b 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -40,9 +40,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.Convert; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; @@ -89,7 +87,7 @@ protected Class configurationClass() { }) @Override protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { - return new KafkaReadSchemaTransform(configuration); + return register(configuration, new KafkaReadSchemaTransform(configuration)); } public static SerializableFunction getRawBytesToRowFunction(Schema rawSchema) { @@ -123,20 +121,6 @@ static class KafkaReadSchemaTransform extends SchemaTransform { this.configuration = configuration; } - Row getConfigurationRow() { - try { - // To stay consistent with our SchemaTransform configuration naming conventions, - // we sort lexicographically - return SchemaRegistry.createDefault() - 
.getToRowFunction(KafkaReadSchemaTransformConfiguration.class) - .apply(configuration) - .sorted() - .toSnakeCase(); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { configuration.validate(); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java deleted file mode 100644 index 4b83e2b6f558..000000000000 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.kafka; - -import static org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.KafkaReadSchemaTransform; -import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform; -import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; - -import com.google.auto.service.AutoService; -import java.util.Map; -import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.construction.PTransformTranslation; -import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; - -public class KafkaSchemaTransformTranslation { - static class KafkaReadSchemaTransformTranslator - extends SchemaTransformPayloadTranslator { - @Override - public SchemaTransformProvider provider() { - return new KafkaReadSchemaTransformProvider(); - } - - @Override - public Row toConfigRow(KafkaReadSchemaTransform transform) { - return transform.getConfigurationRow(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { - @Override - @SuppressWarnings({ - "rawtypes", - }) - public Map< - ? extends Class, - ? 
extends PTransformTranslation.TransformPayloadTranslator> - getTransformPayloadTranslators() { - return ImmutableMap - ., PTransformTranslation.TransformPayloadTranslator>builder() - .put(KafkaReadSchemaTransform.class, new KafkaReadSchemaTransformTranslator()) - .build(); - } - } - - static class KafkaWriteSchemaTransformTranslator - extends SchemaTransformPayloadTranslator { - @Override - public SchemaTransformProvider provider() { - return new KafkaWriteSchemaTransformProvider(); - } - - @Override - public Row toConfigRow(KafkaWriteSchemaTransform transform) { - return transform.getConfigurationRow(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { - @Override - @SuppressWarnings({ - "rawtypes", - }) - public Map< - ? extends Class, - ? extends PTransformTranslation.TransformPayloadTranslator> - getTransformPayloadTranslators() { - return ImmutableMap - ., PTransformTranslation.TransformPayloadTranslator>builder() - .put(KafkaWriteSchemaTransform.class, new KafkaWriteSchemaTransformTranslator()) - .build(); - } - } -} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 09b338492b47..aed3ab206f3d 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -31,9 +31,7 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; @@ -91,7 +89,7 @@ public class KafkaWriteSchemaTransformProvider + "Supported formats are: " + String.join(", ", SUPPORTED_FORMATS)); } - return new KafkaWriteSchemaTransform(configuration); + return register(configuration, new KafkaWriteSchemaTransform(configuration)); } static final class KafkaWriteSchemaTransform extends SchemaTransform implements Serializable { @@ -101,20 +99,6 @@ static final class KafkaWriteSchemaTransform extends SchemaTransform implements this.configuration = configuration; } - Row getConfigurationRow() { - try { - // To stay consistent with our SchemaTransform configuration naming conventions, - // we sort lexicographically - return SchemaRegistry.createDefault() - .getToRowFunction(KafkaWriteSchemaTransformConfiguration.class) - .apply(configuration) - .sorted() - .toSnakeCase(); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } - public static class ErrorCounterFn extends DoFn> { private final SerializableFunction toBytesFn; private final Counter errorCounter; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index f0e0fc9a3d04..19c336e1d24e 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java index b297227bb7aa..e46372719f6f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java @@ -17,50 +17,33 @@ */ package org.apache.beam.sdk.io.kafka; -import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; -import static org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.KafkaReadSchemaTransform; -import static org.apache.beam.sdk.io.kafka.KafkaSchemaTransformTranslation.KafkaReadSchemaTransformTranslator; -import static org.apache.beam.sdk.io.kafka.KafkaSchemaTransformTranslation.KafkaWriteSchemaTransformTranslator; -import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; -import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.RowCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslationTest; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.construction.BeamUrns; -import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +@RunWith(Enclosed.class) public class KafkaSchemaTransformTranslationTest { - @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - static final KafkaWriteSchemaTransformProvider WRITE_PROVIDER = - new KafkaWriteSchemaTransformProvider(); - static final KafkaReadSchemaTransformProvider READ_PROVIDER = - new 
-
-  static final Row READ_CONFIG =
-      Row.withSchema(READ_PROVIDER.configurationSchema())
+  public static class ReadTranslationTest extends SchemaTransformTranslationTest {
+    static final KafkaReadSchemaTransformProvider READ_PROVIDER =
+        new KafkaReadSchemaTransformProvider();
+
+    @Override
+    protected SchemaTransformProvider provider() {
+      return READ_PROVIDER;
+    }
+
+    @Override
+    protected Row configurationRow() {
+      return Row.withSchema(READ_PROVIDER.configurationSchema())
           .withFieldValue("format", "RAW")
           .withFieldValue("topic", "test_topic")
           .withFieldValue("bootstrap_servers", "host:port")
@@ -73,9 +56,21 @@ public class KafkaSchemaTransformTranslationTest {
           .withFieldValue("consumer_config_updates", ImmutableMap.builder().build())
           .withFieldValue("error_handling", null)
           .build();
+    }
+  }
+
+  public static class WriteTranslationTest extends SchemaTransformTranslationTest {
+    static final KafkaWriteSchemaTransformProvider WRITE_PROVIDER =
+        new KafkaWriteSchemaTransformProvider();

-  static final Row WRITE_CONFIG =
-      Row.withSchema(WRITE_PROVIDER.configurationSchema())
+    @Override
+    protected SchemaTransformProvider provider() {
+      return WRITE_PROVIDER;
+    }
+
+    @Override
+    protected Row configurationRow() {
+      return Row.withSchema(WRITE_PROVIDER.configurationSchema())
           .withFieldValue("format", "RAW")
           .withFieldValue("topic", "test_topic")
           .withFieldValue("bootstrap_servers", "host:port")
@@ -85,132 +80,18 @@ public class KafkaSchemaTransformTranslationTest {
           .withFieldValue("message_name", "test_message")
           .withFieldValue("schema", "test_schema")
           .build();
-
-  @Test
-  public void testRecreateWriteTransformFromRow() {
-    KafkaWriteSchemaTransform writeTransform =
-        (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
-
-    KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator();
-    Row translatedRow = translator.toConfigRow(writeTransform);
-
-    KafkaWriteSchemaTransform writeTransformFromRow =
-        translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create());
-
-    assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
-  }
-
-  @Test
-  public void testWriteTransformProtoTranslation()
-      throws InvalidProtocolBufferException, IOException {
-    // First build a pipeline
-    Pipeline p = Pipeline.create();
-    Schema inputSchema = Schema.builder().addByteArrayField("b").build();
-    PCollection<Row> input =
-        p.apply(
-                Create.of(
-                    Collections.singletonList(
-                        Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build())))
-            .setRowSchema(inputSchema);
-
-    KafkaWriteSchemaTransform writeTransform =
-        (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
-    PCollectionRowTuple.of("input", input).apply(writeTransform);
-
-    // Then translate the pipeline to a proto and extract KafkaWriteSchemaTransform proto
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
-    List<RunnerApi.PTransform> writeTransformProto =
-        pipelineProto.getComponents().getTransformsMap().values().stream()
-            .filter(
-                tr -> {
-                  RunnerApi.FunctionSpec spec = tr.getSpec();
-                  try {
-                    return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
-                        && SchemaTransformPayload.parseFrom(spec.getPayload())
-                            .getIdentifier()
-                            .equals(WRITE_PROVIDER.identifier());
-                  } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                  }
-                })
-            .collect(Collectors.toList());
-    assertEquals(1, writeTransformProto.size());
-    RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
-
-    // Check that the proto contains correct values
-    SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
-    Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
-    assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
-    Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
-
-    assertEquals(WRITE_CONFIG, rowFromSpec);
-
-    // Use the information in the proto to recreate the KafkaWriteSchemaTransform
-    KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator();
-    KafkaWriteSchemaTransform writeTransformFromSpec =
-        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
-
-    assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
-  }
-
-  @Test
-  public void testReCreateReadTransformFromRow() {
-    // setting a subset of fields here.
-    KafkaReadSchemaTransform readTransform =
-        (KafkaReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
-
-    KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator();
-    Row row = translator.toConfigRow(readTransform);
-
-    KafkaReadSchemaTransform readTransformFromRow =
-        translator.fromConfigRow(row, PipelineOptionsFactory.create());
-
-    assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
-  }
-
-  @Test
-  public void testReadTransformProtoTranslation()
-      throws InvalidProtocolBufferException, IOException {
-    // First build a pipeline
-    Pipeline p = Pipeline.create();
-
-    KafkaReadSchemaTransform readTransform =
-        (KafkaReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
-
-    PCollectionRowTuple.empty(p).apply(readTransform);
-
-    // Then translate the pipeline to a proto and extract KafkaReadSchemaTransform proto
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
-    List<RunnerApi.PTransform> readTransformProto =
-        pipelineProto.getComponents().getTransformsMap().values().stream()
-            .filter(
-                tr -> {
-                  RunnerApi.FunctionSpec spec = tr.getSpec();
-                  try {
-                    return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
-                        && SchemaTransformPayload.parseFrom(spec.getPayload())
-                            .getIdentifier()
-                            .equals(READ_PROVIDER.identifier());
-                  } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                  }
-                })
-            .collect(Collectors.toList());
-    assertEquals(1, readTransformProto.size());
-    RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
-
-    // Check that the proto contains correct values
-    SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
-    Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
-    assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
-    Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
-    assertEquals(READ_CONFIG, rowFromSpec);
-
-    // Use the information in the proto to recreate the KafkaReadSchemaTransform
-    KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator();
-    KafkaReadSchemaTransform readTransformFromSpec =
-        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
-
-    assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+    }
+
+    @Override
+    protected PCollectionRowTuple input(Pipeline p) {
+      Schema inputSchema = Schema.builder().addByteArrayField("b").build();
+      PCollection<Row> input =
+          p.apply(
+                  Create.of(
+                      Collections.singletonList(
+                          Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build())))
+              .setRowSchema(inputSchema);
+      return PCollectionRowTuple.of("input", input);
+    }
   }
 }
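The rewrite above replaces both hand-written round-trip tests with nested subclasses of the shared SchemaTransformTranslationTest, run via JUnit's Enclosed runner. Any other connector can follow the same skeleton; the sketch below shows the read side, with every My* name a hypothetical placeholder rather than part of this change:

import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslationTest;
import org.apache.beam.sdk.values.Row;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;

@RunWith(Enclosed.class)
public class MySchemaTransformTranslationTest {
  public static class ReadTranslationTest extends SchemaTransformTranslationTest {
    // Hypothetical provider; a real test would reference an actual
    // SchemaTransformProvider implementation from the connector under test.
    static final MyReadSchemaTransformProvider READ_PROVIDER =
        new MyReadSchemaTransformProvider();

    @Override
    protected SchemaTransformProvider provider() {
      return READ_PROVIDER;
    }

    @Override
    protected Row configurationRow() {
      // The base class round-trips this Row through toConfigRow/fromConfigRow
      // and through the pipeline proto, as the Kafka subclasses above do.
      return Row.withSchema(READ_PROVIDER.configurationSchema())
          .withFieldValue("topic", "my_topic") // illustrative field
          .build();
    }
  }
}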
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 6f97983d3260..a1ae46839fee 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -35,9 +35,7 @@
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -147,11 +145,11 @@ protected SchemaTransform from(ManagedConfig managedConfig) {
             + "the specified transform not being supported.",
         managedConfig.getTransformIdentifier());

-    return new ManagedSchemaTransform(managedConfig, schemaTransformProvider);
+    return register(
+        managedConfig, new ManagedSchemaTransform(managedConfig, schemaTransformProvider));
   }

   static class ManagedSchemaTransform extends SchemaTransform {
-    private final ManagedConfig managedConfig;
     private final Row underlyingTransformConfig;
     private final SchemaTransformProvider underlyingTransformProvider;
@@ -167,7 +165,6 @@ static class ManagedSchemaTransform extends SchemaTransform {
             "Encountered an error when retrieving a Row configuration", e);
       }

-      this.managedConfig = managedConfig;
       this.underlyingTransformConfig = underlyingTransformConfig;
       this.underlyingTransformProvider = underlyingTransformProvider;
     }
@@ -181,26 +178,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       return input.apply(underlyingTransformProvider.from(underlyingTransformConfig));
     }
-
-    public ManagedConfig getManagedConfig() {
-      return this.managedConfig;
-    }
-
-    Row getConfigurationRow() {
-      try {
-        // To stay consistent with our SchemaTransform configuration naming conventions,
-        // we sort lexicographically and convert field names to snake_case
-        return SchemaRegistry.createDefault()
-            .getToRowFunction(ManagedConfig.class)
-            .apply(managedConfig)
-            .sorted()
-            .toSnakeCase();
-      } catch (NoSuchSchemaException e) {
-        throw new RuntimeException(e);
-      }
-    }
   }

+  /** Returns the underlying transform's {@link Row} config. */
   @VisibleForTesting
   static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
     // May return an empty row (perhaps the underlying transform doesn't have any required
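Because from(...) now registers the transform, Managed no longer needs its own translator: the ManagedSchemaTransformTranslation file deleted below, including its AutoService registrar, is subsumed by the generic SchemaTransformPayloadTranslator. A minimal sketch of the generic lookup, assuming the Managed provider is discoverable through ServiceLoader and that its identifier equals MANAGED_TRANSFORM_URN:

import static org.apache.beam.sdk.util.construction.PTransformTranslation.MANAGED_TRANSFORM_URN;

import java.util.Map;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;

public class GenericTranslatorLookupSketch {
  public static void main(String[] args) {
    // getDefaultTranslators() builds one generic translator per provider found on
    // the classpath, keyed by the provider's identifier (assumed Map key type).
    Map<String, SchemaTransformPayloadTranslator> translators =
        SchemaTransformTranslation.getDefaultTranslators();

    // No Managed-specific translator class or registrar is required anymore.
    SchemaTransformPayloadTranslator managedTranslator =
        translators.get(MANAGED_TRANSFORM_URN);
    System.out.println(managedTranslator != null);
  }
}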
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java
deleted file mode 100644
index 2b1e6544ef8b..000000000000
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslation.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.managed;
-
-import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform;
-import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
-import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-public class ManagedSchemaTransformTranslation {
-  static class ManagedSchemaTransformTranslator
-      extends SchemaTransformPayloadTranslator<ManagedSchemaTransform> {
-    @Override
-    public SchemaTransformProvider provider() {
-      return new ManagedSchemaTransformProvider(null);
-    }
-
-    @Override
-    public Row toConfigRow(ManagedSchemaTransform transform) {
-      return transform.getConfigurationRow();
-    }
-  }
-
-  @AutoService(TransformPayloadTranslatorRegistrar.class)
-  public static class ManagedTransformRegistrar implements TransformPayloadTranslatorRegistrar {
-    @Override
-    @SuppressWarnings({
-      "rawtypes",
-    })
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
-        getTransformPayloadTranslators() {
-      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
-          .put(ManagedSchemaTransform.class, new ManagedSchemaTransformTranslator())
-          .build();
-    }
-  }
-}
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index 0d122646d899..54830b634e2d 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -25,10 +25,9 @@
 import static org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
 import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig;
 import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedSchemaTransform;
-import static org.apache.beam.sdk.managed.ManagedSchemaTransformTranslation.ManagedSchemaTransformTranslator;
+import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
 import static org.apache.beam.sdk.util.construction.PTransformTranslation.MANAGED_TRANSFORM_URN;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;

 import java.net.URISyntaxException;
@@ -44,6 +43,7 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.utils.YamlUtils;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -58,6 +58,8 @@
 public class ManagedSchemaTransformTranslationTest {
   static final ManagedSchemaTransformProvider PROVIDER = new ManagedSchemaTransformProvider(null);
+  static final SchemaTransformPayloadTranslator TRANSLATOR =
+      new SchemaTransformPayloadTranslator(PROVIDER);

   @Test
   public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxException {
@@ -75,17 +77,17 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio
     ManagedSchemaTransform originalTransform =
         (ManagedSchemaTransform) PROVIDER.from(originalConfig);

-    ManagedSchemaTransformTranslator translator = new ManagedSchemaTransformTranslator();
-    Row configRow = translator.toConfigRow(originalTransform);
+    Row configRow = TRANSLATOR.toConfigRow(originalTransform);

-    ManagedSchemaTransform transformFromRow =
-        translator.fromConfigRow(configRow, PipelineOptionsFactory.create());
-    ManagedConfig configFromRow = transformFromRow.getManagedConfig();
+    SchemaTransform translatedTransform =
+        TRANSLATOR.fromConfigRow(configRow, PipelineOptionsFactory.create());
+    Row translatedConfigRow = translatedTransform.getConfigurationRow();

-    assertNotNull(transformFromRow.getManagedConfig());
-    assertEquals(originalConfig.getTransformIdentifier(), configFromRow.getTransformIdentifier());
-    assertEquals(originalConfig.getConfigUrl(), configFromRow.getConfigUrl());
-    assertNull(configFromRow.getConfig());
+    assertEquals(
+        originalConfig.getTransformIdentifier(),
+        translatedConfigRow.getValue("transform_identifier"));
+    assertEquals(originalConfig.getConfigUrl(), translatedConfigRow.getValue("config_url"));
+    assertNull(translatedConfigRow.getValue("config"));
   }

   @Test
@@ -101,17 +103,17 @@ public void testReCreateTransformFromRowWithConfig() {
     ManagedSchemaTransform originalTransform =
         (ManagedSchemaTransform) PROVIDER.from(originalConfig);

-    ManagedSchemaTransformTranslator translator = new ManagedSchemaTransformTranslator();
-    Row configRow = translator.toConfigRow(originalTransform);
+    Row configRow = TRANSLATOR.toConfigRow(originalTransform);

-    ManagedSchemaTransform transformFromRow =
-        translator.fromConfigRow(configRow, PipelineOptionsFactory.create());
-    ManagedConfig configFromRow = transformFromRow.getManagedConfig();
+    SchemaTransform translatedTransform =
+        TRANSLATOR.fromConfigRow(configRow, PipelineOptionsFactory.create());
+    Row translatedConfigRow = translatedTransform.getConfigurationRow();

-    assertNotNull(transformFromRow.getManagedConfig());
-    assertEquals(originalConfig.getTransformIdentifier(), configFromRow.getTransformIdentifier());
-    assertEquals(configFromRow.getConfig(), yamlString);
-    assertNull(originalConfig.getConfigUrl());
+    assertEquals(
+        originalConfig.getTransformIdentifier(),
+        translatedConfigRow.getValue("transform_identifier"));
+    assertEquals(originalConfig.getConfig(), translatedConfigRow.getValue("config"));
+    assertNull(translatedConfigRow.getValue("config_url"));
   }

   @Test
@@ -207,14 +209,14 @@ public void testProtoTranslation() throws Exception {
     assertEquals(expectedRow, rowFromSpec);

     // Use the information in the proto to recreate the ManagedSchemaTransform
-    ManagedSchemaTransformTranslator translator = new ManagedSchemaTransformTranslator();
-    ManagedSchemaTransform transformFromSpec =
-        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+    SchemaTransform translatedTransform =
+        TRANSLATOR.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+    Row translatedConfigRow = translatedTransform.getConfigurationRow();

     assertEquals(
         TestSchemaTransformProvider.IDENTIFIER,
-        transformFromSpec.getManagedConfig().getTransformIdentifier());
-    assertEquals(yamlStringConfig, transformFromSpec.getManagedConfig().getConfig());
-    assertNull(transformFromSpec.getManagedConfig().getConfigUrl());
+        translatedConfigRow.getValue("transform_identifier"));
+    assertEquals(yamlStringConfig, translatedConfigRow.getValue("config"));
+    assertNull(translatedConfigRow.getValue("config_url"));
   }
 }
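With the shared TRANSLATOR, each of the tests above is an instance of the same generic round trip. A condensed sketch follows; the identifier and YAML payload are placeholders, since a real configuration must name a registered SchemaTransformProvider and satisfy its configuration schema:

import org.apache.beam.sdk.managed.ManagedSchemaTransformProvider;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
import org.apache.beam.sdk.values.Row;

public class ManagedRoundTripSketch {
  public static void main(String[] args) {
    ManagedSchemaTransformProvider provider = new ManagedSchemaTransformProvider(null);
    SchemaTransformPayloadTranslator translator = new SchemaTransformPayloadTranslator(provider);

    Row originalRow =
        Row.withSchema(provider.configurationSchema())
            // Placeholder identifier: must resolve to a provider on the classpath.
            .withFieldValue("transform_identifier", "beam:schematransform:example:v1")
            // Placeholder YAML: must match the underlying transform's schema.
            .withFieldValue("config", "some_field: some_value")
            .build();
    SchemaTransform transform = provider.from(originalRow);

    // toConfigRow(...) simply returns the Row stored by register(...);
    // fromConfigRow(...) rebuilds an equivalent transform through the provider.
    Row configRow = translator.toConfigRow(transform);
    SchemaTransform rebuilt = translator.fromConfigRow(configRow, PipelineOptionsFactory.create());
    System.out.println(configRow.equals(rebuilt.getConfigurationRow())); // true
  }
}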