From 4979d65d5afb6017e88cbc65c412f17ab55970b1 Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Thu, 13 Feb 2025 16:05:50 +0400 Subject: [PATCH 1/3] Do not use singleton ServiceLoader --- .../beam/sdk/schemas/utils/ConvertHelpers.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java index 7f2403035d97..637f59a9c3fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java @@ -57,10 +57,6 @@ "rawtypes" }) public class ConvertHelpers { - private static class SchemaInformationProviders { - private static final ServiceLoader INSTANCE = - ServiceLoader.load(SchemaInformationProvider.class); - } private static final Logger LOG = LoggerFactory.getLogger(ConvertHelpers.class); @@ -87,11 +83,15 @@ public static ConvertedSchemaInformation getConvertedSchemaInformation( ConvertedSchemaInformation schemaInformation = null; // Try to load schema information from loaded providers - for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) { - schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType); - if (schemaInformation != null) { - return schemaInformation; + try { + for (SchemaInformationProvider provider : ServiceLoader.load(SchemaInformationProvider.class)) { + schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType); + if (schemaInformation != null) { + return schemaInformation; + } } + } catch (Exception e) { + LOG.debug("No Schema information found for type {}", outputType, e); } if (schemaInformation == null) { @@ -107,7 +107,7 @@ public static ConvertedSchemaInformation getConvertedSchemaInformation( schemaRegistry.getToRowFunction(outputType), schemaRegistry.getFromRowFunction(outputType)); } catch (NoSuchSchemaException e) { - LOG.debug("No schema found for type " + outputType, e); + LOG.debug("No schema found for type {}", outputType, e); } FieldType unboxedType = null; // TODO: Properly handle nullable. From b19bccd00a550b66420205eb267aa19f3bc0b7ba Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Thu, 13 Feb 2025 22:07:04 +0400 Subject: [PATCH 2/3] Use AtomicReference lazy loading of SchemaInformationProvider list --- .../sdk/schemas/utils/ConvertHelpers.java | 98 +++++++++++-------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java index 637f59a9c3fd..da5ea872a8cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java @@ -22,7 +22,11 @@ import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Type; +import java.util.List; import java.util.ServiceLoader; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import net.bytebuddy.ByteBuddy; import net.bytebuddy.asm.AsmVisitorWrapper; import net.bytebuddy.description.type.TypeDescription; @@ -58,6 +62,21 @@ }) public class ConvertHelpers { + private static final AtomicReference> + SCHEMA_INFORMATION_PROVIDERS = new AtomicReference<>(); + + private static List getSchemaInformationProviders() { + return SCHEMA_INFORMATION_PROVIDERS.updateAndGet( + existing -> { + if (existing == null) { + ServiceLoader loader = + ServiceLoader.load(SchemaInformationProvider.class); + return StreamSupport.stream(loader.spliterator(), false).collect(Collectors.toList()); + } + return existing; + }); + } + private static final Logger LOG = LoggerFactory.getLogger(ConvertHelpers.class); /** Return value after converting a schema. */ @@ -81,10 +100,10 @@ public ConvertedSchemaInformation( public static ConvertedSchemaInformation getConvertedSchemaInformation( Schema inputSchema, TypeDescriptor outputType, SchemaRegistry schemaRegistry) { - ConvertedSchemaInformation schemaInformation = null; + ConvertedSchemaInformation schemaInformation; // Try to load schema information from loaded providers try { - for (SchemaInformationProvider provider : ServiceLoader.load(SchemaInformationProvider.class)) { + for (SchemaInformationProvider provider : getSchemaInformationProviders()) { schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType); if (schemaInformation != null) { return schemaInformation; @@ -94,48 +113,45 @@ public static ConvertedSchemaInformation getConvertedSchemaInformation( LOG.debug("No Schema information found for type {}", outputType, e); } - if (schemaInformation == null) { - // Otherwise, try to find a schema for the output type in the schema registry. - Schema outputSchema = null; - SchemaCoder outputSchemaCoder = null; - try { - outputSchema = schemaRegistry.getSchema(outputType); - outputSchemaCoder = - SchemaCoder.of( - outputSchema, - outputType, - schemaRegistry.getToRowFunction(outputType), - schemaRegistry.getFromRowFunction(outputType)); - } catch (NoSuchSchemaException e) { - LOG.debug("No schema found for type {}", outputType, e); - } - FieldType unboxedType = null; - // TODO: Properly handle nullable. - if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) { - // The schema is not convertible directly. Attempt to unbox it and see if the schema matches - // then. - Schema checkedSchema = inputSchema; - if (inputSchema.getFieldCount() == 1) { - unboxedType = inputSchema.getField(0).getType(); - if (unboxedType.getTypeName().isCompositeType() - && !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) { - checkedSchema = unboxedType.getRowSchema(); - } else { - checkedSchema = null; - } - } - if (checkedSchema != null) { - throw new RuntimeException( - "Cannot convert between types that don't have equivalent schemas." - + " input schema: " - + checkedSchema - + " output schema: " - + outputSchema); + // Otherwise, try to find a schema for the output type in the schema registry. + Schema outputSchema = null; + SchemaCoder outputSchemaCoder = null; + try { + outputSchema = schemaRegistry.getSchema(outputType); + outputSchemaCoder = + SchemaCoder.of( + outputSchema, + outputType, + schemaRegistry.getToRowFunction(outputType), + schemaRegistry.getFromRowFunction(outputType)); + } catch (NoSuchSchemaException e) { + LOG.debug("No schema found for type {}", outputType, e); + } + FieldType unboxedType = null; + // TODO: Properly handle nullable. + if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) { + // The schema is not convertible directly. Attempt to unbox it and see if the schema matches + // then. + Schema checkedSchema = inputSchema; + if (inputSchema.getFieldCount() == 1) { + unboxedType = inputSchema.getField(0).getType(); + if (unboxedType.getTypeName().isCompositeType() + && !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) { + checkedSchema = unboxedType.getRowSchema(); + } else { + checkedSchema = null; } } - schemaInformation = new ConvertedSchemaInformation(outputSchemaCoder, unboxedType); + if (checkedSchema != null) { + throw new RuntimeException( + "Cannot convert between types that don't have equivalent schemas." + + " input schema: " + + checkedSchema + + " output schema: " + + outputSchema); + } } - return schemaInformation; + return new ConvertedSchemaInformation<>(outputSchemaCoder, unboxedType); } /** From 8cf8611afcbc09dd0d59b26a7089a60503f993fa Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Mon, 17 Feb 2025 14:20:01 +0400 Subject: [PATCH 3/3] Use synchronize with lock --- .../sdk/schemas/utils/ConvertHelpers.java | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java index da5ea872a8cf..ff36faaaa1d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java @@ -22,11 +22,8 @@ import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Type; -import java.util.List; import java.util.ServiceLoader; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; +import javax.annotation.concurrent.GuardedBy; import net.bytebuddy.ByteBuddy; import net.bytebuddy.asm.AsmVisitorWrapper; import net.bytebuddy.description.type.TypeDescription; @@ -62,22 +59,14 @@ }) public class ConvertHelpers { - private static final AtomicReference> - SCHEMA_INFORMATION_PROVIDERS = new AtomicReference<>(); - - private static List getSchemaInformationProviders() { - return SCHEMA_INFORMATION_PROVIDERS.updateAndGet( - existing -> { - if (existing == null) { - ServiceLoader loader = - ServiceLoader.load(SchemaInformationProvider.class); - return StreamSupport.stream(loader.spliterator(), false).collect(Collectors.toList()); - } - return existing; - }); + private static class SchemaInformationProviders { + @GuardedBy("lock") + private static final ServiceLoader INSTANCE = + ServiceLoader.load(SchemaInformationProvider.class); } private static final Logger LOG = LoggerFactory.getLogger(ConvertHelpers.class); + private static final Object lock = new Object(); /** Return value after converting a schema. */ public static class ConvertedSchemaInformation implements Serializable { @@ -100,17 +89,19 @@ public ConvertedSchemaInformation( public static ConvertedSchemaInformation getConvertedSchemaInformation( Schema inputSchema, TypeDescriptor outputType, SchemaRegistry schemaRegistry) { - ConvertedSchemaInformation schemaInformation; // Try to load schema information from loaded providers try { - for (SchemaInformationProvider provider : getSchemaInformationProviders()) { - schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType); - if (schemaInformation != null) { - return schemaInformation; + synchronized (lock) { + for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) { + ConvertedSchemaInformation schemaInformation = + provider.getConvertedSchemaInformation(inputSchema, outputType); + if (schemaInformation != null) { + return schemaInformation; + } } } } catch (Exception e) { - LOG.debug("No Schema information found for type {}", outputType, e); + LOG.debug("No Schema information from loaded providers found for type {}", outputType, e); } // Otherwise, try to find a schema for the output type in the schema registry.