From 08d767ac5ed40e4694767cd2124f3235b22e9db4 Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Wed, 17 Aug 2022 15:50:55 +0200 Subject: [PATCH 1/6] #12912 Fix KafkaEmitter not emitting queryType for a native query --- .../emitter/kafka/EventToJsonSerializer.java | 103 +++++++ .../druid/emitter/kafka/KafkaEmitter.java | 23 +- .../kafka/EventToJsonSerializerTest.java | 269 ++++++++++++++++++ 3 files changed, 382 insertions(+), 13 deletions(-) create mode 100644 extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java create mode 100644 extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java new file mode 100644 index 000000000000..e2b3f1e6d5d2 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.druid.java.util.emitter.core.Event; + +import java.util.HashMap; +import java.util.Map; + +/** + * Serializes the any event to a JSON string prepresntation + */ +public class EventToJsonSerializer +{ + private final ObjectMapper objectMapper; + private final Map additionalProperties; + + /** + * Returns serializer without additional properties + */ + public static EventToJsonSerializer of(final ObjectMapper objectMapper) + { + return new EventToJsonSerializer(objectMapper, new HashMap<>()); + } + + /** + * Constructs serializer with given ObjectMapper and additional properties + * + * @param objectMapper - will be used for serializaing data to JSON String + * @param additionalProperties - additional properties that will be added to the output JSON + */ + public EventToJsonSerializer(final ObjectMapper objectMapper, final Map additionalProperties) + { + this.objectMapper = objectMapper; + this.additionalProperties = additionalProperties; + } + + /** + * Adds the value to the additional properties if the value is not null + */ + public EventToJsonSerializer withProperty(String name, Object value) + { + if (value != null) { + additionalProperties.put(name, value); + } + return this; + } + + /** + * Serializes Event to a JSON String representation + */ + public String serialize(Event event) throws JsonProcessingException + { + JsonBuilder builder = new JsonBuilder(objectMapper); + + event.toMap().forEach(builder::putKeyValue); + additionalProperties.forEach(builder::putKeyValue); + + return objectMapper.writeValueAsString(builder.build()); + } + + private static class JsonBuilder + { + private final ObjectNode json; + private final ObjectMapper objectMapper; + + private JsonBuilder(final ObjectMapper objectMapper) + { + this.json = objectMapper.createObjectNode(); + this.objectMapper = objectMapper; + } + + private void putKeyValue(String key, Object value) + { + json.set(key, objectMapper.valueToTree(value)); + } + + private ObjectNode build() + { + return json; + } + } +} diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index cdab2a7d877e..9986aaa372eb 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -39,7 +38,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; -import java.util.Map; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -60,7 +58,7 @@ public class KafkaEmitter implements Emitter private final KafkaEmitterConfig config; private final Producer producer; - private final ObjectMapper jsonMapper; + private final EventToJsonSerializer jsonSerializer; private final MemoryBoundLinkedBlockingQueue metricQueue; private final MemoryBoundLinkedBlockingQueue alertQueue; private final MemoryBoundLinkedBlockingQueue requestQueue; @@ -71,7 +69,8 @@ public class KafkaEmitter implements Emitter public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; - this.jsonMapper = jsonMapper; + this.jsonSerializer = EventToJsonSerializer.of(jsonMapper) + .withProperty("clusterName", config.getClusterName()); this.producer = setKafkaProducer(); // same with kafka producer's buffer.memory long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig() @@ -126,8 +125,12 @@ public void start() scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS); } scheduler.scheduleWithFixedDelay(() -> { - log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]", - metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get() + log.info( + "Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]", + metricLost.get(), + alertLost.get(), + requestLost.get(), + invalidLost.get() ); }, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); @@ -166,14 +169,8 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue resultBuilder = ImmutableMap.builder().putAll(event.toMap()); - if (config.getClusterName() != null) { - resultBuilder.put("clusterName", config.getClusterName()); - } - Map result = resultBuilder.build(); - try { - String resultJson = jsonMapper.writeValueAsString(result); + String resultJson = jsonSerializer.serialize(event); ObjectContainer objectContainer = new ObjectContainer<>( resultJson, StringUtils.toUtf8(resultJson).length diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java new file mode 100644 index 000000000000..7c446a51337d --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import junit.framework.TestCase; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.server.QueryStats; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory; +import org.joda.time.DateTime; + +import java.util.Collections; + +public class EventToJsonSerializerTest extends TestCase +{ + + public void testSerializeServiceMetricEvent() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); + DateTime timestamp = DateTimes.nowUtc(); + Event event = ServiceMetricEvent.builder() + .setFeed("my-feed") + .build(timestamp, "m1", 1) + .build("my-service", "my-host"); + + String actual = serializer.serialize(event); + String expected = "{" + + "\"feed\":\"my-feed\"," + + "\"timestamp\":\"" + + timestamp + + "\"," + + "\"metric\":\"m1\"," + + "\"value\":1," + + "\"service\":\"my-service\"," + + "\"host\":\"my-host\"" + + "}"; + assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + + public void testSerializeAlertEvent() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); + DateTime timestamp = DateTimes.nowUtc(); + Event event = new AlertEvent( + timestamp, + "my-service", + "my-host", + AlertEvent.Severity.DEFAULT, + "my-description", + Collections.emptyMap() + ); + + String actual = serializer.serialize(event); + String expected = "{" + + "\"feed\":\"alerts\"," + + "\"timestamp\":\"" + + timestamp + + "\"," + + "\"severity\":\"" + + AlertEvent.Severity.DEFAULT + + "\"," + + "\"service\":\"my-service\"," + + "\"host\":\"my-host\"," + + "\"description\":\"my-description\"," + + "\"data\":{}" + + "}"; + assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + + public void testSerializeSqlLogRequest() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); + DateTime timestamp = DateTimes.nowUtc(); + + Event event = DefaultRequestLogEventBuilderFactory.instance() + .createRequestLogEventBuilder( + "requests", + RequestLogLine.forSql( + "SELECT * FROM dummy", + Collections.emptyMap(), + timestamp, + "127.0.0.1", + new QueryStats(ImmutableMap.of()) + ) + ) + .build("my-service", "my-host"); + + String actual = serializer.serialize(event); + String expected = "{" + + "\"feed\":\"requests\"," + + "\"timestamp\":\"" + + timestamp + + "\"," + + "\"service\":\"my-service\"," + + "\"host\":\"my-host\"," + + "\"sql\":\"SELECT * FROM dummy\"," + + "\"sqlQueryContext\":{}," + + "\"queryStats\":{}," + + "\"remoteAddr\":\"127.0.0.1\"" + + "}"; + + assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + + public void testSerializeNativeLogRequest() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); + + RequestLogLine nativeLine = RequestLogLine.forNative( + new TimeseriesQuery( + new TableDataSource("dummy"), + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.of( + "2015-01-01/2015-01-02"))), + true, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of(), + ImmutableList.of(), + 5, + ImmutableMap.of("key", "value") + ), + DateTimes.of(2019, 12, 12, 3, 1), + "127.0.0.1", + new QueryStats(ImmutableMap.of( + "query/time", + 13L, + "query/bytes", + 10L, + "success", + true, + "identity", + "allowAll" + )) + ); + + Event event = DefaultRequestLogEventBuilderFactory.instance() + .createRequestLogEventBuilder("my-feed", nativeLine) + .build("my-service", "my-host"); + + String actual = serializer.serialize(event); + String queryString = "{" + + "\"queryType\":\"timeseries\"," + + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," + + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]}," + + "\"descending\":true," + + "\"granularity\":{\"type\":\"all\"}," + + "\"limit\":5," + + "\"context\":{\"key\":\"value\"}" + + "}"; + + String expected = "{" + + "\"feed\":\"my-feed\"," + + "\"host\":\"my-host\"," + + "\"service\":\"my-service\"," + + "\"timestamp\":\"2019-12-12T03:01:00.000Z\"," + + "\"query\":" + + queryString + + "," + + "\"remoteAddr\":\"127.0.0.1\"," + + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; + + assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + + public void testSerializeNativeLogRequestWithAdditionalParameters() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper) + .withProperty("number", 1) + .withProperty("text", "some text") + .withProperty("null", null); + + RequestLogLine nativeLine = RequestLogLine.forNative( + new TimeseriesQuery( + new TableDataSource("dummy"), + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.of( + "2015-01-01/2015-01-02"))), + true, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of(), + ImmutableList.of(), + 5, + ImmutableMap.of("key", "value") + ), + DateTimes.of(2019, 12, 12, 3, 1), + "127.0.0.1", + new QueryStats(ImmutableMap.of( + "query/time", + 13L, + "query/bytes", + 10L, + "success", + true, + "identity", + "allowAll" + )) + ); + + Event event = DefaultRequestLogEventBuilderFactory.instance() + .createRequestLogEventBuilder("my-feed", nativeLine) + .build("my-service", "my-host"); + + String actual = serializer.serialize(event); + String queryString = "{" + + "\"queryType\":\"timeseries\"," + + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," + + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]}," + + "\"descending\":true," + + "\"granularity\":{\"type\":\"all\"}," + + "\"limit\":5," + + "\"context\":{\"key\":\"value\"}" + + "}"; + + String expected = "{" + + "\"feed\":\"my-feed\"," + + "\"host\":\"my-host\"," + + "\"service\":\"my-service\"," + + "\"timestamp\":\"2019-12-12T03:01:00.000Z\"," + + "\"query\":" + + queryString + + "," + + "\"remoteAddr\":\"127.0.0.1\"," + + "\"number\":1," + + "\"text\":\"some text\"," + + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; + + assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } +} From 5505d730a17a5d5846f0473be2e6d64a4f7784c1 Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Wed, 17 Aug 2022 19:50:36 +0200 Subject: [PATCH 2/6] Remove dependency on joda DateTime in UnitTests --- .../emitter/kafka/EventToJsonSerializerTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java index 7c446a51337d..b632a3746662 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java @@ -38,7 +38,6 @@ import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory; -import org.joda.time.DateTime; import java.util.Collections; @@ -49,10 +48,10 @@ public void testSerializeServiceMetricEvent() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); - DateTime timestamp = DateTimes.nowUtc(); + String timestamp = "2022-08-17T18:51:00.000Z"; Event event = ServiceMetricEvent.builder() .setFeed("my-feed") - .build(timestamp, "m1", 1) + .build(DateTimes.of(timestamp), "m1", 1) .build("my-service", "my-host"); String actual = serializer.serialize(event); @@ -73,9 +72,9 @@ public void testSerializeAlertEvent() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); - DateTime timestamp = DateTimes.nowUtc(); + String timestamp = "2022-08-17T18:51:00.000Z"; Event event = new AlertEvent( - timestamp, + DateTimes.of(timestamp), "my-service", "my-host", AlertEvent.Severity.DEFAULT, @@ -104,7 +103,7 @@ public void testSerializeSqlLogRequest() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); - DateTime timestamp = DateTimes.nowUtc(); + String timestamp = "2022-08-17T18:51:00.000Z"; Event event = DefaultRequestLogEventBuilderFactory.instance() .createRequestLogEventBuilder( @@ -112,7 +111,7 @@ public void testSerializeSqlLogRequest() throws JsonProcessingException RequestLogLine.forSql( "SELECT * FROM dummy", Collections.emptyMap(), - timestamp, + DateTimes.of(timestamp), "127.0.0.1", new QueryStats(ImmutableMap.of()) ) From 56237efe0bcbadd2f07c2cb0fdbf666e19b53e7d Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Mon, 22 Aug 2022 11:30:33 +0200 Subject: [PATCH 3/6] #12912 Replace bare Map with EventMap in Event#toMap --- .../druid/java/util/emitter/core/Event.java | 4 +- .../java/util/emitter/core/EventMap.java | 99 +++++++++++++++++ .../util/emitter/core/EventMapSerializer.java | 40 +++++++ .../java/util/emitter/service/AlertEvent.java | 6 +- .../emitter/service/ServiceMetricEvent.java | 45 ++++---- .../java/util/emitter/core/IntEvent.java | 4 +- .../java/util/emitter/service/UnitEvent.java | 15 ++- .../emitter/kafka/EventToJsonSerializer.java | 103 ------------------ .../druid/emitter/kafka/KafkaEmitter.java | 14 ++- .../kafka/EventToJsonSerializerTest.java | 27 ++--- .../OpenTelemetryEmitterTest.java | 5 +- .../server/log/DefaultRequestLogEvent.java | 33 +++--- 12 files changed, 223 insertions(+), 172 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java create mode 100644 core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java delete mode 100644 extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java index 37e6e57bf5f4..1f5a20c6188f 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java @@ -19,13 +19,11 @@ package org.apache.druid.java.util.emitter.core; -import java.util.Map; - /** */ public interface Event { - Map toMap(); + EventMap toMap(); String getFeed(); } diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java new file mode 100644 index 000000000000..a4e0267e749f --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.HashMap; +import java.util.Map; + +/** + * EventMap can be safely serialzed to JSON using Jackson serializer as it respects the polymorphic annotations + * on entires (unlike standard Map). The example of polymorphic class is a query interface, where different native + * query types are resolved by additional field called "queryType". This implementation esures that the annotation on + * the values are respected during serialization. + */ +@JsonSerialize(using = EventMapSerializer.class) +public class EventMap extends HashMap +{ + /** + * Returns builder with Fluent API to build EventMap instance using method chaining + */ + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + + private final EventMap map; + + protected Builder() + { + map = new EventMap(); + } + + /** + * Adds key -> value pair to the map + */ + public Builder put(String key, Object value) + { + map.put(key, value); + return this; + } + + /** + * Adds key -> value pair to the map only if value is not null + */ + public Builder putNonNull(String key, Object value) + { + if (value != null) { + map.put(key, value); + } + return this; + } + + /** + * Adds map entry to the map + */ + public Builder put(Map.Entry entry) + { + map.put(entry.getKey(), entry.getValue()); + return this; + } + + /** + * Adds all key -> value pairs from other map + */ + public Builder putAll(Map other) + { + map.putAll(other); + return this; + } + + /** Builds and returns the EventMap */ + public EventMap build() + { + return map; + } + } + +} diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java new file mode 100644 index 000000000000..ff1121973ee8 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; +import java.util.Map; + +public class EventMapSerializer extends JsonSerializer +{ + @Override + public void serialize(EventMap map, JsonGenerator gen, SerializerProvider serializers) throws IOException + { + gen.writeStartObject(); + for (Map.Entry entry : map.entrySet()) { + gen.writeObjectField(entry.getKey(), entry.getValue()); + } + gen.writeEndObject(); + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java b/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java index a5d805dc14fb..e2a7987c15ae 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java @@ -24,6 +24,7 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.joda.time.DateTime; import java.util.Collections; @@ -148,9 +149,10 @@ public Map getDataMap() @Override @JsonValue - public Map toMap() + public EventMap toMap() { - return ImmutableMap.builder() + return EventMap + .builder() .put("feed", getFeed()) .put("timestamp", createdTime.toString()) .putAll(serviceDimensions) diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java index 65ddc70341cb..f40491ce7016 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.joda.time.DateTime; import java.util.Arrays; @@ -34,6 +35,7 @@ import java.util.TreeMap; /** + * */ @PublicApi public class ServiceMetricEvent implements Event @@ -105,28 +107,29 @@ public Number getValue() @Override @JsonValue - public Map toMap() + public EventMap toMap() { - return ImmutableMap.builder() - .put("feed", getFeed()) - .put("timestamp", createdTime.toString()) - .putAll(serviceDims) - .put("metric", metric) - .put("value", value) - .putAll( - Maps.filterEntries( - userDims, - new Predicate>() - { - @Override - public boolean apply(Map.Entry input) - { - return input.getKey() != null; - } - } - ) - ) - .build(); + return EventMap + .builder() + .put("feed", getFeed()) + .put("timestamp", createdTime.toString()) + .putAll(serviceDims) + .put("metric", metric) + .put("value", value) + .putAll( + Maps.filterEntries( + userDims, + new Predicate>() + { + @Override + public boolean apply(Map.Entry input) + { + return input.getKey() != null; + } + } + ) + ) + .build(); } public static class Builder diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java index 3fae083e155e..143504706e41 100644 --- a/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java @@ -19,8 +19,6 @@ package org.apache.druid.java.util.emitter.core; -import java.util.Map; - class IntEvent implements Event { int index; @@ -30,7 +28,7 @@ class IntEvent implements Event } @Override - public Map toMap() + public EventMap toMap() { return null; } diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java b/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java index d65769d9f470..f8e408db7091 100644 --- a/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java +++ b/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java @@ -23,12 +23,13 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import java.util.Collections; -import java.util.HashMap; import java.util.Map; /** + * */ public class UnitEvent implements Event { @@ -50,12 +51,14 @@ public UnitEvent(String feed, Number value, Map dimensions) @Override @JsonValue - public Map toMap() + public EventMap toMap() { - Map result = new HashMap<>(dimensions); - result.put("feed", feed); - result.put("metrics", ImmutableMap.of("value", value)); - return ImmutableMap.copyOf(result); + return EventMap + .builder() + .putAll(dimensions) + .put("feed", feed) + .put("metrics", ImmutableMap.of("value", value)) + .build(); } @Override diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java deleted file mode 100644 index e2b3f1e6d5d2..000000000000 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/EventToJsonSerializer.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.emitter.kafka; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.druid.java.util.emitter.core.Event; - -import java.util.HashMap; -import java.util.Map; - -/** - * Serializes the any event to a JSON string prepresntation - */ -public class EventToJsonSerializer -{ - private final ObjectMapper objectMapper; - private final Map additionalProperties; - - /** - * Returns serializer without additional properties - */ - public static EventToJsonSerializer of(final ObjectMapper objectMapper) - { - return new EventToJsonSerializer(objectMapper, new HashMap<>()); - } - - /** - * Constructs serializer with given ObjectMapper and additional properties - * - * @param objectMapper - will be used for serializaing data to JSON String - * @param additionalProperties - additional properties that will be added to the output JSON - */ - public EventToJsonSerializer(final ObjectMapper objectMapper, final Map additionalProperties) - { - this.objectMapper = objectMapper; - this.additionalProperties = additionalProperties; - } - - /** - * Adds the value to the additional properties if the value is not null - */ - public EventToJsonSerializer withProperty(String name, Object value) - { - if (value != null) { - additionalProperties.put(name, value); - } - return this; - } - - /** - * Serializes Event to a JSON String representation - */ - public String serialize(Event event) throws JsonProcessingException - { - JsonBuilder builder = new JsonBuilder(objectMapper); - - event.toMap().forEach(builder::putKeyValue); - additionalProperties.forEach(builder::putKeyValue); - - return objectMapper.writeValueAsString(builder.build()); - } - - private static class JsonBuilder - { - private final ObjectNode json; - private final ObjectMapper objectMapper; - - private JsonBuilder(final ObjectMapper objectMapper) - { - this.json = objectMapper.createObjectNode(); - this.objectMapper = objectMapper; - } - - private void putKeyValue(String key, Object value) - { - json.set(key, objectMapper.valueToTree(value)); - } - - private ObjectNode build() - { - return json; - } - } -} diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 9986aaa372eb..1534c72924a4 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.log.RequestLogEvent; @@ -58,7 +59,7 @@ public class KafkaEmitter implements Emitter private final KafkaEmitterConfig config; private final Producer producer; - private final EventToJsonSerializer jsonSerializer; + private final ObjectMapper jsonMapper; private final MemoryBoundLinkedBlockingQueue metricQueue; private final MemoryBoundLinkedBlockingQueue alertQueue; private final MemoryBoundLinkedBlockingQueue requestQueue; @@ -69,8 +70,7 @@ public class KafkaEmitter implements Emitter public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) { this.config = config; - this.jsonSerializer = EventToJsonSerializer.of(jsonMapper) - .withProperty("clusterName", config.getClusterName()); + this.jsonMapper = jsonMapper; this.producer = setKafkaProducer(); // same with kafka producer's buffer.memory long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig() @@ -170,7 +170,13 @@ public void emit(final Event event) { if (event != null) { try { - String resultJson = jsonSerializer.serialize(event); + EventMap map = event.toMap(); + if (config.getClusterName() != null) { + map.put("clusterName", config.getClusterName()); + } + + String resultJson = jsonMapper.writeValueAsString(map); + ObjectContainer objectContainer = new ObjectContainer<>( resultJson, StringUtils.toUtf8(resultJson).length diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java index b632a3746662..185a0843ce12 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.TableDataSource; @@ -47,14 +48,13 @@ public class EventToJsonSerializerTest extends TestCase public void testSerializeServiceMetricEvent() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); - EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); String timestamp = "2022-08-17T18:51:00.000Z"; Event event = ServiceMetricEvent.builder() .setFeed("my-feed") .build(DateTimes.of(timestamp), "m1", 1) .build("my-service", "my-host"); - String actual = serializer.serialize(event); + String actual = mapper.writeValueAsString(event.toMap()); String expected = "{" + "\"feed\":\"my-feed\"," + "\"timestamp\":\"" @@ -71,7 +71,6 @@ public void testSerializeServiceMetricEvent() throws JsonProcessingException public void testSerializeAlertEvent() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); - EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); String timestamp = "2022-08-17T18:51:00.000Z"; Event event = new AlertEvent( DateTimes.of(timestamp), @@ -82,7 +81,7 @@ public void testSerializeAlertEvent() throws JsonProcessingException Collections.emptyMap() ); - String actual = serializer.serialize(event); + String actual = mapper.writeValueAsString(event.toMap()); String expected = "{" + "\"feed\":\"alerts\"," + "\"timestamp\":\"" @@ -102,7 +101,6 @@ public void testSerializeAlertEvent() throws JsonProcessingException public void testSerializeSqlLogRequest() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); - EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); String timestamp = "2022-08-17T18:51:00.000Z"; Event event = DefaultRequestLogEventBuilderFactory.instance() @@ -118,7 +116,7 @@ public void testSerializeSqlLogRequest() throws JsonProcessingException ) .build("my-service", "my-host"); - String actual = serializer.serialize(event); + String actual = mapper.writeValueAsString(event.toMap()); String expected = "{" + "\"feed\":\"requests\"," + "\"timestamp\":\"" @@ -138,7 +136,6 @@ public void testSerializeSqlLogRequest() throws JsonProcessingException public void testSerializeNativeLogRequest() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); - EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper); RequestLogLine nativeLine = RequestLogLine.forNative( new TimeseriesQuery( @@ -173,7 +170,7 @@ public void testSerializeNativeLogRequest() throws JsonProcessingException .createRequestLogEventBuilder("my-feed", nativeLine) .build("my-service", "my-host"); - String actual = serializer.serialize(event); + String actual = mapper.writeValueAsString(event.toMap()); String queryString = "{" + "\"queryType\":\"timeseries\"," + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," @@ -201,10 +198,7 @@ public void testSerializeNativeLogRequest() throws JsonProcessingException public void testSerializeNativeLogRequestWithAdditionalParameters() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); - EventToJsonSerializer serializer = EventToJsonSerializer.of(mapper) - .withProperty("number", 1) - .withProperty("text", "some text") - .withProperty("null", null); + RequestLogLine nativeLine = RequestLogLine.forNative( new TimeseriesQuery( @@ -239,7 +233,14 @@ public void testSerializeNativeLogRequestWithAdditionalParameters() throws JsonP .createRequestLogEventBuilder("my-feed", nativeLine) .build("my-service", "my-host"); - String actual = serializer.serialize(event); + EventMap map = EventMap.builder() + .putNonNull("number", 1) + .putNonNull("text", "some text") + .putNonNull("null", null) + .putAll(event.toMap()) + .build(); + + String actual = mapper.writeValueAsString(map); String queryString = "{" + "\"queryType\":\"timeseries\"," + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," diff --git a/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java b/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java index 1db937498f97..d20eaa430253 100644 --- a/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java +++ b/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java @@ -32,6 +32,7 @@ import io.opentelemetry.sdk.trace.export.SpanExporter; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.joda.time.DateTime; import org.junit.Assert; @@ -98,9 +99,9 @@ public void testNoEmitNotServiceMetric() new Event() { @Override - public Map toMap() + public EventMap toMap() { - return Collections.emptyMap(); + return new EventMap(); } @Override diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java index 4744b2f6d09f..c5141d39cbed 100644 --- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java +++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java @@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.collect.ImmutableMap; import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.query.Query; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; import org.joda.time.DateTime; -import java.util.HashMap; import java.util.Map; /** @@ -55,23 +55,26 @@ public final class DefaultRequestLogEvent implements RequestLogEvent */ @JsonValue(value = false) @Override - public Map toMap() + public EventMap toMap() { - final Map map = new HashMap<>(); - map.put("feed", getFeed()); - map.put("timestamp", getCreatedTime()); - map.put("service", getService()); - map.put("host", getHost()); - if (getQuery() != null) { - map.put("query", getQuery()); - } + final EventMap.Builder builder = EventMap + .builder() + .put("feed", getFeed()) + .put("timestamp", getCreatedTime()) + .put("service", getService()) + .put("host", getHost()) + .putNonNull("query", getQuery()); + if (getSql() != null) { - map.put("sql", getSql()); - map.put("sqlQueryContext", getSqlQueryContext()); + builder.put("sql", getSql()) + .put("sqlQueryContext", getSqlQueryContext()); } - map.put("remoteAddr", getRemoteAddr()); - map.put("queryStats", getQueryStats()); - return map; + + builder + .put("remoteAddr", getRemoteAddr()) + .put("queryStats", getQueryStats()); + + return builder.build(); } @Override From 59e2b383d368a0306837c30aaad3f58d7b08d309 Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Mon, 22 Aug 2022 14:49:08 +0200 Subject: [PATCH 4/6] #12912 Use @Test instead of TestCase --- .../kafka/EventToJsonSerializerTest.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java index 185a0843ce12..8ffb6ca11012 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import junit.framework.TestCase; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -39,12 +38,16 @@ import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory; +import org.junit.Assert; +import org.junit.Test; import java.util.Collections; -public class EventToJsonSerializerTest extends TestCase + +public class EventToJsonSerializerTest { + @Test public void testSerializeServiceMetricEvent() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); @@ -65,9 +68,10 @@ public void testSerializeServiceMetricEvent() throws JsonProcessingException + "\"service\":\"my-service\"," + "\"host\":\"my-host\"" + "}"; - assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); } + @Test public void testSerializeAlertEvent() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); @@ -95,9 +99,10 @@ public void testSerializeAlertEvent() throws JsonProcessingException + "\"description\":\"my-description\"," + "\"data\":{}" + "}"; - assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); } + @Test public void testSerializeSqlLogRequest() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); @@ -130,9 +135,10 @@ public void testSerializeSqlLogRequest() throws JsonProcessingException + "\"remoteAddr\":\"127.0.0.1\"" + "}"; - assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); } + @Test public void testSerializeNativeLogRequest() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); @@ -192,9 +198,10 @@ public void testSerializeNativeLogRequest() throws JsonProcessingException + "\"remoteAddr\":\"127.0.0.1\"," + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; - assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); } + @Test public void testSerializeNativeLogRequestWithAdditionalParameters() throws JsonProcessingException { ObjectMapper mapper = new DefaultObjectMapper(); @@ -264,6 +271,6 @@ public void testSerializeNativeLogRequestWithAdditionalParameters() throws JsonP + "\"text\":\"some text\"," + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; - assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); } } From 4cb8ee08096a0e9a1a075f96065e4c7015eae522 Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Tue, 23 Aug 2022 10:56:45 +0200 Subject: [PATCH 5/6] #12912 Refactor: move test to correct modules, modify EventMap with builder --- .../java/util/emitter/core/EventMap.java | 20 +- .../service/ServiceMetricEventTest.java | 1 + .../druid/emitter/kafka/KafkaEmitter.java | 4 +- .../kafka/EventToJsonSerializerTest.java | 276 ------------------ .../OpenTelemetryEmitterTest.java | 1 - .../server/log/DefaultRequestLogEvent.java | 5 +- .../server/log/AlertMetricServiceTest.java | 65 +++++ .../log/DefaultRequestLogEventTest.java | 177 +++++++++++ .../server/log/ServiceMetricSerdeTest.java | 58 ++++ 9 files changed, 321 insertions(+), 286 deletions(-) delete mode 100644 extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java create mode 100644 server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java create mode 100644 server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java index a4e0267e749f..5cf031b2dc37 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java @@ -25,10 +25,10 @@ import java.util.Map; /** - * EventMap can be safely serialzed to JSON using Jackson serializer as it respects the polymorphic annotations - * on entires (unlike standard Map). The example of polymorphic class is a query interface, where different native - * query types are resolved by additional field called "queryType". This implementation esures that the annotation on - * the values are respected during serialization. + * EventMap is a hash map implementation. It can be safely serialzed to JSON using Jackson serializer as it + * respects the polymorphic annotations on entires (unlike standard Map). The example of polymorphic class is a query + * interface, where different native query types are resolved by additional field called "queryType". + * This implementation ensures that the annotation on the values are respected during serialization. */ @JsonSerialize(using = EventMapSerializer.class) public class EventMap extends HashMap @@ -41,6 +41,14 @@ public static Builder builder() return new Builder(); } + /** + * Convert this EventMap to a builder. Performs copy of the whole EventMap. + */ + public Builder asBuilder() + { + return new Builder().putAll(this); + } + public static class Builder { @@ -89,7 +97,9 @@ public Builder putAll(Map other) return this; } - /** Builds and returns the EventMap */ + /** + * Builds and returns the EventMap + */ public EventMap build() { return map; diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java index d713a016b010..5a97f76564c8 100644 --- a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java +++ b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java @@ -290,4 +290,5 @@ public void testNaN2() { ServiceMetricEvent.builder().build("foo", 0 / 0f); } + } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 1534c72924a4..129a374b5849 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -172,7 +172,9 @@ public void emit(final Event event) try { EventMap map = event.toMap(); if (config.getClusterName() != null) { - map.put("clusterName", config.getClusterName()); + map = map.asBuilder() + .put("clusterName", config.getClusterName()) + .build(); } String resultJson = jsonMapper.writeValueAsString(map); diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java deleted file mode 100644 index 8ffb6ca11012..000000000000 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/EventToJsonSerializerTest.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.emitter.kafka; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.emitter.core.Event; -import org.apache.druid.java.util.emitter.core.EventMap; -import org.apache.druid.java.util.emitter.service.AlertEvent; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; -import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; -import org.apache.druid.query.timeseries.TimeseriesQuery; -import org.apache.druid.segment.VirtualColumns; -import org.apache.druid.server.QueryStats; -import org.apache.druid.server.RequestLogLine; -import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collections; - - -public class EventToJsonSerializerTest -{ - - @Test - public void testSerializeServiceMetricEvent() throws JsonProcessingException - { - ObjectMapper mapper = new DefaultObjectMapper(); - String timestamp = "2022-08-17T18:51:00.000Z"; - Event event = ServiceMetricEvent.builder() - .setFeed("my-feed") - .build(DateTimes.of(timestamp), "m1", 1) - .build("my-service", "my-host"); - - String actual = mapper.writeValueAsString(event.toMap()); - String expected = "{" - + "\"feed\":\"my-feed\"," - + "\"timestamp\":\"" - + timestamp - + "\"," - + "\"metric\":\"m1\"," - + "\"value\":1," - + "\"service\":\"my-service\"," - + "\"host\":\"my-host\"" - + "}"; - Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); - } - - @Test - public void testSerializeAlertEvent() throws JsonProcessingException - { - ObjectMapper mapper = new DefaultObjectMapper(); - String timestamp = "2022-08-17T18:51:00.000Z"; - Event event = new AlertEvent( - DateTimes.of(timestamp), - "my-service", - "my-host", - AlertEvent.Severity.DEFAULT, - "my-description", - Collections.emptyMap() - ); - - String actual = mapper.writeValueAsString(event.toMap()); - String expected = "{" - + "\"feed\":\"alerts\"," - + "\"timestamp\":\"" - + timestamp - + "\"," - + "\"severity\":\"" - + AlertEvent.Severity.DEFAULT - + "\"," - + "\"service\":\"my-service\"," - + "\"host\":\"my-host\"," - + "\"description\":\"my-description\"," - + "\"data\":{}" - + "}"; - Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); - } - - @Test - public void testSerializeSqlLogRequest() throws JsonProcessingException - { - ObjectMapper mapper = new DefaultObjectMapper(); - String timestamp = "2022-08-17T18:51:00.000Z"; - - Event event = DefaultRequestLogEventBuilderFactory.instance() - .createRequestLogEventBuilder( - "requests", - RequestLogLine.forSql( - "SELECT * FROM dummy", - Collections.emptyMap(), - DateTimes.of(timestamp), - "127.0.0.1", - new QueryStats(ImmutableMap.of()) - ) - ) - .build("my-service", "my-host"); - - String actual = mapper.writeValueAsString(event.toMap()); - String expected = "{" - + "\"feed\":\"requests\"," - + "\"timestamp\":\"" - + timestamp - + "\"," - + "\"service\":\"my-service\"," - + "\"host\":\"my-host\"," - + "\"sql\":\"SELECT * FROM dummy\"," - + "\"sqlQueryContext\":{}," - + "\"queryStats\":{}," - + "\"remoteAddr\":\"127.0.0.1\"" - + "}"; - - Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); - } - - @Test - public void testSerializeNativeLogRequest() throws JsonProcessingException - { - ObjectMapper mapper = new DefaultObjectMapper(); - - RequestLogLine nativeLine = RequestLogLine.forNative( - new TimeseriesQuery( - new TableDataSource("dummy"), - new MultipleIntervalSegmentSpec( - ImmutableList.of(Intervals.of( - "2015-01-01/2015-01-02"))), - true, - VirtualColumns.EMPTY, - null, - Granularities.ALL, - ImmutableList.of(), - ImmutableList.of(), - 5, - ImmutableMap.of("key", "value") - ), - DateTimes.of(2019, 12, 12, 3, 1), - "127.0.0.1", - new QueryStats(ImmutableMap.of( - "query/time", - 13L, - "query/bytes", - 10L, - "success", - true, - "identity", - "allowAll" - )) - ); - - Event event = DefaultRequestLogEventBuilderFactory.instance() - .createRequestLogEventBuilder("my-feed", nativeLine) - .build("my-service", "my-host"); - - String actual = mapper.writeValueAsString(event.toMap()); - String queryString = "{" - + "\"queryType\":\"timeseries\"," - + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," - + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]}," - + "\"descending\":true," - + "\"granularity\":{\"type\":\"all\"}," - + "\"limit\":5," - + "\"context\":{\"key\":\"value\"}" - + "}"; - - String expected = "{" - + "\"feed\":\"my-feed\"," - + "\"host\":\"my-host\"," - + "\"service\":\"my-service\"," - + "\"timestamp\":\"2019-12-12T03:01:00.000Z\"," - + "\"query\":" - + queryString - + "," - + "\"remoteAddr\":\"127.0.0.1\"," - + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; - - Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); - } - - @Test - public void testSerializeNativeLogRequestWithAdditionalParameters() throws JsonProcessingException - { - ObjectMapper mapper = new DefaultObjectMapper(); - - - RequestLogLine nativeLine = RequestLogLine.forNative( - new TimeseriesQuery( - new TableDataSource("dummy"), - new MultipleIntervalSegmentSpec( - ImmutableList.of(Intervals.of( - "2015-01-01/2015-01-02"))), - true, - VirtualColumns.EMPTY, - null, - Granularities.ALL, - ImmutableList.of(), - ImmutableList.of(), - 5, - ImmutableMap.of("key", "value") - ), - DateTimes.of(2019, 12, 12, 3, 1), - "127.0.0.1", - new QueryStats(ImmutableMap.of( - "query/time", - 13L, - "query/bytes", - 10L, - "success", - true, - "identity", - "allowAll" - )) - ); - - Event event = DefaultRequestLogEventBuilderFactory.instance() - .createRequestLogEventBuilder("my-feed", nativeLine) - .build("my-service", "my-host"); - - EventMap map = EventMap.builder() - .putNonNull("number", 1) - .putNonNull("text", "some text") - .putNonNull("null", null) - .putAll(event.toMap()) - .build(); - - String actual = mapper.writeValueAsString(map); - String queryString = "{" - + "\"queryType\":\"timeseries\"," - + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," - + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]}," - + "\"descending\":true," - + "\"granularity\":{\"type\":\"all\"}," - + "\"limit\":5," - + "\"context\":{\"key\":\"value\"}" - + "}"; - - String expected = "{" - + "\"feed\":\"my-feed\"," - + "\"host\":\"my-host\"," - + "\"service\":\"my-service\"," - + "\"timestamp\":\"2019-12-12T03:01:00.000Z\"," - + "\"query\":" - + queryString - + "," - + "\"remoteAddr\":\"127.0.0.1\"," - + "\"number\":1," - + "\"text\":\"some text\"," - + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; - - Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); - } -} diff --git a/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java b/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java index d20eaa430253..58ee485f9c9c 100644 --- a/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java +++ b/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java @@ -40,7 +40,6 @@ import org.junit.Test; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java index c5141d39cbed..d7f75d216ebf 100644 --- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java +++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java @@ -70,9 +70,8 @@ public EventMap toMap() .put("sqlQueryContext", getSqlQueryContext()); } - builder - .put("remoteAddr", getRemoteAddr()) - .put("queryStats", getQueryStats()); + builder.put("remoteAddr", getRemoteAddr()) + .put("queryStats", getQueryStats()); return builder.build(); } diff --git a/server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java b/server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java new file mode 100644 index 000000000000..c462fc636695 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.log; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class AlertMetricServiceTest +{ + @Test + public void testSerializeAlertEventMap() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + String timestamp = "2022-08-17T18:51:00.000Z"; + Event event = new AlertEvent( + DateTimes.of(timestamp), + "my-service", + "my-host", + AlertEvent.Severity.DEFAULT, + "my-description", + Collections.emptyMap() + ); + + String actual = mapper.writeValueAsString(event.toMap()); + String expected = "{" + + "\"feed\":\"alerts\"," + + "\"timestamp\":\"" + + timestamp + + "\"," + + "\"severity\":\"" + + AlertEvent.Severity.DEFAULT + + "\"," + + "\"service\":\"my-service\"," + + "\"host\":\"my-host\"," + + "\"description\":\"my-description\"," + + "\"data\":{}" + + "}"; + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } +} diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java index 126705ec8bca..ebeffcf8c904 100644 --- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java +++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server.log; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -26,6 +27,8 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -37,6 +40,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -162,4 +166,177 @@ public void testDefaultRequestLogEventToMapSQL() Assert.assertEquals(expected, defaultRequestLogEvent.toMap()); } + + @Test + public void testSerializeSqlLogRequestMap() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + String timestamp = "2022-08-17T18:51:00.000Z"; + + Event event = DefaultRequestLogEventBuilderFactory.instance() + .createRequestLogEventBuilder( + "requests", + RequestLogLine.forSql( + "SELECT * FROM dummy", + Collections.emptyMap(), + DateTimes.of(timestamp), + "127.0.0.1", + new QueryStats(ImmutableMap.of()) + ) + ) + .build("my-service", "my-host"); + + String actual = mapper.writeValueAsString(event.toMap()); + String expected = "{" + + "\"feed\":\"requests\"," + + "\"timestamp\":\"" + + timestamp + + "\"," + + "\"service\":\"my-service\"," + + "\"host\":\"my-host\"," + + "\"sql\":\"SELECT * FROM dummy\"," + + "\"sqlQueryContext\":{}," + + "\"queryStats\":{}," + + "\"remoteAddr\":\"127.0.0.1\"" + + "}"; + + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + + @Test + public void testSerializeNativeLogRequestMap() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + + RequestLogLine nativeLine = RequestLogLine.forNative( + new TimeseriesQuery( + new TableDataSource("dummy"), + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.of( + "2015-01-01/2015-01-02"))), + true, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of(), + ImmutableList.of(), + 5, + ImmutableMap.of("key", "value") + ), + DateTimes.of(2019, 12, 12, 3, 1), + "127.0.0.1", + new QueryStats(ImmutableMap.of( + "query/time", + 13L, + "query/bytes", + 10L, + "success", + true, + "identity", + "allowAll" + )) + ); + + Event event = DefaultRequestLogEventBuilderFactory.instance() + .createRequestLogEventBuilder("my-feed", nativeLine) + .build("my-service", "my-host"); + + String actual = mapper.writeValueAsString(event.toMap()); + String queryString = "{" + + "\"queryType\":\"timeseries\"," + + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," + + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]}," + + "\"descending\":true," + + "\"granularity\":{\"type\":\"all\"}," + + "\"limit\":5," + + "\"context\":{\"key\":\"value\"}" + + "}"; + + String expected = "{" + + "\"feed\":\"my-feed\"," + + "\"host\":\"my-host\"," + + "\"service\":\"my-service\"," + + "\"timestamp\":\"2019-12-12T03:01:00.000Z\"," + + "\"query\":" + + queryString + + "," + + "\"remoteAddr\":\"127.0.0.1\"," + + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; + + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + + @Test + public void testSerializeNativeLogRequestMapWithAdditionalParameters() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + + + RequestLogLine nativeLine = RequestLogLine.forNative( + new TimeseriesQuery( + new TableDataSource("dummy"), + new MultipleIntervalSegmentSpec( + ImmutableList.of(Intervals.of( + "2015-01-01/2015-01-02"))), + true, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of(), + ImmutableList.of(), + 5, + ImmutableMap.of("key", "value") + ), + DateTimes.of(2019, 12, 12, 3, 1), + "127.0.0.1", + new QueryStats(ImmutableMap.of( + "query/time", + 13L, + "query/bytes", + 10L, + "success", + true, + "identity", + "allowAll" + )) + ); + + Event event = DefaultRequestLogEventBuilderFactory.instance() + .createRequestLogEventBuilder("my-feed", nativeLine) + .build("my-service", "my-host"); + + EventMap map = EventMap.builder() + .putNonNull("number", 1) + .putNonNull("text", "some text") + .putNonNull("null", null) + .putAll(event.toMap()) + .build(); + + String actual = mapper.writeValueAsString(map); + String queryString = "{" + + "\"queryType\":\"timeseries\"," + + "\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"}," + + "\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]}," + + "\"descending\":true," + + "\"granularity\":{\"type\":\"all\"}," + + "\"limit\":5," + + "\"context\":{\"key\":\"value\"}" + + "}"; + + String expected = "{" + + "\"feed\":\"my-feed\"," + + "\"host\":\"my-host\"," + + "\"service\":\"my-service\"," + + "\"timestamp\":\"2019-12-12T03:01:00.000Z\"," + + "\"query\":" + + queryString + + "," + + "\"remoteAddr\":\"127.0.0.1\"," + + "\"number\":1," + + "\"text\":\"some text\"," + + "\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}"; + + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + } diff --git a/server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java b/server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java new file mode 100644 index 000000000000..0fd6f6ba2058 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.log; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.junit.Assert; +import org.junit.Test; + +public class ServiceMetricSerdeTest +{ + + @Test + public void testSerializeServiceMetricEventMap() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + String timestamp = "2022-08-17T18:51:00.000Z"; + Event event = ServiceMetricEvent.builder() + .setFeed("my-feed") + .build(DateTimes.of(timestamp), "m1", 1) + .build("my-service", "my-host"); + + String actual = mapper.writeValueAsString(event.toMap()); + String expected = "{" + + "\"feed\":\"my-feed\"," + + "\"timestamp\":\"" + + timestamp + + "\"," + + "\"metric\":\"m1\"," + + "\"value\":1," + + "\"service\":\"my-service\"," + + "\"host\":\"my-host\"" + + "}"; + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } + +} From 8d250aebf7c46d8df75f969eda600b507902a047 Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Tue, 23 Aug 2022 11:47:40 +0200 Subject: [PATCH 6/6] #12912 Refactor: rename unit test classes --- .../{AlertMetricServiceTest.java => AlertEventSerdeTest.java} | 2 +- ...iceMetricSerdeTest.java => ServiceMetricEventSerdeTest.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename server/src/test/java/org/apache/druid/server/log/{AlertMetricServiceTest.java => AlertEventSerdeTest.java} (98%) rename server/src/test/java/org/apache/druid/server/log/{ServiceMetricSerdeTest.java => ServiceMetricEventSerdeTest.java} (98%) diff --git a/server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java b/server/src/test/java/org/apache/druid/server/log/AlertEventSerdeTest.java similarity index 98% rename from server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java rename to server/src/test/java/org/apache/druid/server/log/AlertEventSerdeTest.java index c462fc636695..7fcd0db1898b 100644 --- a/server/src/test/java/org/apache/druid/server/log/AlertMetricServiceTest.java +++ b/server/src/test/java/org/apache/druid/server/log/AlertEventSerdeTest.java @@ -30,7 +30,7 @@ import java.util.Collections; -public class AlertMetricServiceTest +public class AlertEventSerdeTest { @Test public void testSerializeAlertEventMap() throws JsonProcessingException diff --git a/server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java b/server/src/test/java/org/apache/druid/server/log/ServiceMetricEventSerdeTest.java similarity index 98% rename from server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java rename to server/src/test/java/org/apache/druid/server/log/ServiceMetricEventSerdeTest.java index 0fd6f6ba2058..341bab57bb0b 100644 --- a/server/src/test/java/org/apache/druid/server/log/ServiceMetricSerdeTest.java +++ b/server/src/test/java/org/apache/druid/server/log/ServiceMetricEventSerdeTest.java @@ -28,7 +28,7 @@ import org.junit.Assert; import org.junit.Test; -public class ServiceMetricSerdeTest +public class ServiceMetricEventSerdeTest { @Test