Merged
10 changes: 10 additions & 0 deletions distribution/bin/check-licenses.py
@@ -216,13 +216,15 @@ def build_compatible_license_names():
compatible_licenses['Apache 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache-2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['The Apache License, Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License Version 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0'

@@ -260,6 +262,14 @@ def build_compatible_license_names():
compatible_licenses['Eclipse Public License - Version 1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['Eclipse Public License, Version 1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['Eclipse Public License v1.0'] = 'Eclipse Public License 1.0'
compatible_licenses['EPL 1.0'] = 'Eclipse Public License 1.0'

compatible_licenses['Eclipse Public License 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['The Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Public License - Version 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Public License, Version 2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['Eclipse Public License v2.0'] = 'Eclipse Public License 2.0'
compatible_licenses['EPL 2.0'] = 'Eclipse Public License 2.0'

compatible_licenses['Eclipse Distribution License 1.0'] = 'Eclipse Distribution License 1.0'
compatible_licenses['Eclipse Distribution License - v 1.0'] = 'Eclipse Distribution License 1.0'
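The new entries extend the script's normalization map, which resolves the many vendor spellings of a license to a single canonical name. A minimal sketch of that lookup (assumed usage; `canonical_name` is an illustrative helper, not part of the script):

```python
# Sketch of how a compatible-licenses map normalizes vendor spellings
# of a license to one canonical name. The entries mirror a few of
# those added in this change.
def build_compatible_license_names():
    compatible_licenses = {}
    compatible_licenses['Apache 2.0'] = 'Apache License version 2.0'
    compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0'
    compatible_licenses['EPL 1.0'] = 'Eclipse Public License 1.0'
    compatible_licenses['EPL 2.0'] = 'Eclipse Public License 2.0'
    compatible_licenses['Eclipse Public License v2.0'] = 'Eclipse Public License 2.0'
    return compatible_licenses

def canonical_name(license_name, compatible_licenses):
    # Unknown spellings pass through unchanged so the caller can flag them.
    return compatible_licenses.get(license_name, license_name)
```

With this, `'EPL 2.0'` and `'Eclipse Public License v2.0'` both resolve to `'Eclipse Public License 2.0'`, so the compatibility check only has to reason about canonical names.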
31 changes: 31 additions & 0 deletions docs/ingestion/data-formats.md
@@ -1016,7 +1016,13 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
| type | String | This should say `schema_registry`. | no |
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the URL endpoints of multiple Schema Registry instances. | yes (if `url` is not provided) |
| config | Json | Additional configuration properties to pass to the Schema Registry client. | no |
| headers | Json | HTTP headers to send with requests to the Schema Registry. | no |

Use the `url` field for a single Schema Registry instance, or the `urls` field for multiple instances.

Single Instance:
```json
...
"avroBytesDecoder" : {
@@ -1026,6 +1032,31 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
...
```

Multiple Instances:
```json
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
"config" : {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "fred:letmein",
"schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
"schema.registry.ssl.key.password": "<password>"
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
...
}
}
...
```
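The two fields are mutually exclusive, with a non-empty `url` taking precedence, matching the Java constructor in `SchemaRegistryBasedAvroBytesDecoder`. A minimal Python sketch of that selection logic (`pick_registry_urls` is an illustrative name, not part of the codebase):

```python
# Sketch of the url/urls precedence used by the schema_registry
# decoder: a non-empty `url` wins; otherwise the `urls` list is used.
def pick_registry_urls(url, urls):
    if url:  # single-instance configuration
        return [url]
    return urls or []  # multi-instance configuration
```

In the real decoder the chosen endpoint(s) are handed to `CachedSchemaRegistryClient`, which also receives the optional `config` and `headers` maps.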

### Protobuf Parser

> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser.
18 changes: 17 additions & 1 deletion extensions-core/avro-extensions/pom.xml
@@ -35,7 +35,7 @@

<properties>
<schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version>
<confluent.version>5.5.1</confluent.version>
</properties>

<repositories>
@@ -169,6 +169,22 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
@@ -31,7 +33,10 @@
import org.apache.avro.io.DecoderFactory;
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
@@ -40,12 +45,19 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder

@JsonCreator
public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") String url,
@JsonProperty("capacity") Integer capacity
@JsonProperty("url") @Deprecated String url,
@JsonProperty("capacity") Integer capacity,
@JsonProperty("urls") @Nullable List<String> urls,
@JsonProperty("config") @Nullable Map<String, ?> config,
@JsonProperty("headers") @Nullable Map<String, String> headers
)
{
int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity;
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity);
if (url != null && !url.isEmpty()) {
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers);
} else {
this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers);
}
}

//For UT only
@@ -63,7 +75,8 @@ public GenericRecord parse(ByteBuffer bytes)
int id = bytes.getInt(); // extract schema registry id
int length = bytes.limit() - 1 - 4;
int offset = bytes.position() + bytes.arrayOffset();
Schema schema = registry.getByID(id);
ParsedSchema parsedSchema = registry.getSchemaById(id);
Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
}
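The arithmetic in `parse` (`length = limit - 1 - 4`) reflects the Confluent wire format: one magic byte (`0x00`), a 4-byte big-endian schema ID, then the Avro payload. A hedged Python sketch of that framing (`split_wire_format` is an illustrative name; the decoder itself does this with `ByteBuffer`):

```python
import struct

# Sketch of the Confluent wire format the decoder parses:
#   byte 0      magic byte, always 0x00
#   bytes 1-4   schema ID, big-endian unsigned int
#   bytes 5-    Avro-encoded payload
def split_wire_format(message: bytes):
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack_from(">I", message, 1)
    return schema_id, message[5:]
```

The decoder looks the extracted ID up via `getSchemaById` and, when the result is an `AvroSchema`, uses its raw Avro schema to deserialize the payload.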
@@ -19,6 +19,8 @@

package org.apache.druid.data.input.avro;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -48,11 +50,53 @@ public void setUp()
registry = Mockito.mock(SchemaRegistryClient.class);
}

@Test
public void testMultipleUrls() throws Exception
{
String json = "{\"urls\":[\"http://localhost\"],\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}

@Test
public void testUrl() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\"}";
ObjectMapper mapper = new ObjectMapper();
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}

@Test
public void testConfig() throws Exception
{
String json = "{\"url\":\"http://localhost\",\"type\": \"schema_registry\", \"config\":{}}";
ObjectMapper mapper = new ObjectMapper();
SchemaRegistryBasedAvroBytesDecoder decoder;
decoder = (SchemaRegistryBasedAvroBytesDecoder) mapper
.readerFor(AvroBytesDecoder.class)
.readValue(json);

// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}

@Test
public void testParse() throws Exception
{
// Given
Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
@@ -68,7 +112,7 @@ public void testParse() throws Exception
public void testParseCorrupted() throws Exception
{
// Given
Mockito.when(registry.getByID(ArgumentMatchers.eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
@@ -81,7 +125,7 @@ public void testParseCorrupted() throws Exception
public void testParseWrongId() throws Exception
{
// Given
Mockito.when(registry.getByID(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
86 changes: 84 additions & 2 deletions licenses.yaml
@@ -13,7 +13,7 @@
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# under the License.

name: conjunctive normal form conversion code, a variance aggregator algorithm, and Bloom filter adapted from Apache Hive
version:
@@ -3382,13 +3382,95 @@ notices:
---

name: Kafka Schema Registry Client
version: 3.0.1
version: 5.5.1
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- io.confluent: kafka-schema-registry-client
- io.confluent: common-config
- io.confluent: common-utils

---

name: Kafka Client
version: 5.5.1-ccs
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- org.apache.kafka: kafka-clients

---

name: swagger-annotations
version: 1.6.0
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- io.swagger: swagger-annotations

---

name: jersey-common
version: '2.30'
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
libraries:
- org.glassfish.jersey.core: jersey-common

---

name: osgi-resource-locator
version: 1.0.3
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- org.glassfish.hk2: osgi-resource-locator

---

name: jakarta.inject
version: 2.6.1
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- org.glassfish.hk2.external: jakarta.inject

---

name: jakarta.annotation
version: 1.3.5
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- jakarta.annotation: jakarta.annotation-api

---

name: javax.ws.rs-api
version: 2.1.1
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- javax.ws.rs: javax.ws.rs-api

---

name: jakarta.ws.rs-api
version: 2.1.6
license_category: binary
module: extensions/druid-avro-extensions
license_name: Eclipse Public License 2.0
libraries:
- jakarta.ws.rs: jakarta.ws.rs-api

---

name: Apache Velocity Engine
2 changes: 2 additions & 0 deletions website/.spelling
@@ -417,6 +417,7 @@ untrusted
useFilterCNF
uptime
uris
urls
useFieldDiscovery
v1
v2
@@ -938,6 +939,7 @@ ctrl
jsonLowercase
listDelimiter
timestampSpec
urls
- ../docs/ingestion/data-management.md
1GB
IOConfig