From 9c193889dc5013f45f546d8b336d622d2a5345ee Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 22 May 2024 12:18:03 -0400 Subject: [PATCH 1/6] default schematransform configs to snake_case --- ...am_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../TypedSchemaTransformProvider.java | 46 +++++++--- .../TypedSchemaTransformProviderTest.java | 8 +- .../IcebergReadSchemaTransformProvider.java | 12 --- .../IcebergWriteSchemaTransformProvider.java | 11 --- .../KafkaReadSchemaTransformProviderTest.java | 16 ++-- .../ManagedSchemaTransformProvider.java | 11 --- .../ManagedSchemaTransformProviderTest.java | 12 +-- ...ManagedSchemaTransformTranslationTest.java | 6 +- .../apache/beam/sdk/managed/ManagedTest.java | 2 +- .../src/test/resources/test_config.yaml | 4 +- sdks/python/apache_beam/io/gcp/bigquery.py | 14 ++-- sdks/python/apache_beam/io/gcp/bigtableio.py | 12 +-- .../transforms/external_transform_provider.py | 35 +------- .../external_transform_provider_it_test.py | 22 ----- sdks/python/apache_beam/yaml/standard_io.yaml | 84 +++++++++---------- sdks/python/apache_beam/yaml/yaml_provider.py | 2 +- sdks/python/gen_xlang_wrappers.py | 21 +---- 18 files changed, 118 insertions(+), 202 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index b26833333238..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index d5c6c724c6f5..206fcb1cdee9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.schemas.transforms; +import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.lang.reflect.ParameterizedType; import java.util.List; @@ -26,9 +28,8 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.*; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; /** @@ -38,8 +39,12 @@ *

<p>ConfigT should be available in the SchemaRegistry. * *

<p>{@link #configurationSchema()} produces a configuration {@link Schema} that is inferred from - * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link - * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema. + * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used to produce a + * {@link SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration + * Schema. + + *

<p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the + * {@code snake_case} naming convention. * *
<p>
Internal only: This interface is actively being worked on and it will likely change as * we provide implementations for more standard Beam transforms. We provide no backwards @@ -78,10 +83,11 @@ Optional> dependencies(ConfigT configuration, PipelineOptions optio } @Override - public Schema configurationSchema() { + public final Schema configurationSchema() { try { // Sort the fields by name to ensure a consistent schema is produced - return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted(); + // We also establish a `snake_case` convention for all SchemaTransform configurations + return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase(); } catch (NoSuchSchemaException e) { throw new RuntimeException( "Unable to find schema for " @@ -90,9 +96,12 @@ public Schema configurationSchema() { } } - /** Produces a {@link SchemaTransform} from a Row configuration. */ + /** + * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have + * `snake_case` naming convention. + */ @Override - public SchemaTransform from(Row configuration) { + public final SchemaTransform from(Row configuration) { return from(configFromRow(configuration)); } @@ -103,9 +112,22 @@ public final Optional> dependencies(Row configuration, PipelineOpti private ConfigT configFromRow(Row configuration) { try { - return SchemaRegistry.createDefault() - .getFromRowFunction(configurationClass()) - .apply(configuration); + SchemaRegistry registry = SchemaRegistry.createDefault(); + SerializableFunction rowToConfigT = + registry.getFromRowFunction(configurationClass()); + + // Configuration objects handled by the AutoValueSchema provider will expect Row fields with + // camelCase naming convention + SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass()); + if (schemaProvider.getClass().equals(DefaultSchemaProvider.class) + && checkNotNull( + ((DefaultSchemaProvider) schemaProvider) + .getUnderlyingSchemaProvider(configurationClass())) + .getClass() + .equals(AutoValueSchema.class)) { + configuration = configuration.toCamelCase(); + } + return rowToConfigT.apply(configuration); } catch (NoSuchSchemaException e) { throw new RuntimeException( "Unable to find schema for " + identifier() + "SchemaTransformProvider's config"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index b1dc0911a927..2eef0e30f805 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -130,8 +130,8 @@ public void testFrom() { Row inputConfig = Row.withSchema(provider.configurationSchema()) - .withFieldValue("stringField", "field1") - .withFieldValue("integerField", Integer.valueOf(13)) + .withFieldValue("string_field", "field1") + .withFieldValue("integer_field", Integer.valueOf(13)) .build(); Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config; @@ -150,8 +150,8 @@ public void testDependencies() { SchemaTransformProvider provider = new FakeTypedSchemaIOProvider(); Row inputConfig = Row.withSchema(provider.configurationSchema()) - .withFieldValue("stringField", "field1") - .withFieldValue("integerField", Integer.valueOf(13)) + .withFieldValue("string_field", "field1") 
+ .withFieldValue("integer_field", Integer.valueOf(13)) .build(); assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index bfe2fab1f9a2..fb32e18d9374 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; -import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; @@ -132,15 +131,4 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return PCollectionRowTuple.of(OUTPUT_TAG, output); } } - - // TODO: set global snake_case naming convention and remove these special cases - @Override - public SchemaTransform from(Row rowConfig) { - return super.from(rowConfig.toCamelCase()); - } - - @Override - public Schema configurationSchema() { - return super.configurationSchema().toSnakeCase(); - } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 71183c6b0a03..b490693a9adb 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -176,15 +176,4 @@ public Row apply(KV input) { } } } - - // TODO: set global snake_case naming convention and remove these special cases - @Override - public SchemaTransform from(Row rowConfig) { - return super.from(rowConfig.toCamelCase()); - } - - @Override - public Schema configurationSchema() { - return super.configurationSchema().toSnakeCase(); - } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index d5962a737baf..f5ac5bb54ad7 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -121,17 +121,17 @@ public void testFindTransformAndMakeItWork() { assertEquals( Sets.newHashSet( - "bootstrapServers", + "bootstrap_servers", "topic", "schema", - "autoOffsetResetConfig", - "consumerConfigUpdates", + "auto_offset_reset_config", + "consumer_config_updates", "format", - "confluentSchemaRegistrySubject", - "confluentSchemaRegistryUrl", - "errorHandling", - "fileDescriptorPath", - "messageName"), + "confluent_schema_registry_subject", + "confluent_schema_registry_url", + "error_handling", + "file_descriptor_path", + "message_name"), kafkaProvider.configurationSchema().getFields().stream() .map(field -> field.getName()) .collect(Collectors.toSet())); diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 54e1404c650c..1060c542ac02 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -239,15 +239,4 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) { Map getAllProviders() { return schemaTransformProviders; } - - // TODO: set global snake_case naming convention and remove these special cases - @Override - public SchemaTransform from(Row rowConfig) { - return super.from(rowConfig.toCamelCase()); - } - - @Override - public Schema configurationSchema() { - return super.configurationSchema().toSnakeCase(); - } } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java index 3a3465406c03..e9edf8751e34 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java @@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() { @Test public void testGetConfigRowFromYamlString() { - String yamlString = "extraString: abc\n" + "extraInteger: 123"; + String yamlString = "extra_string: abc\n" + "extra_integer: 123"; ManagedConfig config = ManagedConfig.builder() .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) @@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() { Row expectedRow = Row.withSchema(TestSchemaTransformProvider.SCHEMA) - .withFieldValue("extraString", "abc") - .withFieldValue("extraInteger", 123) + .withFieldValue("extra_string", "abc") + .withFieldValue("extra_integer", 123) .build(); Row returnedRow = @@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException { Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); Row expectedRow = Row.withSchema(configSchema) - .withFieldValue("extraString", "abc") - .withFieldValue("extraInteger", 123) + .withFieldValue("extra_string", "abc") + .withFieldValue("extra_integer", 123) .build(); Row configRow = ManagedSchemaTransformProvider.getRowConfig( @@ -96,7 +96,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException { @Test public void testBuildWithYamlString() { - String yamlString = "extraString: abc\n" + "extraInteger: 123"; + String yamlString = "extra_string: abc\n" + "extra_integer: 123"; ManagedConfig config = ManagedConfig.builder() diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java index 7a418976079f..b4b41ded841c 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java @@ -84,7 +84,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio @Test public void testReCreateTransformFromRowWithConfig() { - String yamlString = "extraString: abc\n" + "extraInteger: 123"; + String yamlString = "extra_string: 
abc\n" + "extra_integer: 123"; ManagedConfig originalConfig = ManagedConfig.builder() @@ -123,8 +123,8 @@ public void testProtoTranslation() throws Exception { .setRowSchema(inputSchema); Map underlyingConfig = ImmutableMap.builder() - .put("extraString", "abc") - .put("extraInteger", 123) + .put("extra_string", "abc") + .put("extra_integer", 123) .build(); String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig); Managed.ManagedTransform transform = diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java index 260085486c81..7ed364d0e174 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() { .setIdentifier(TestSchemaTransformProvider.IDENTIFIER) .build() .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) - .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123)); + .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123)); runTestProviderTest(writeOp); } diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml index 3967b6095eac..da3bd68546cf 100644 --- a/sdks/java/managed/src/test/resources/test_config.yaml +++ b/sdks/java/managed/src/test/resources/test_config.yaml @@ -17,5 +17,5 @@ # under the License. # -extraString: "abc" -extraInteger: 123 +extra_string: "abc" +extra_integer: 123 diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 43bd17022180..d89ce712d8f6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2574,13 +2574,13 @@ def expand(self, input): expansion_service=self._expansion_service, rearrange_based_on_discovery=True, table=table, - createDisposition=self._create_disposition, - writeDisposition=self._write_disposition, - triggeringFrequencySeconds=self._triggering_frequency, - autoSharding=self._with_auto_sharding, - numStreams=self._num_storage_api_streams, - useAtLeastOnceSemantics=self._use_at_least_once, - errorHandling={ + create_disposition=self._create_disposition, + write_disposition=self._write_disposition, + triggering_frequency_seconds=self._triggering_frequency, + auto_sharding=self._with_auto_sharding, + num_streams=self._num_storage_api_streams, + use_at_least_once_semantics=self._use_at_least_once, + error_handling={ 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS })) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index f8534f38ddfc..0f3944a791bd 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -225,9 +225,9 @@ def expand(self, input): identifier=self.schematransform_config.identifier, expansion_service=self._expansion_service, rearrange_based_on_discovery=True, - tableId=self._table_id, - instanceId=self._instance_id, - projectId=self._project_id) + table_id=self._table_id, + instance_id=self._instance_id, + project_id=self._project_id) return ( input @@ -323,9 +323,9 @@ def expand(self, input): identifier=self.schematransform_config.identifier, expansion_service=self._expansion_service, rearrange_based_on_discovery=True, - tableId=self._table_id, - instanceId=self._instance_id, - 
projectId=self._project_id) + table_id=self._table_id, + instance_id=self._instance_id, + project_id=self._project_id) return ( input.pipeline diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py index 2799bd1b9e93..67adda5aec03 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider.py @@ -39,32 +39,6 @@ def snake_case_to_upper_camel_case(string): return output -def snake_case_to_lower_camel_case(string): - """Convert snake_case to lowerCamelCase""" - if len(string) <= 1: - return string.lower() - upper = snake_case_to_upper_camel_case(string) - return upper[0].lower() + upper[1:] - - -def camel_case_to_snake_case(string): - """Convert camelCase to snake_case""" - arr = [] - word = [] - for i, n in enumerate(string): - # If seeing an upper letter after a lower letter, we just witnessed a word - # If seeing an upper letter and the next letter is lower, we may have just - # witnessed an all caps word - if n.isupper() and ((i > 0 and string[i - 1].islower()) or - (i + 1 < len(string) and string[i + 1].islower())): - arr.append(''.join(word)) - word = [n.lower()] - else: - word.append(n.lower()) - arr.append(''.join(word)) - return '_'.join(arr).strip('_') - - # Information regarding a Wrapper parameter. ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name']) @@ -76,7 +50,7 @@ def get_config_with_descriptions( descriptions = schematransform.configuration_schema._field_descriptions fields_with_descriptions = {} for field in schema.fields: - fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo( + fields_with_descriptions[field.name] = ParamInfo( typing_from_runner_api(field.type), descriptions[field.name], field.name) @@ -105,16 +79,11 @@ def __init__(self, expansion_service=None, **kwargs): expansion_service or self.default_expansion_service def expand(self, input): - camel_case_kwargs = { - snake_case_to_lower_camel_case(k): v - for k, v in self._kwargs.items() - } - external_schematransform = SchemaAwareExternalTransform( identifier=self.identifier, expansion_service=self._expansion_service, rearrange_based_on_discovery=True, - **camel_case_kwargs) + **self._kwargs) return input | external_schematransform diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py index a53001c85fd3..95720cee7eee 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py @@ -37,9 +37,7 @@ from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN from apache_beam.transforms.external_transform_provider import ExternalTransform from apache_beam.transforms.external_transform_provider import ExternalTransformProvider -from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case from apache_beam.transforms.external_transform_provider import infer_name_from_identifier -from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case from apache_beam.transforms.xlang.io import GenerateSequence @@ -54,26 +52,6 @@ def test_snake_case_to_upper_camel_case(self): for case in test_cases: self.assertEqual(case[1], 
snake_case_to_upper_camel_case(case[0])) - def test_snake_case_to_lower_camel_case(self): - test_cases = [("", ""), ("test", "test"), ("test_name", "testName"), - ("test_double_underscore", "testDoubleUnderscore"), - ("TEST_CAPITALIZED", "testCapitalized"), - ("_prepended_underscore", "prependedUnderscore"), - ("appended_underscore_", "appendedUnderscore")] - for case in test_cases: - self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0])) - - def test_camel_case_to_snake_case(self): - test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"), - ("TestDoubleUnderscore", - "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"), - ("BEGINNINGAllCaps", - "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"), - ("AllCapsMIDDLEWord", "all_caps_middle_word"), - ("lowerCamelCase", "lower_camel_case")] - for case in test_cases: - self.assertEqual(case[1], camel_case_to_snake_case(case[0])) - def test_infer_name_from_identifier(self): standard_test_cases = [ ("beam:schematransform:org.apache.beam:transform:v1", "Transform"), diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 8a5ffd9f6a9c..45227fa75241 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -30,14 +30,14 @@ mappings: 'ReadFromBigQuery': query: 'query' - table: 'tableSpec' - fields: 'selectedFields' - row_restriction: 'rowRestriction' + table: 'table_spec' + fields: 'selected_fields' + row_restriction: 'row_restriction' 'WriteToBigQuery': table: 'table' - create_disposition: 'createDisposition' - write_disposition: 'writeDisposition' - error_handling: 'errorHandling' + create_disposition: 'create_disposition' + write_disposition: 'write_disposition' + error_handling: 'error_handling' # TODO(https://github.com/apache/beam/issues/30058): Required until autosharding support is fixed num_streams: 'numStreams' underlying_provider: @@ -56,24 +56,24 @@ mappings: 'ReadFromKafka': 'schema': 'schema' - 'consumer_config': 'consumerConfigUpdates' + 'consumer_config': 'consumer_config_updates' 'format': 'format' 'topic': 'topic' - 'bootstrap_servers': 'bootstrapServers' - 'confluent_schema_registry_url': 'confluentSchemaRegistryUrl' - 'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject' - 'auto_offset_reset_config': 'autoOffsetResetConfig' - 'error_handling': 'errorHandling' - 'file_descriptor_path': 'fileDescriptorPath' - 'message_name': 'messageName' + 'bootstrap_servers': 'bootstrap_servers' + 'confluent_schema_registry_url': 'confluent_schema_registry_url' + 'confluent_schema_registry_subject': 'confluent_schema_registry_subject' + 'auto_offset_reset_config': 'auto_offset_reset_config' + 'error_handling': 'error_handling' + 'file_descriptor_path': 'file_descriptor_path' + 'message_name': 'message_name' 'WriteToKafka': 'format': 'format' 'topic': 'topic' - 'bootstrap_servers': 'bootstrapServers' - 'producer_config_updates': 'producerConfigUpdates' - 'error_handling': 'errorHandling' - 'file_descriptor_path': 'fileDescriptorPath' - 'message_name': 'messageName' + 'bootstrap_servers': 'bootstrap_servers' + 'producer_config_updates': 'producer_config_updates' + 'error_handling': 'error_handling' + 'file_descriptor_path': 'file_descriptor_path' + 'message_name': 'message_name' 'schema': 'schema' underlying_provider: type: beamJar @@ -93,24 +93,24 @@ 'project': 'project' 'schema': 'schema' 'format': 'format' - 'subscription_name': 'subscriptionName' + 'subscription_name': 
'subscription_name' 'location': 'location' 'attributes': 'attributes' - 'attribute_map': 'attributeMap' - 'attribute_id': 'attributeId' - 'error_handling': 'errorHandling' - 'file_descriptor_path': 'fileDescriptorPath' - 'message_name': 'messageName' + 'attribute_map': 'attribute_map' + 'attribute_id': 'attribute_id' + 'error_handling': 'error_handling' + 'file_descriptor_path': 'file_descriptor_path' + 'message_name': 'message_name' 'WriteToPubSubLite': 'project': 'project' 'format': 'format' - 'topic_name': 'topicName' + 'topic_name': 'topic_name' 'location': 'location' 'attributes': 'attributes' - 'attribute_id': 'attributeId' - 'error_handling': 'errorHandling' - 'file_descriptor_path': 'fileDescriptorPath' - 'message_name': 'messageName' + 'attribute_id': 'attribute_id' + 'error_handling': 'error_handling' + 'file_descriptor_path': 'file_descriptor_path' + 'message_name': 'message_name' 'schema': 'schema' underlying_provider: type: beamJar @@ -206,25 +206,25 @@ mappings: 'ReadFromJdbc': driver_class_name: 'driverClassName' - type: 'jdbcType' - url: 'jdbcUrl' + type: 'jdbc_type' + url: 'jdbc_url' username: 'username' password: 'password' table: 'location' - query: 'readQuery' - driver_jars: 'driverJars' - connection_properties: 'connectionProperties' - connection_init_sql: 'connectionInitSql' + query: 'read_query' + driver_jars: 'driver_jars' + connection_properties: 'connection_properties' + connection_init_sql: 'connection_init_sql' 'WriteToJdbc': - driver_class_name: 'driverClassName' - type: 'jdbcType' - url: 'jdbcUrl' + driver_class_name: 'driver_class_name' + type: 'jdbc_type' + url: 'jdbc_url' username: 'username' password: 'password' table: 'location' - driver_jars: 'driverJars' - connection_properties: 'connectionProperties' - connection_init_sql: 'connectionInitSql' + driver_jars: 'driver_jars' + connection_properties: 'connection_properties' + connection_init_sql: 'connection_init_sql' 'ReadFromMySql': 'ReadFromJdbc' 'WriteToMySql': 'WriteToJdbc' 'ReadFromPostgres': 'ReadFromJdbc' diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 5f53302028c8..1e9c7c605460 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -889,7 +889,7 @@ def java_window_into(java_provider, windowing): return java_provider.create_transform( 'WindowIntoStrategy', { - 'serializedWindowingStrategy': windowing_strategy.to_runner_api( + 'serialized_windowing_strategy': windowing_strategy.to_runner_api( empty_context).SerializeToString() }, None) diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py index a75fc05cba73..ea4f496c2d04 100644 --- a/sdks/python/gen_xlang_wrappers.py +++ b/sdks/python/gen_xlang_wrappers.py @@ -233,24 +233,6 @@ def pretty_type(tp): return (tp, nullable) -def camel_case_to_snake_case(string): - """Convert camelCase to snake_case""" - arr = [] - word = [] - for i, n in enumerate(string): - # If seeing an upper letter after a lower letter, we just witnessed a word - # If seeing an upper letter and the next letter is lower, we may have just - # witnessed an all caps word - if n.isupper() and ((i > 0 and string[i - 1].islower()) or - (i + 1 < len(string) and string[i + 1].islower())): - arr.append(''.join(word)) - word = [n.lower()] - else: - word.append(n.lower()) - arr.append(''.join(word)) - return '_'.join(arr).strip('_') - - def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]: """ Generates code for external 
transform wrapper classes (subclasses of @@ -287,9 +269,8 @@ def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]: parameters = [] for param, info in fields.items(): - pythonic_name = camel_case_to_snake_case(param) param_details = { - "name": pythonic_name, + "name": param, "type": info['type'], "description": info['description'], } From 9599890271b2a8391f7e7efd90e8ab6c018d985c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 22 May 2024 14:20:47 -0400 Subject: [PATCH 2/6] cleanup --- .../TypedSchemaTransformProvider.java | 8 ++++++- .../managed/ManagedTransformConstants.java | 21 ++----------------- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java index 206fcb1cdee9..d9b49dd3ca27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java @@ -28,7 +28,13 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.schemas.*; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaProvider; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidSchemaException; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 8165633cf15e..141544305a38 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -45,27 +45,10 @@ public class ManagedTransformConstants { public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1"; private static final Map KAFKA_READ_MAPPINGS = - ImmutableMap.builder() - .put("topic", "topic") - .put("bootstrap_servers", "bootstrapServers") - .put("consumer_config_updates", "consumerConfigUpdates") - .put("confluent_schema_registry_url", "confluentSchemaRegistryUrl") - .put("confluent_schema_registry_subject", "confluentSchemaRegistrySubject") - .put("data_format", "format") - .put("schema", "schema") - .put("file_descriptor_path", "fileDescriptorPath") - .put("message_name", "messageName") - .build(); + ImmutableMap.builder().put("data_format", "format").build(); private static final Map KAFKA_WRITE_MAPPINGS = - ImmutableMap.builder() - .put("topic", "topic") - .put("bootstrap_servers", "bootstrapServers") - .put("producer_config_updates", "producerConfigUpdates") - .put("data_format", "format") - .put("file_descriptor_path", "fileDescriptorPath") - .put("message_name", "messageName") - .build(); + ImmutableMap.builder().put("data_format", "format").build(); public static final Map> MAPPINGS = ImmutableMap.>builder() From 196e0b124dc2fcf8efcf04a10ec337aea6250da5 Mon Sep 17 00:00:00 2001 
From: Ahmed Abualsaud Date: Fri, 24 May 2024 11:10:41 -0400 Subject: [PATCH 3/6] add to CHANGES.md --- CHANGES.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index a6c61da45107..3468f08d0a4a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -82,6 +82,15 @@ This new implementation still supports all (immutable) List methods as before, but some of the random access methods like get() and size() will be slower. To use the old implementation one can use View.asList().withRandomAccess(). +* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a + configuration Schema with snake_case naming convention + ([#31374](https://github.com/apache/beam/pull/31374)). This will make the following + cases problematic: + * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform, + and vice versa: + * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform + * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381) + should be updated to use new snake_case parameter names. ## Deprecations From 80257ade608443e8ad5dc9c4a30d159420744334 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 24 May 2024 11:26:28 -0400 Subject: [PATCH 4/6] spotless --- CHANGES.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3468f08d0a4a..8f9e6981b832 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -83,14 +83,14 @@ but some of the random access methods like get() and size() will be slower. To use the old implementation one can use View.asList().withRandomAccess(). * SchemaTransforms implemented with TypedSchemaTransformProvider now produce a - configuration Schema with snake_case naming convention + configuration Schema with snake_case naming convention ([#31374](https://github.com/apache/beam/pull/31374)). This will make the following cases problematic: - * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform, + * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform, and vice versa: * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381) - should be updated to use new snake_case parameter names. + should be updated to use new snake_case parameter names. 
## Deprecations From 8ea294d11b2b4e280ef1e9a195894ccb2ac77e35 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 28 May 2024 11:33:26 -0400 Subject: [PATCH 5/6] update Go's bigtable wrapper to export snake_case param names --- sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go index 5b6d7d916310..81df24223cac 100644 --- a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go +++ b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go @@ -62,9 +62,9 @@ import ( ) type bigtableConfig struct { - InstanceId string `beam:"instanceId"` - ProjectId string `beam:"projectId"` - TableId string `beam:"tableId"` + InstanceId string `beam:"instance_id"` + ProjectId string `beam:"project_id"` + TableId string `beam:"table_id"` } // Cell represents a single cell in a Bigtable row. From 4c32a1a5f029f0e9fa781c3e8b0534b3d03ac91c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 30 May 2024 13:49:05 -0400 Subject: [PATCH 6/6] make more yaml snake_case changes --- sdks/python/apache_beam/yaml/standard_io.yaml | 4 ++-- sdks/python/apache_beam/yaml/standard_providers.yaml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 45227fa75241..005e1af05495 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -39,7 +39,7 @@ write_disposition: 'write_disposition' error_handling: 'error_handling' # TODO(https://github.com/apache/beam/issues/30058): Required until autosharding support is fixed - num_streams: 'numStreams' + num_streams: 'num_streams' underlying_provider: type: beamJar transforms: @@ -205,7 +205,7 @@ config: mappings: 'ReadFromJdbc': - driver_class_name: 'driverClassName' + driver_class_name: 'driver_class_name' type: 'jdbc_type' url: 'jdbc_url' username: 'username' diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index 89b0cc9d553e..8d0037d4dd9f 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -68,20 +68,20 @@ append: 'append' drop: 'drop' fields: 'fields' - error_handling: 'errorHandling' + error_handling: 'error_handling' 'MapToFields-java': language: 'language' append: 'append' drop: 'drop' fields: 'fields' - error_handling: 'errorHandling' + error_handling: 'error_handling' 'Filter-java': language: 'language' keep: 'keep' - error_handling: 'errorHandling' + error_handling: 'error_handling' 'Explode': fields: 'fields' - cross_product: 'crossProduct' + cross_product: 'cross_product' underlying_provider: type: beamJar transforms:
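The CHANGES.md entry above asks direct users of Python's SchemaAwareExternalTransform to switch to snake_case parameter names once the Java provider is on 2.57.0+. Below is a minimal sketch of that migration, assuming an expansion service running at localhost:7777 and a Bigtable write; the identifier, address, and field values are illustrative assumptions, not taken from this patch — only the naming-convention change is.

```python
# Sketch: migrating a direct SchemaAwareExternalTransform call to snake_case.
# The identifier, expansion service address, and field values here are
# assumptions for illustration.
import apache_beam as beam
from apache_beam.transforms.external import SchemaAwareExternalTransform

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(key=b'key-1')])
        | SchemaAwareExternalTransform(
            identifier='beam:schematransform:org.apache.beam:bigtable_write:v1',
            expansion_service='localhost:7777',
            rearrange_based_on_discovery=True,
            # Before 2.57.0 these fields were discovered as tableId /
            # instanceId / projectId; 2.57.0+ providers expose snake_case:
            table_id='my-table',
            instance_id='my-instance',
            project_id='my-project'))
```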
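Wrappers discovered through ExternalTransformProvider already presented snake_case parameters to users; after this patch those kwargs are forwarded to the expansion service as-is instead of being converted to lowerCamelCase first (hence the removal of snake_case_to_lower_camel_case above). A sketch under the assumption that a local expansion service exposes the GenerateSequence transform referenced in the tests, and that its parameters are named start/end:

```python
# Sketch: discovered wrappers now forward snake_case kwargs unchanged.
# The service address and the start/end parameter names are assumptions.
import apache_beam as beam
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider

provider = ExternalTransformProvider('localhost:7777')
GenerateSequence = provider.get('GenerateSequence')

with beam.Pipeline() as p:
    # These kwargs reach SchemaAwareExternalTransform verbatim; no
    # snake_case -> lowerCamelCase conversion layer exists anymore.
    _ = p | GenerateSequence(start=0, end=10)
```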