From d826c2265e63557f4341ca9636cad228623f6e67 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 2 Jul 2024 10:41:18 -0700 Subject: [PATCH 1/3] Handle RestClientException separately, instead of returning a generic error. - Add tests - Clean up the tests; remove the legacy expected exception pattern - Better test assertions --- .../SchemaRegistryBasedAvroBytesDecoder.java | 36 ++++- ...hemaRegistryBasedAvroBytesDecoderTest.java | 152 ++++++++++++++---- 2 files changed, 150 insertions(+), 38 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 167b036e7d1f..def090e8deeb 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -36,7 +36,10 @@ import org.apache.avro.io.DecoderFactory; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.utils.DynamicConfigProviderUtils; +import org.asynchttpclient.util.HttpConstants; import javax.annotation.Nullable; import java.io.IOException; @@ -139,11 +142,34 @@ public GenericRecord parse(ByteBuffer bytes) ParsedSchema parsedSchema = registry.getSchemaById(id); schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; } - catch (IOException | RestClientException ex) { - throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s] from registry. Check if the schema " - + "exists in the registry. Otherwise it could mean that there is " - + "malformed data in the stream or data that doesn't conform to the schema " - + "specified.", id); + catch (IOException ex1) { + throw new ParseException( + null, + ex1, + "Failed to fetch Avro schema id[%s] from registry. Check if the schema exists in the registry. Otherwise it" + + " could mean that there is malformed data in the stream or data that doesn't conform to the schema" + + " specified.", + id + ); + } + catch (RestClientException ex2) { + if (ex2.getErrorCode() == 401) { + throw new ParseException( + null, + ex2, + "Failed to authenticate to schema registry for Avro schema id[%s]. Please check your credentials.", + id + ); + } + // For all other errors, just include the code and message received from the library. + throw new ParseException( + null, + ex2, + "Failed to fetch Avro schema id[%s] from registry. Error code[%s] and message[%s].", + id, + ex2.getErrorCode(), + ex2.getMessage() + ); } if (schema == null) { throw new ParseException(null, "No Avro schema id[%s] in registry", id); diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 7644b61bb106..75b97a5fcb86 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -25,6 +25,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; @@ -32,7 +33,10 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.utils.DynamicConfigProviderUtils; import org.hamcrest.CoreMatchers; @@ -61,15 +65,15 @@ public void setUp() @Test public void testMultipleUrls() throws Exception { + // Given String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + + // When + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); // Then Assert.assertNotEquals(decoder.hashCode(), 0); @@ -78,15 +82,15 @@ public void testMultipleUrls() throws Exception @Test public void testUrl() throws Exception { + // Given String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + + // When + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); // Then Assert.assertNotEquals(decoder.hashCode(), 0); @@ -95,15 +99,15 @@ public void testUrl() throws Exception @Test public void testConfig() throws Exception { + // Given String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + + // When + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); // Then Assert.assertNotEquals(decoder.hashCode(), 0); @@ -121,20 +125,31 @@ public void testParse() throws Exception ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); bb.rewind(); // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + GenericRecord parse = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // Then + Assert.assertEquals(schema, parse.getSchema());; } - @Test(expected = ParseException.class) + @Test public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo() { // Given ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1); bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat( + e.getMessage(), + CoreMatchers.containsString("Failed to decode avro message, not enough bytes to decode (2)") + ); } - @Test(expected = ParseException.class) + @Test public void testParseCorruptedPartial() throws Exception { // Given @@ -145,19 +160,30 @@ public void testParseCorruptedPartial() throws Exception byte[] bytes = getAvroDatum(schema, someAvroDatum); ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class)); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Failed to decode Avro message for schema id[1234]")); } - @Test(expected = ParseException.class) + @Test public void testParseWrongSchemaType() throws Exception { // Given Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class)); ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); - // When - new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("No Avro schema id[1234] in registry")); } @Test @@ -167,7 +193,8 @@ public void testParseWrongId() throws Exception Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran")); ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); bb.rewind(); - // When + + // When / Then final ParseException e = Assert.assertThrows( ParseException.class, () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) @@ -187,17 +214,20 @@ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws I @Test public void testParseHeader() throws JsonProcessingException { + // Given String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); - Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + // When + Map header = DynamicConfigProviderUtils.extraConfigAndSetStringMap( + decoder.getHeaders(), + SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, + new DefaultObjectMapper() + ); // Then Assert.assertEquals(3, header.size()); @@ -209,17 +239,20 @@ public void testParseHeader() throws JsonProcessingException @Test public void testParseConfig() throws JsonProcessingException { + // Given String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}"; ObjectMapper mapper = new DefaultObjectMapper(); mapper.setInjectableValues( new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()) ); - SchemaRegistryBasedAvroBytesDecoder decoder; - decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper - .readerFor(AvroBytesDecoder.class) - .readValue(json); + SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json); - Map config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper()); + // When + Map config = DynamicConfigProviderUtils.extraConfigAndSetStringMap( + decoder.getConfig(), + SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, + new DefaultObjectMapper() + ); // Then Assert.assertEquals(3, config.size()); @@ -227,4 +260,57 @@ public void testParseConfig() throws JsonProcessingException Assert.assertEquals("value.2", config.get("registry.config.prop.2")); Assert.assertEquals("value.3", config.get("registry.config.prop.3")); } + + @Test + public void testUnauthenticatedRestClientException() throws IOException, RestClientException + { + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))) + .thenThrow(new RestClientException("unauthenticated", 401, 401)); + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + byte[] bytes = getAvroDatum(schema, someAvroDatum); + ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); + bb.rewind(); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class)); + MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("unauthenticated")); + MatcherAssert.assertThat( + e.getMessage(), + CoreMatchers.containsString( + "Failed to authenticate to schema registry for Avro schema id[1234]. Please check your credentials" + ) + ); + } + + @Test + public void testRestClientException() throws IOException, RestClientException + { + Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))) + .thenThrow(new RestClientException("resource doesn't exist", 404, 404)); + GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + Schema schema = SomeAvroDatum.getClassSchema(); + byte[] bytes = getAvroDatum(schema, someAvroDatum); + ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4); + bb.rewind(); + + // When / Then + final ParseException e = Assert.assertThrows( + ParseException.class, + () -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb) + ); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class)); + MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("resource doesn't exist")); + MatcherAssert.assertThat( + e.getMessage(), + CoreMatchers.containsString( + "Failed to fetch Avro schema id[1234] from registry." + + " Error code[404] and message[resource doesn't exist; error code: 404]." + ) + ); + } } From cd14331c51514d0995aba309e93990bcbe29f629 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 2 Jul 2024 10:59:29 -0700 Subject: [PATCH 2/3] Rename tests --- .../input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 75b97a5fcb86..c70c1f8887d4 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -262,8 +262,9 @@ public void testParseConfig() throws JsonProcessingException } @Test - public void testUnauthenticatedRestClientException() throws IOException, RestClientException + public void testParseWhenUnauthenticatedException() throws IOException, RestClientException { + // Given Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))) .thenThrow(new RestClientException("unauthenticated", 401, 401)); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); @@ -288,8 +289,9 @@ public void testUnauthenticatedRestClientException() throws IOException, RestCli } @Test - public void testRestClientException() throws IOException, RestClientException + public void testParseWhenResourceNotFoundException() throws IOException, RestClientException { + // Given Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))) .thenThrow(new RestClientException("resource doesn't exist", 404, 404)); GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum(); From 1d6905e3480b7a68aab5377060a71c54c4f733c1 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 2 Jul 2024 11:30:15 -0700 Subject: [PATCH 3/3] checkstyle fixes --- .../input/avro/SchemaRegistryBasedAvroBytesDecoder.java | 3 --- .../input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java | 6 ++---- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index def090e8deeb..c7c7438b1cdc 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -36,10 +36,7 @@ import org.apache.avro.io.DecoderFactory; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.utils.DynamicConfigProviderUtils; -import org.asynchttpclient.util.HttpConstants; import javax.annotation.Nullable; import java.io.IOException; diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index c70c1f8887d4..2aad88e8ff58 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -33,10 +33,7 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; -import org.apache.druid.error.DruidException; -import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.utils.DynamicConfigProviderUtils; import org.hamcrest.CoreMatchers; @@ -124,11 +121,12 @@ public void testParse() throws Exception byte[] bytes = getAvroDatum(schema, someAvroDatum); ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes); bb.rewind(); + // When GenericRecord parse = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); // Then - Assert.assertEquals(schema, parse.getSchema());; + Assert.assertEquals(schema, parse.getSchema()); } @Test