From f329ea8c5009ca65560067c0977348ef90872946 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 17 Dec 2021 18:10:08 -0600 Subject: [PATCH 1/2] Add option to throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder --- docs/ingestion/data-formats.md | 1 + .../SchemaRegistryBasedAvroBytesDecoder.java | 46 ++++++++++++------- .../data/input/AvroStreamInputFormatTest.java | 2 +- ...hemaRegistryBasedAvroBytesDecoderTest.java | 32 +++++++++++-- 4 files changed, 59 insertions(+), 22 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index fc56018c0154..6db959394a3d 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -442,6 +442,7 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | urls | Array | Specifies the URL endpoints of the multiple Schema Registry instances. | yes (if `url` is not provided) | | config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | | headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | +| failOnGetSchemaErrors | Boolean | If true, errors when getting the schema from the registry for a message are treated as fatal errors that will fail a task. If false, a parse exception that can be logged will be thrown instead. Defaults to true. | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 41847d562d84..7334b6aac60d 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -55,6 +55,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder private final Map config; private final Map headers; private final ObjectMapper jsonMapper; + private final Boolean failOnGetSchemaErrors; public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator @@ -64,6 +65,7 @@ public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("urls") @Nullable List urls, @JsonProperty("config") @Nullable Map config, @JsonProperty("headers") @Nullable Map headers, + @JsonProperty("failOnGetSchemaErrors") @Nullable Boolean failOnGetSchemaErrors, @JacksonInject @Json ObjectMapper jsonMapper ) { @@ -72,6 +74,7 @@ public SchemaRegistryBasedAvroBytesDecoder( this.urls = urls; this.config = config; this.headers = headers; + this.failOnGetSchemaErrors = failOnGetSchemaErrors == null || failOnGetSchemaErrors; this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); @@ -110,15 +113,22 @@ public Map getHeaders() return headers; } + @JsonProperty + public Boolean getFailOnGetSchemaErrors() + { + return failOnGetSchemaErrors; + } + //For UT only @VisibleForTesting - SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) + SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry, boolean failOnGetSchemaErrors) { this.url = null; this.capacity = Integer.MAX_VALUE; this.urls = null; this.config = null; this.headers = null; + this.failOnGetSchemaErrors = failOnGetSchemaErrors; this.registry = registry; this.jsonMapper = new ObjectMapper(); } @@ -141,10 +151,18 @@ public GenericRecord parse(ByteBuffer bytes) schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; } catch (IOException | RestClientException ex) { - throw new RE(ex, "Failed to get Avro schema: %s", id); + if (failOnGetSchemaErrors) { + throw new RE(ex, "Failed to get Avro schema: %s", id); + } else { + throw new ParseException(null, "Failed to get Avro schema: %s", id); + } } if (schema == null) { - throw new RE("Failed to find Avro schema: %s", id); + if (failOnGetSchemaErrors) { + throw new RE("Failed to find Avro schema: %s", id); + } else { + throw new ParseException(null, "Failed to find Avro schema: %s", id); + } } DatumReader reader = new GenericDatumReader<>(schema); try { @@ -164,24 +182,20 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; - - return Objects.equals(url, that.url) && - Objects.equals(capacity, that.capacity) && - Objects.equals(urls, that.urls) && - Objects.equals(config, that.config) && - Objects.equals(headers, that.headers); + return capacity == that.capacity + && Objects.equals(registry, that.registry) + && Objects.equals(url, that.url) + && Objects.equals(urls, that.urls) + && Objects.equals(config, that.config) + && Objects.equals(headers, that.headers) + && Objects.equals(jsonMapper, that.jsonMapper) + && Objects.equals(failOnGetSchemaErrors, that.failOnGetSchemaErrors); } @Override public int hashCode() { - int result = url != null ? url.hashCode() : 0; - result = 31 * result + capacity; - result = 31 * result + (urls != null ? urls.hashCode() : 0); - result = 31 * result + (config != null ? config.hashCode() : 0); - result = 31 * result + (headers != null ? headers.hashCode() : 0); - return result; + return Objects.hash(registry, url, capacity, urls, config, headers, jsonMapper, failOnGetSchemaErrors); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java index 4a0c2df1cacf..97227b9f7ad4 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -155,7 +155,7 @@ public void testSerdeForSchemaRegistry() throws IOException { AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( flattenSpec, - new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null), + new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null, null), false, false ); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index ec073c92750b..9ebf1f578248 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -120,7 +120,7 @@ public void testParse() throws Exception ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); } @Test(expected = ParseException.class) @@ -130,7 +130,7 @@ public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo() ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); } @Test(expected = ParseException.class) @@ -145,7 +145,7 @@ public void testParseCorruptedPartial() throws Exception ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); } @Test(expected = RE.class) @@ -156,7 +156,18 @@ public void testParseWrongSchemaType() throws Exception ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); + } + + @Test(expected = ParseException.class) + public void testParseWrongSchemaTypeThrowParseException() throws Exception + { + // Given + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class)); + ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); + bb.rewind(); + // When + new SchemaRegistryBasedAvroBytesDecoder(registry, false).parse(bb); } @Test(expected = RE.class) @@ -167,7 +178,18 @@ public void testParseWrongId() throws Exception ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); + } + + @Test(expected = ParseException.class) + public void testParseWrongIdThrowParseException() throws Exception + { + // Given + Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); + ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); + bb.rewind(); + // When + new SchemaRegistryBasedAvroBytesDecoder(registry, false).parse(bb); } private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException From 5889575b30b6f2a4ae8c8783aeb5361894948683 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 21 Dec 2021 13:23:27 -0600 Subject: [PATCH 2/2] Remove option --- docs/ingestion/data-formats.md | 7 +++- .../SchemaRegistryBasedAvroBytesDecoder.java | 32 +++-------------- .../data/input/AvroStreamInputFormatTest.java | 2 +- ...hemaRegistryBasedAvroBytesDecoderTest.java | 35 ++++--------------- 4 files changed, 18 insertions(+), 58 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 6db959394a3d..15872734082e 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -442,7 +442,6 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu | urls | Array | Specifies the URL endpoints of the multiple Schema Registry instances. | yes (if `url` is not provided) | | config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | | headers | Json | To send headers to the Schema Registry. This can be supplied via a [DynamicConfigProvider](../operations/dynamic-config-provider.md) | no | -| failOnGetSchemaErrors | Boolean | If true, errors when getting the schema from the registry for a message are treated as fatal errors that will fail a task. If false, a parse exception that can be logged will be thrown instead. Defaults to true. | no | For a single schema registry instance, use Field `url` or `urls` for multi instances. @@ -489,6 +488,12 @@ Multiple Instances: ... ``` +###### Parse exceptions + +The following errors when reading records will be considered parse exceptions, which can be limited and logged with ingestion task configurations such as `maxParseExceptions` and `maxSavedParseExceptions`: +- Failure to retrieve a schema due to misconfiguration or corrupt records (invalid schema IDs) +- Failure to decode an Avro message + ### Avro OCF To load the Avro OCF input format, load the Druid Avro extension ([`druid-avro-extensions`](../development/extensions-core/avro.md)). diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 7334b6aac60d..f7006cb02c21 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -35,7 +35,6 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; import org.apache.druid.guice.annotations.Json; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.utils.DynamicConfigProviderUtils; @@ -55,7 +54,6 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder private final Map config; private final Map headers; private final ObjectMapper jsonMapper; - private final Boolean failOnGetSchemaErrors; public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; @JsonCreator @@ -65,7 +63,6 @@ public SchemaRegistryBasedAvroBytesDecoder( @JsonProperty("urls") @Nullable List urls, @JsonProperty("config") @Nullable Map config, @JsonProperty("headers") @Nullable Map headers, - @JsonProperty("failOnGetSchemaErrors") @Nullable Boolean failOnGetSchemaErrors, @JacksonInject @Json ObjectMapper jsonMapper ) { @@ -74,7 +71,6 @@ public SchemaRegistryBasedAvroBytesDecoder( this.urls = urls; this.config = config; this.headers = headers; - this.failOnGetSchemaErrors = failOnGetSchemaErrors == null || failOnGetSchemaErrors; this.jsonMapper = jsonMapper; if (url != null && !url.isEmpty()) { this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, DynamicConfigProviderUtils.extraConfigAndSetObjectMap(config, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper), DynamicConfigProviderUtils.extraConfigAndSetStringMap(headers, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, this.jsonMapper)); @@ -113,22 +109,15 @@ public Map getHeaders() return headers; } - @JsonProperty - public Boolean getFailOnGetSchemaErrors() - { - return failOnGetSchemaErrors; - } - //For UT only @VisibleForTesting - SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry, boolean failOnGetSchemaErrors) + SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) { this.url = null; this.capacity = Integer.MAX_VALUE; this.urls = null; this.config = null; this.headers = null; - this.failOnGetSchemaErrors = failOnGetSchemaErrors; this.registry = registry; this.jsonMapper = new ObjectMapper(); } @@ -151,18 +140,10 @@ public GenericRecord parse(ByteBuffer bytes) schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; } catch (IOException | RestClientException ex) { - if (failOnGetSchemaErrors) { - throw new RE(ex, "Failed to get Avro schema: %s", id); - } else { - throw new ParseException(null, "Failed to get Avro schema: %s", id); - } + throw new ParseException(null, "Failed to get Avro schema: %s", id); } if (schema == null) { - if (failOnGetSchemaErrors) { - throw new RE("Failed to find Avro schema: %s", id); - } else { - throw new ParseException(null, "Failed to find Avro schema: %s", id); - } + throw new ParseException(null, "Failed to find Avro schema: %s", id); } DatumReader reader = new GenericDatumReader<>(schema); try { @@ -184,18 +165,15 @@ public boolean equals(Object o) } SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; return capacity == that.capacity - && Objects.equals(registry, that.registry) && Objects.equals(url, that.url) && Objects.equals(urls, that.urls) && Objects.equals(config, that.config) - && Objects.equals(headers, that.headers) - && Objects.equals(jsonMapper, that.jsonMapper) - && Objects.equals(failOnGetSchemaErrors, that.failOnGetSchemaErrors); + && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(registry, url, capacity, urls, config, headers, jsonMapper, failOnGetSchemaErrors); + return Objects.hash(registry, url, capacity, urls, config, headers, jsonMapper); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java index 97227b9f7ad4..4a0c2df1cacf 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -155,7 +155,7 @@ public void testSerdeForSchemaRegistry() throws IOException { AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( flattenSpec, - new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null, null), + new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null, null), false, false ); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 9ebf1f578248..9348e9486aef 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -33,7 +33,6 @@ import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.utils.DynamicConfigProviderUtils; import org.junit.Assert; @@ -120,7 +119,7 @@ public void testParse() throws Exception ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } @Test(expected = ParseException.class) @@ -130,7 +129,7 @@ public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo() ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } @Test(expected = ParseException.class) @@ -145,10 +144,10 @@ public void testParseCorruptedPartial() throws Exception ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } - @Test(expected = RE.class) + @Test(expected = ParseException.class) public void testParseWrongSchemaType() throws Exception { // Given @@ -156,21 +155,10 @@ public void testParseWrongSchemaType() throws Exception ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } @Test(expected = ParseException.class) - public void testParseWrongSchemaTypeThrowParseException() throws Exception - { - // Given - Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class)); - ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); - bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry, false).parse(bb); - } - - @Test(expected = RE.class) public void testParseWrongId() throws Exception { // Given @@ -178,18 +166,7 @@ public void testParseWrongId() throws Exception ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry, true).parse(bb); - } - - @Test(expected = ParseException.class) - public void testParseWrongIdThrowParseException() throws Exception - { - // Given - Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); - ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); - bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry, false).parse(bb); + new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException