32 changes: 30 additions & 2 deletions docs/development/extensions-core/avro.md
@@ -169,9 +169,16 @@ Details can be found in Schema Registry [documentation](http://docs.confluent.io
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_registry`. | no |
| url | String | Specifies the url endpoint of the Schema Registry. | yes (if `urls` is not provided) |
| capacity | Integer | Specifies the max size of the cache (default == Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of multiple Schema Registry instances. | yes (if `url` is not provided) |
| config | Json | Additional configuration to pass to the Schema Registry client. | no |
| headers | Json | Headers to send to the Schema Registry. | no |


Use the `url` field for a single Schema Registry instance, or `urls` for multiple instances.

Single Instance:
```json
...
"avroBytesDecoder" : {
  "type" : "schema_registry",
  "url" : <schema-registry-url>
}
...
```

Multiple Instances:
```json
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
"config" : {
"schema.registry.basic.auth.credentials.source" : "USER_INFO",
"schema.registry.basic.auth.user.info" : "fred:letmein",
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
...

}
}
...
```

### Avro Hadoop Parser

This is for batch ingestion using the `HadoopDruidIndexer`. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`. You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`, e.g.: `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`, if reader's schema is not set, the schema in Avro object container file will be used, see [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution). Make sure to include "org.apache.druid.extensions:druid-avro-extensions" as an extension.
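For example, the reader's schema setting described above sits inside `tuningConfig` roughly as follows (the schema path is illustrative):

```json
"tuningConfig" : {
  "jobProperties" : {
    "avro.schema.input.value.path" : "/path/to/your/schema.avsc"
  }
}
```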
2 changes: 1 addition & 1 deletion extensions-core/avro-extensions/pom.xml
@@ -36,7 +36,7 @@

<properties>
<schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version>
<confluent.version>5.2.0</confluent.version>
Reviewer comment:
This is why CI is failing, could you update the entry for this in licenses.yaml. We use this file to ensure we keep our LICENSE information correct to make doing releases easier.

Based on the error message in CI, it looks like maybe the new version pulls in a few additional jars, so you might need to add a few additional entries for extensions/druid-avro-extensions (or exclude these jars if they aren't actually needed for operation of the extension)
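For reference, entries in `licenses.yaml` take roughly the shape below. This is a sketch from memory, not the exact entry to add: copy the field layout from an existing entry in the file and verify the artifact list against the CI error.

```yaml
# Hypothetical entry shape; field names and values must be checked
# against existing entries in licenses.yaml.
- name: Confluent Kafka Schema Registry Client
  license_category: binary
  module: extensions/druid-avro-extensions
  license_name: Apache License version 2.0
  version: 5.2.0
  libraries:
    - io.confluent: kafka-schema-registry-client
```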

</properties>

<repositories>
@@ -31,7 +31,7 @@
import org.apache.avro.io.DecoderFactory;
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
@@ -41,11 +44,18 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
@JsonCreator
public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") String url,
@JsonProperty("capacity") Integer capacity
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
Reviewer comment:
I think it would probably make sense to mark the url parameter as @Deprecated in favor of the new urls parameter. If you agree, I think we should remove mention of url from the avro extension documentation.

Additionally, would you mind adding a JSON serialization/deserialization unit test for this class to make sure the new properties work as expected?

@JsonProperty("config") @Nullable Map<String, ?> config,
@JsonProperty("headers") @Nullable Map<String, String> headers
)
{
int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity;
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity);
if (url != null && !url.isEmpty()) {
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers);
} else {
this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers);
}
}

//For UT only
@@ -63,7 +73,7 @@ public GenericRecord parse(ByteBuffer bytes)
int id = bytes.getInt(); // extract schema registry id
int length = bytes.limit() - 1 - 4;
int offset = bytes.position() + bytes.arrayOffset();
Schema schema = registry.getByID(id);
Schema schema = registry.getById(id);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
}
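The header arithmetic in `parse()` above assumes the Confluent wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro payload. A minimal stand-alone sketch of that header handling, using only the JDK (the payload bytes and id are made up for illustration):

```java
import java.nio.ByteBuffer;

public class WireFormatSketch
{
  public static void main(String[] args)
  {
    byte[] payload = {10, 20, 30};                  // stand-in for the Avro-encoded body
    ByteBuffer bytes = ByteBuffer.allocate(1 + 4 + payload.length);
    bytes.put((byte) 0x0);                          // magic byte
    bytes.putInt(1234);                             // schema registry id
    bytes.put(payload);
    bytes.flip();

    bytes.get();                                    // skip magic byte
    int id = bytes.getInt();                        // extract schema registry id
    int length = bytes.limit() - 1 - 4;             // remaining Avro bytes
    int offset = bytes.position() + bytes.arrayOffset();

    System.out.println(id);
    System.out.println(length);
    System.out.println(offset);
  }
}
```

After the magic byte and id are consumed, `offset`/`length` delimit exactly the Avro payload inside the backing array, which is what the decoder hands to `DecoderFactory`.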
@@ -52,7 +52,7 @@ public void setUp()
public void testParse() throws Exception
{
// Given
Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
@@ -68,7 +68,7 @@ public void testParseCorrupted() throws Exception
public void testParseCorrupted() throws Exception
{
// Given
Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
Mockito.when(registry.getById(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
@@ -81,7 +81,7 @@ public void testParseWrongId() throws Exception
public void testParseWrongId() throws Exception
{
// Given
Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
Mockito.when(registry.getById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);