@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
@@ -17,21 +17,17 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
-import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.ParameterizedType;
 import java.util.List;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaProvider;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.values.Row;
 
@@ -45,9 +41,6 @@
  * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used to produce a {@link
  * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
  *
- * <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
- * {@code snake_case} naming convention.
- *
  * <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
  * we provide implementations for more standard Beam transforms. We provide no backwards
  * compatibility guarantees and it should not be implemented outside of the Beam repository.
@@ -85,11 +78,10 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions options) {
   }
 
   @Override
-  public final Schema configurationSchema() {
+  public Schema configurationSchema() {
     try {
       // Sort the fields by name to ensure a consistent schema is produced
-      // We also establish a `snake_case` convention for all SchemaTransform configurations
-      return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
+      return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for "
@@ -98,12 +90,9 @@ public final Schema configurationSchema() {
     }
   }
 
-  /**
-   * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
-   * `snake_case` naming convention.
-   */
+  /** Produces a {@link SchemaTransform} from a Row configuration. */
   @Override
-  public final SchemaTransform from(Row configuration) {
+  public SchemaTransform from(Row configuration) {
     return from(configFromRow(configuration));
   }
 
@@ -114,20 +103,9 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOptions options) {
 
   private ConfigT configFromRow(Row configuration) {
     try {
-      SchemaRegistry registry = SchemaRegistry.createDefault();
-
-      // Configuration objects handled by the AutoValueSchema provider will expect Row fields with
-      // camelCase naming convention
-      SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
-      if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
-          && checkNotNull(
-                  ((DefaultSchemaProvider) schemaProvider)
-                      .getUnderlyingSchemaProvider(configurationClass()))
-              .getClass()
-              .equals(AutoValueSchema.class)) {
-        configuration = configuration.toCamelCase();
-      }
-      return registry.getFromRowFunction(configurationClass()).apply(configuration);
+      return SchemaRegistry.createDefault()
+          .getFromRowFunction(configurationClass())
+          .apply(configuration);
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
@@ -130,8 +130,8 @@ public void testFrom() {
 
     Row inputConfig =
         Row.withSchema(provider.configurationSchema())
-            .withFieldValue("string_field", "field1")
-            .withFieldValue("integer_field", Integer.valueOf(13))
+            .withFieldValue("stringField", "field1")
+            .withFieldValue("integerField", Integer.valueOf(13))
            .build();
 
     Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public void testDependencies() {
     SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
     Row inputConfig =
         Row.withSchema(provider.configurationSchema())
-            .withFieldValue("string_field", "field1")
-            .withFieldValue("integer_field", Integer.valueOf(13))
+            .withFieldValue("stringField", "field1")
+            .withFieldValue("integerField", Integer.valueOf(13))
            .build();
 
     assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
@@ -25,6 +25,7 @@
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -131,4 +132,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       return PCollectionRowTuple.of(OUTPUT_TAG, output);
     }
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
@@ -176,4 +176,15 @@ public Row apply(KV<String, SnapshotInfo> input) {
       }
     }
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
@@ -112,17 +112,17 @@ public void testFindTransformAndMakeItWork() {
 
     assertEquals(
         Sets.newHashSet(
-            "bootstrap_servers",
+            "bootstrapServers",
             "topic",
             "schema",
-            "auto_offset_reset_config",
-            "consumer_config_updates",
+            "autoOffsetResetConfig",
+            "consumerConfigUpdates",
             "format",
-            "confluent_schema_registry_subject",
-            "confluent_schema_registry_url",
-            "error_handling",
-            "file_descriptor_path",
-            "message_name"),
+            "confluentSchemaRegistrySubject",
+            "confluentSchemaRegistryUrl",
+            "errorHandling",
+            "fileDescriptorPath",
+            "messageName"),
         kafkaProvider.configurationSchema().getFields().stream()
             .map(field -> field.getName())
             .collect(Collectors.toSet()));
@@ -198,7 +198,6 @@ Row getConfigurationRow() {
     }
   }
 
-  /** */
   @VisibleForTesting
   static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
     // May return an empty row (perhaps the underlying transform doesn't have any required
@@ -209,4 +208,15 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
   Map<String, SchemaTransformProvider> getAllProviders() {
     return schemaTransformProviders;
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
@@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {
 
   @Test
   public void testGetConfigRowFromYamlString() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
     ManagedConfig config =
         ManagedConfig.builder()
             .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {
 
     Row expectedRow =
         Row.withSchema(TestSchemaTransformProvider.SCHEMA)
-            .withFieldValue("extra_string", "abc")
-            .withFieldValue("extra_integer", 123)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
             .build();
 
     Row returnedRow =
@@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
     Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
     Row expectedRow =
         Row.withSchema(configSchema)
-            .withFieldValue("extra_string", "abc")
-            .withFieldValue("extra_integer", 123)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
             .build();
     Row configRow =
         ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@
 
   @Test
   public void testBuildWithYamlString() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
 
     ManagedConfig config =
         ManagedConfig.builder()
@@ -84,7 +84,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxException {
 
   @Test
   public void testReCreateTransformFromRowWithConfig() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
 
     ManagedConfig originalConfig =
         ManagedConfig.builder()
@@ -123,8 +123,8 @@ public void testProtoTranslation() throws Exception {
             .setRowSchema(inputSchema);
     Map<String, Object> underlyingConfig =
         ImmutableMap.<String, Object>builder()
-            .put("extra_string", "abc")
-            .put("extra_integer", 123)
+            .put("extraString", "abc")
+            .put("extraInteger", 123)
             .build();
     String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
     Managed.ManagedTransform transform =
@@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
             .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
             .build()
             .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
-            .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
+            .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
 
     runTestProviderTest(writeOp);
   }
4 changes: 2 additions & 2 deletions sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
 # under the License.
 #
 
-extra_string: "abc"
-extra_integer: 123
+extraString: "abc"
+extraInteger: 123
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2574,13 +2574,13 @@ def expand(self, input):
             expansion_service=self._expansion_service,
             rearrange_based_on_discovery=True,
             table=table,
-            create_disposition=self._create_disposition,
-            write_disposition=self._write_disposition,
-            triggering_frequency_seconds=self._triggering_frequency,
-            auto_sharding=self._with_auto_sharding,
-            num_streams=self._num_storage_api_streams,
-            use_at_least_once_semantics=self._use_at_least_once,
-            error_handling={
+            createDisposition=self._create_disposition,
+            writeDisposition=self._write_disposition,
+            triggeringFrequencySeconds=self._triggering_frequency,
+            autoSharding=self._with_auto_sharding,
+            numStreams=self._num_storage_api_streams,
+            useAtLeastOnceSemantics=self._use_at_least_once,
+            errorHandling={
                 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
             }))
 
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@ def expand(self, input):
         identifier=self.schematransform_config.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        table_id=self._table_id,
-        instance_id=self._instance_id,
-        project_id=self._project_id)
+        tableId=self._table_id,
+        instanceId=self._instance_id,
+        projectId=self._project_id)
 
     return (
         input
@@ -323,9 +323,9 @@ def expand(self, input):
         identifier=self.schematransform_config.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        table_id=self._table_id,
-        instance_id=self._instance_id,
-        project_id=self._project_id)
+        tableId=self._table_id,
+        instanceId=self._instance_id,
+        projectId=self._project_id)
 
     return (
         input.pipeline
35 changes: 33 additions & 2 deletions sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,6 +39,32 @@ def snake_case_to_upper_camel_case(string):
   return output
 
 
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase"""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = []
+  word = []
+  for i, n in enumerate(string):
+    # If seeing an upper letter after a lower letter, we just witnessed a word
+    # If seeing an upper letter and the next letter is lower, we may have just
+    # witnessed an all caps word
+    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+                        (i + 1 < len(string) and string[i + 1].islower())):
+      arr.append(''.join(word))
+      word = [n.lower()]
+    else:
+      word.append(n.lower())
+  arr.append(''.join(word))
+  return '_'.join(arr).strip('_')
+
+
 # Information regarding a Wrapper parameter.
 ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
 
@@ -50,7 +76,7 @@ def get_config_with_descriptions(
   descriptions = schematransform.configuration_schema._field_descriptions
   fields_with_descriptions = {}
   for field in schema.fields:
-    fields_with_descriptions[field.name] = ParamInfo(
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
         typing_from_runner_api(field.type),
         descriptions[field.name],
         field.name)
@@ -79,11 +105,16 @@ def __init__(self, expansion_service=None, **kwargs):
         expansion_service or self.default_expansion_service
 
   def expand(self, input):
+    camel_case_kwargs = {
+        snake_case_to_lower_camel_case(k): v
+        for k, v in self._kwargs.items()
+    }
+
     external_schematransform = SchemaAwareExternalTransform(
         identifier=self.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        **self._kwargs)
+        **camel_case_kwargs)
 
     return input | external_schematransform
 
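Taken together, the two helpers and the `expand` change above let ExternalTransform wrappers keep accepting snake_case keyword arguments while the expanded external transform now receives lowerCamelCase configuration fields. A minimal sketch of the round-trip, assuming snake_case_to_upper_camel_case capitalizes each underscore-separated word as its name suggests ('useHTTPClient' is a hypothetical name chosen to exercise the all-caps rule):

    # Illustrative conversions for the helpers added above.
    assert camel_case_to_snake_case('bootstrapServers') == 'bootstrap_servers'
    # A run of capitals is kept together as one all-caps word until a
    # lowercase letter follows it.
    assert camel_case_to_snake_case('useHTTPClient') == 'use_http_client'
    assert snake_case_to_lower_camel_case(
        'auto_offset_reset_config') == 'autoOffsetResetConfig'

    # ExternalTransform.expand applies the same remapping to user kwargs
    # before expansion, e.g. (names borrowed from the bigtableio change):
    kwargs = {'table_id': 'my-table', 'instance_id': 'my-instance'}
    camel = {snake_case_to_lower_camel_case(k): v for k, v in kwargs.items()}
    assert camel == {'tableId': 'my-table', 'instanceId': 'my-instance'}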