Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,14 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
try {
for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
// Avro provider is treated as a special case since two Avro providers may want to be loaded
// from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
// TODO: we won't need this check once all Avro providers from "core" will be
// removed
if (provider.identifier().equals("avro")) {
// Avro provider from "extensions/avro" must have a priority.
if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
// Load Avro provider from "extensions/avro" by any case.
registerProvider(providers, provider);
} else {
// Load Avro provider from "core" if it was not loaded from Avro extension before.
registerProviderOptionally(providers, provider);
}
} else {
final String identifier =
"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
checkState(
!providers.containsKey(identifier),
"Duplicate providers exist with identifier `%s` for class %s.",
identifier,
SchemaIOProvider.class);
registerProvider(providers, provider);
}
final String identifier =
"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
checkState(
!providers.containsKey(identifier),
"Duplicate providers exist with identifier `%s` for class %s.",
identifier,
SchemaIOProvider.class);
registerProvider(providers, provider);
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
Expand All @@ -94,16 +79,6 @@ private void registerProvider(
new WriterBuilder(provider));
}

private void registerProviderOptionally(
Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
providers.putIfAbsent(
"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1",
new ReaderBuilder(provider));
providers.putIfAbsent(
"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1",
new WriterBuilder(provider));
}

public static class Configuration {
String location = "";
byte[] config = new byte[0];
Expand Down
Loading