Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,34 @@ public GenericRecord parse(ByteBuffer bytes)
ParsedSchema parsedSchema = registry.getSchemaById(id);
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
}
catch (IOException | RestClientException ex) {
throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s] from registry. Check if the schema "
+ "exists in the registry. Otherwise it could mean that there is "
+ "malformed data in the stream or data that doesn't conform to the schema "
+ "specified.", id);
catch (IOException ex1) {
throw new ParseException(
null,
ex1,
"Failed to fetch Avro schema id[%s] from registry. Check if the schema exists in the registry. Otherwise it"
+ " could mean that there is malformed data in the stream or data that doesn't conform to the schema"
+ " specified.",
id
);
}
catch (RestClientException ex2) {
if (ex2.getErrorCode() == 401) {
throw new ParseException(
null,
ex2,
"Failed to authenticate to schema registry for Avro schema id[%s]. Please check your credentials.",
id
);
}
// For all other errors, just include the code and message received from the library.
throw new ParseException(
null,
ex2,
"Failed to fetch Avro schema id[%s] from registry. Error code[%s] and message[%s].",
id,
ex2.getErrorCode(),
ex2.getMessage()
);
}
if (schema == null) {
throw new ParseException(null, "No Avro schema id[%s] in registry", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
Expand Down Expand Up @@ -61,15 +62,15 @@ public void setUp()
@Test
public void testMultipleUrls() throws Exception
{
// Given
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// When
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
Expand All @@ -78,15 +79,15 @@ public void testMultipleUrls() throws Exception
@Test
public void testUrl() throws Exception
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// When
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
Expand All @@ -95,15 +96,15 @@ public void testUrl() throws Exception
@Test
public void testConfig() throws Exception
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// When
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
Expand All @@ -120,21 +121,33 @@ public void testParse() throws Exception
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
bb.rewind();

// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
GenericRecord parse = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// Then
Assert.assertEquals(schema, parse.getSchema());
}

@Test(expected = ParseException.class)
@Test
public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
{
// Given
ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(
e.getMessage(),
CoreMatchers.containsString("Failed to decode avro message, not enough bytes to decode (2)")
);
}

@Test(expected = ParseException.class)
@Test
public void testParseCorruptedPartial() throws Exception
{
// Given
Expand All @@ -145,19 +158,30 @@ public void testParseCorruptedPartial() throws Exception
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Failed to decode Avro message for schema id[1234]"));
}

@Test(expected = ParseException.class)
@Test
public void testParseWrongSchemaType() throws Exception
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("No Avro schema id[1234] in registry"));
}

@Test
Expand All @@ -167,7 +191,8 @@ public void testParseWrongId() throws Exception
Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind();
// When

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
Expand All @@ -187,17 +212,20 @@ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws I
@Test
public void testParseHeader() throws JsonProcessingException
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{},\"headers\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.header.prop.2\":\"value.2\", \"registry.header.prop.3\":\"value.3\"}},\"registry.header.prop.1\":\"value.1\",\"registry.header.prop.2\":\"value.4\"}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getHeaders(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// When
Map<String, String> header = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
decoder.getHeaders(),
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
new DefaultObjectMapper()
);

// Then
Assert.assertEquals(3, header.size());
Expand All @@ -209,22 +237,80 @@ public void testParseHeader() throws JsonProcessingException
@Test
public void testParseConfig() throws JsonProcessingException
{
// Given
String json = "{\"url\":\"http://localhost\",\"type\":\"schema_registry\",\"config\":{\"druid.dynamic.config.provider\":{\"type\":\"mapString\", \"config\":{\"registry.config.prop.2\":\"value.2\", \"registry.config.prop.3\":\"value.3\"}},\"registry.config.prop.1\":\"value.1\",\"registry.config.prop.2\":\"value.4\"},\"headers\":{}}";
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())
);
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);
SchemaRegistryBasedAvroBytesDecoder decoder = mapper.readerFor(AvroBytesDecoder.class).readValue(json);

Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(decoder.getConfig(), SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, new DefaultObjectMapper());
// When
Map<String, ?> config = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
decoder.getConfig(),
SchemaRegistryBasedAvroBytesDecoder.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
new DefaultObjectMapper()
);

// Then
Assert.assertEquals(3, config.size());
Assert.assertEquals("value.1", config.get("registry.config.prop.1"));
Assert.assertEquals("value.2", config.get("registry.config.prop.2"));
Assert.assertEquals("value.3", config.get("registry.config.prop.3"));
}

@Test
public void testParseWhenUnauthenticatedException() throws IOException, RestClientException
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
.thenThrow(new RestClientException("unauthenticated", 401, 401));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
bb.rewind();

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("unauthenticated"));
MatcherAssert.assertThat(
e.getMessage(),
CoreMatchers.containsString(
"Failed to authenticate to schema registry for Avro schema id[1234]. Please check your credentials"
)
);
}

@Test
public void testParseWhenResourceNotFoundException() throws IOException, RestClientException
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
.thenThrow(new RestClientException("resource doesn't exist", 404, 404));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
bb.rewind();

// When / Then
final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RestClientException.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("resource doesn't exist"));
MatcherAssert.assertThat(
e.getMessage(),
CoreMatchers.containsString(
"Failed to fetch Avro schema id[1234] from registry."
+ " Error code[404] and message[resource doesn't exist; error code: 404]."
)
);
}
}