From b40a5b23c73c503c232e846deebadada070a494c Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 10 Jun 2025 10:44:40 -0300 Subject: [PATCH 1/5] Migrate PubSub removing flaky test Signed-off-by: Matheus Cruz --- .../pubsub/http/DaprPubSubIT.java | 637 ++++++++++++++++++ .../pubsub/http/SubscriberController.java | 271 ++++++++ .../pubsub/http/TestPubSubApplication.java | 23 + 3 files changed, 931 insertions(+) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java new file mode 100644 index 0000000000..f63b548faf --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -0,0 +1,637 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.http; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.BulkPublishEntry; +import io.dapr.client.domain.BulkPublishRequest; +import io.dapr.client.domain.BulkPublishResponse; +import io.dapr.client.domain.CloudEvent; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.Metadata; +import io.dapr.client.domain.PublishEventRequest; +import io.dapr.config.Properties; +import io.dapr.it.pubsub.http.PubSubIT; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.spring.boot.autoconfigure.client.DaprClientAutoConfiguration; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.utils.TypeRef; +import org.assertj.core.api.SoftAssertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; + +import static io.dapr.it.Retry.callWithRetry; +import static io.dapr.it.TestUtils.assertThrowsDaprException; +import static io.dapr.it.TestUtils.assertThrowsDaprExceptionWithReason; +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT, + classes = { + TestPubSubApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class DaprPubSubIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final Random RANDOM = new Random(); + private static final int PORT = RANDOM.nextInt(1000) + 8000; + private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String PUBSUB_APP_ID = "pubsub-dapr-app"; + private static final String PUBSUB_NAME = "pubsub"; + + // topics + private static final String TOPIC_BULK = "testingbulktopic"; + private static final String TOPIC_NAME = "testingtopic"; + private static final String ANOTHER_TOPIC_NAME = "anothertopic"; + private static final String TYPED_TOPIC_NAME = "typedtestingtopic"; + private static final String BINARY_TOPIC_NAME = "binarytopic"; + private static final String TTL_TOPIC_NAME = "ttltopic"; + private static final String LONG_TOPIC_NAME = "testinglongvalues"; + + + private static final int NUM_MESSAGES = 10; + + // typeRefs + private static final TypeRef> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() { + }; + private static final TypeRef>> CLOUD_EVENT_LONG_LIST_TYPE_REF = + new TypeRef<>() { + }; + private static final TypeRef>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = + new TypeRef<>() { + }; + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName(PUBSUB_APP_ID) + .withNetwork(DAPR_NETWORK) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .withAppPort(PORT); + + /** + * Expose the Dapr ports to the host. + * + * @param registry the dynamic property registry + */ + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint); + registry.add("server.port", () -> PORT); + } + + + @BeforeEach + public void setUp() { + org.testcontainers.Testcontainers.exposeHostPorts(PORT); + } + + @Test + @DisplayName("Should receive INVALID_ARGUMENT when the specified Pub/Sub name does not exist") + public void shouldReceiveInvalidArgument() throws Exception { + Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER); + + try (DaprClient client = createDaprClientBuilder().build()) { + assertThrowsDaprExceptionWithReason( + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: pubsub unknown pubsub is not found", + "DAPR_PUBSUB_NOT_FOUND", + () -> client.publishEvent("unknown pubsub", "mytopic", "payload").block()); + } + } + + @Test + @DisplayName("Should receive INVALID_ARGUMENT using bulk publish when the specified Pub/Sub name does not exist") + public void shouldReceiveInvalidArgumentWithBulkPublish() throws Exception { + try (DaprPreviewClient client = createDaprClientBuilder().buildPreviewClient()) { + assertThrowsDaprException( + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: pubsub unknown pubsub is not found", + () -> client.publishEvents("unknown pubsub", "mytopic", "text/plain", "message").block()); + } + } + + @Test + @DisplayName("Should publish some payload types successfully") + public void shouldPublishSomePayloadTypesWithNoError() throws Exception { + + DaprObjectSerializer serializer = createJacksonObjectSerializer(); + + try ( + DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build(); + DaprPreviewClient previewClient = createDaprClientBuilder().withObjectSerializer(serializer) + .buildPreviewClient() + ) { + + publishBulkStringsAsserting(previewClient); + + publishMyObjectAsserting(previewClient); + + publishByteAsserting(previewClient); + + publishCloudEventAsserting(previewClient); + + Thread.sleep(10000); + + callWithRetry(() -> validatePublishedMessages(client), 2000); + } + } + + @Test + @DisplayName("Should publish various payload types to different topics") + public void testPubSub() throws Exception { + + DaprObjectSerializer serializer = createJacksonObjectSerializer(); + + // Send a batch of messages on one topic + try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) { + + sendBulkMessagesAsText(client, TOPIC_NAME); + + sendBulkMessagesAsText(client, ANOTHER_TOPIC_NAME); + + //Publishing an object. + PubSubIT.MyObject object = new PubSubIT.MyObject(); + object.setId("123"); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block(); + System.out.println("Published one object."); + + client.publishEvent(PUBSUB_NAME, TYPED_TOPIC_NAME, object).block(); + System.out.println("Published another object."); + + //Publishing a single byte: Example of non-string based content published + publishOneByteSync(client, TOPIC_NAME); + + CloudEvent cloudEvent = new CloudEvent<>(); + cloudEvent.setId("1234"); + cloudEvent.setData("message from cloudevent"); + cloudEvent.setSource("test"); + cloudEvent.setSpecversion("1"); + cloudEvent.setType("myevent"); + cloudEvent.setDatacontenttype("text/plain"); + + //Publishing a cloud event. + client.publishEvent(new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEvent) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event."); + + { + CloudEvent cloudEventV2 = new CloudEvent<>(); + cloudEventV2.setId("2222"); + cloudEventV2.setData("message from cloudevent v2"); + cloudEventV2.setSource("test"); + cloudEventV2.setSpecversion("1"); + cloudEventV2.setType("myevent.v2"); + cloudEventV2.setDatacontenttype("text/plain"); + client.publishEvent( + new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event for v2."); + } + + { + CloudEvent cloudEventV3 = new CloudEvent<>(); + cloudEventV3.setId("3333"); + cloudEventV3.setData("message from cloudevent v3"); + cloudEventV3.setSource("test"); + cloudEventV3.setSpecversion("1"); + cloudEventV3.setType("myevent.v3"); + cloudEventV3.setDatacontenttype("text/plain"); + client.publishEvent( + new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event for v3."); + } + + Thread.sleep(2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testingtopic", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(13) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .contains( + "AQ==", + "message from cloudevent" + ); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String expectedMessage = String.format("This is message #%d on topic %s", i, TOPIC_NAME); + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .anyMatch(expectedMessage::equals); + } + + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(LinkedHashMap.class::isInstance) + .map(data -> (String) ((LinkedHashMap) data).get("id")) + .contains("123"); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME + " V2"); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testingtopicV2", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(1); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME + " V3"); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testingtopicV3", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(1); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TYPED_TOPIC_NAME); + + List> messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/typedtestingtopic", + null, + HttpExtension.GET, + CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .filteredOn(PubSubIT.MyObject.class::isInstance) + .map(PubSubIT.MyObject::getId) + .contains("123"); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/anothertopic", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(10); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String expectedMessage = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME); + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .anyMatch(expectedMessage::equals); + } + }, 2000); + + } + } + + @Test + @DisplayName("Should publish binary payload type successfully") + public void shouldPublishBinary() throws Exception { + + DaprObjectSerializer serializer = createBinaryObjectSerializer(); + + try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) { + publishOneByteSync(client, BINARY_TOPIC_NAME); + } + + Thread.sleep(3000); + + try (DaprClient client = createDaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + BINARY_TOPIC_NAME); + final List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/binarytopic", + null, + HttpExtension.GET, CLOUD_EVENT_LIST_TYPE_REF).block(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(messages.size()).isEqualTo(1); + softly.assertThat(messages.get(0).getData()).isNull(); + softly.assertThat(messages.get(0).getBinaryData()).isEqualTo(new byte[] {1}); + }); + }, 2000); + } + } + + private static void publishOneByteSync(DaprClient client, String topicName) { + client.publishEvent( + PUBSUB_NAME, + topicName, + new byte[] {1}).block(); + } + + private static void sendBulkMessagesAsText(DaprClient client, String topicName) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s", i, topicName); + client.publishEvent(PUBSUB_NAME, topicName, message).block(); + } + } + + private void publishMyObjectAsserting(DaprPreviewClient previewClient) { + PubSubIT.MyObject object = new PubSubIT.MyObject(); + object.setId("123"); + BulkPublishResponse response = previewClient.publishEvents( + PUBSUB_NAME, + TOPIC_BULK, + "application/json", + Collections.singletonList(object) + ).block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void publishBulkStringsAsserting(DaprPreviewClient previewClient) { + List messages = new ArrayList<>(); + for (int i = 0; i < NUM_MESSAGES; i++) { + messages.add(String.format("This is message #%d on topic %s", i, TOPIC_BULK)); + } + BulkPublishResponse response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void publishByteAsserting(DaprPreviewClient previewClient) { + BulkPublishResponse response = previewClient.publishEvents( + PUBSUB_NAME, + TOPIC_BULK, + "", + Collections.singletonList(new byte[] {1}) + ).block(); + SoftAssertions.assertSoftly(softly -> { + assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void publishCloudEventAsserting(DaprPreviewClient previewClient) { + CloudEvent cloudEvent = new CloudEvent<>(); + cloudEvent.setId("1234"); + cloudEvent.setData("message from cloudevent"); + cloudEvent.setSource("test"); + cloudEvent.setSpecversion("1"); + cloudEvent.setType("myevent"); + cloudEvent.setDatacontenttype("text/plain"); + + BulkPublishRequest> req = new BulkPublishRequest<>( + PUBSUB_NAME, + TOPIC_BULK, + Collections.singletonList( + new BulkPublishEntry<>("1", cloudEvent, "application/cloudevents+json", null) + ) + ); + BulkPublishResponse> response = previewClient.publishEvents(req).block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void validatePublishedMessages(DaprClient client) { + List cloudEventMessages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/redis/testingbulktopic", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(cloudEventMessages) + .as("expected non-null list of cloud events") + .isNotNull() + .hasSize(13); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String expectedMessage = String.format("This is message #%d on topic %s", i, TOPIC_BULK); + assertThat(cloudEventMessages) + .as("expected text payload to match for message %d", i) + .anySatisfy(event -> assertThat(event.getData()).isEqualTo(expectedMessage)); + } + + assertThat(cloudEventMessages) + .filteredOn(event -> event.getData() instanceof LinkedHashMap) + .map(event -> (LinkedHashMap) event.getData()) + .anySatisfy(map -> assertThat(map.get("id")).isEqualTo("123")); + + assertThat(cloudEventMessages) + .map(CloudEvent::getData) + .anySatisfy(data -> assertThat(data).isEqualTo("AQ==")); + + assertThat(cloudEventMessages) + .map(CloudEvent::getData) + .anySatisfy(data -> assertThat(data).isEqualTo("message from cloudevent")); + } + + @Test + @DisplayName("Should publish with TTL") + public void testPubSubTTLMetadata() throws Exception { + + // Send a batch of messages on one topic, all to be expired in 1 second. + try (DaprClient client = createDaprClientBuilder().build()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s", i, TTL_TOPIC_NAME); + //Publishing messages + client.publishEvent( + PUBSUB_NAME, + TTL_TOPIC_NAME, + message, + Map.of(Metadata.TTL_IN_SECONDS, "1")) + .block(); + System.out.printf("Published message: '%s' to topic '%s' pubsub_name '%s'%n", message, TOPIC_NAME, PUBSUB_NAME); + } + } + + // Sleeps for two seconds to let them expire. + Thread.sleep(2000); + + try (DaprClient client = createDaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + TTL_TOPIC_NAME); + final List + messages = client.invokeMethod(PUBSUB_APP_ID, "messages/" + TTL_TOPIC_NAME, null, HttpExtension.GET, List.class).block(); + assertThat(messages).hasSize(0); + }, 2000); + } + } + + @Test + @DisplayName("Should publish long values") + public void testLongValues() throws Exception { + + Random random = new Random(590518626939830271L); + Set values = new HashSet<>(); + values.add(new PubSubIT.ConvertToLong().setVal(590518626939830271L)); + PubSubIT.ConvertToLong val; + for (int i = 0; i < NUM_MESSAGES - 1; i++) { + do { + val = new PubSubIT.ConvertToLong().setVal(random.nextLong()); + } while (values.contains(val)); + values.add(val); + } + Iterator valuesIt = values.iterator(); + try (DaprClient client = createDaprClientBuilder().build()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + PubSubIT.ConvertToLong value = valuesIt.next(); + System.out.println("The long value sent " + value.getValue()); + //Publishing messages + client.publishEvent( + PUBSUB_NAME, + LONG_TOPIC_NAME, + value, + Map.of(Metadata.TTL_IN_SECONDS, "30")).block(); + + try { + Thread.sleep((long) (1000 * Math.random())); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + return; + } + } + } + + Set actual = new HashSet<>(); + try (DaprClient client = createDaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + LONG_TOPIC_NAME); + final List> messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testinglongvalues", + null, + HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block(); + assertNotNull(messages); + for (CloudEvent message : messages) { + actual.add(message.getData()); + } + assertThat(values).isEqualTo(actual); + }, 2000); + } + } + + private static DaprClientBuilder createDaprClientBuilder() { + return new DaprClientBuilder() + .withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getHttpPort()) + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getGrpcPort()); + } + + private DaprObjectSerializer createJacksonObjectSerializer() { + return new DaprObjectSerializer() { + @Override + public byte[] serialize(Object o) throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsBytes(o); + } + + @Override + public T deserialize(byte[] data, TypeRef type) throws IOException { + return OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType())); + } + + @Override + public String getContentType() { + return "application/json"; + } + }; + } + + private @NotNull DaprObjectSerializer createBinaryObjectSerializer() { + return new DaprObjectSerializer() { + @Override + public byte[] serialize(Object o) { + return (byte[]) o; + } + + @Override + public T deserialize(byte[] data, TypeRef type) { + return (T) data; + } + + @Override + public String getContentType() { + return "application/octet-stream"; + } + }; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java new file mode 100644 index 0000000000..0fc85a2a8a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java @@ -0,0 +1,271 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.pubsub.http; + +import io.dapr.Rule; +import io.dapr.Topic; +import io.dapr.client.domain.BulkSubscribeAppResponse; +import io.dapr.client.domain.BulkSubscribeAppResponseEntry; +import io.dapr.client.domain.BulkSubscribeAppResponseStatus; +import io.dapr.client.domain.BulkSubscribeMessage; +import io.dapr.client.domain.BulkSubscribeMessageEntry; +import io.dapr.client.domain.CloudEvent; +import io.dapr.it.pubsub.http.PubSubIT; +import io.dapr.springboot.annotations.BulkSubscribe; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * SpringBoot Controller to handle input binding. + */ +@RestController +public class SubscriberController { + + private final Map>> messagesByTopic = Collections.synchronizedMap(new HashMap<>()); + + @GetMapping(path = "/messages/{topic}") + public List> getMessagesByTopic(@PathVariable("topic") String topic) { + return messagesByTopic.getOrDefault(topic, Collections.emptyList()); + } + + private static final List messagesReceivedBulkPublishTopic = new ArrayList(); + private static final List messagesReceivedTestingTopic = new ArrayList(); + private static final List messagesReceivedTestingTopicV2 = new ArrayList(); + private static final List messagesReceivedTestingTopicV3 = new ArrayList(); + private static final List responsesReceivedTestingTopicBulkSub = new ArrayList<>(); + + @GetMapping(path = "/messages/redis/testingbulktopic") + public List getMessagesReceivedBulkTopic() { + return messagesReceivedBulkPublishTopic; + } + + + + @GetMapping(path = "/messages/testingtopic") + public List getMessagesReceivedTestingTopic() { + return messagesReceivedTestingTopic; + } + + @GetMapping(path = "/messages/testingtopicV2") + public List getMessagesReceivedTestingTopicV2() { + return messagesReceivedTestingTopicV2; + } + + @GetMapping(path = "/messages/testingtopicV3") + public List getMessagesReceivedTestingTopicV3() { + return messagesReceivedTestingTopicV3; + } + + @GetMapping(path = "/messages/topicBulkSub") + public List getMessagesReceivedTestingTopicBulkSub() { + System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + return responsesReceivedTestingTopicBulkSub; + } + + @Topic(name = "testingtopic", pubsubName = "pubsub") + @PostMapping("/route1") + public Mono handleMessage(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopic.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingbulktopic", pubsubName = "pubsub") + @PostMapping("/route1_redis") + public Mono handleBulkTopicMessage(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedBulkPublishTopic.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingtopic", pubsubName = "pubsub", + rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2)) + @PostMapping(path = "/route1_v2") + public Mono handleMessageV2(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopicV2.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingtopic", pubsubName = "pubsub", + rule = @Rule(match = "event.type == 'myevent.v3'", priority = 1)) + @PostMapping(path = "/route1_v3") + public Mono handleMessageV3(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopicV3.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "typedtestingtopic", pubsubName = "pubsub") + @PostMapping(path = "/route1b") + public Mono handleMessageTyped(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String id = envelope.getData() == null ? "" : envelope.getData().getId(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing typed topic Subscriber got message with ID: " + id + "; Content-type: " + contentType); + messagesByTopic.compute("typedtestingtopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "binarytopic", pubsubName = "pubsub") + @PostMapping(path = "/route2") + public Mono handleBinaryMessage(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesByTopic.compute("binarytopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "#{'another'.concat('topic')}", pubsubName = "${pubsubName:pubsub}") + @PostMapping(path = "/route3") + public Mono handleMessageAnotherTopic(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + System.out.println("Another topic Subscriber got message: " + message); + messagesByTopic.compute("anothertopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @PostMapping(path = "/route4") + public Mono handleMessageTTLTopic(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + System.out.println("TTL topic Subscriber got message: " + message); + messagesByTopic.compute("ttltopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testinglongvalues", pubsubName = "pubsub") + @PostMapping(path = "/testinglongvalues") + public Mono handleMessageLongValues(@RequestBody(required = false) CloudEvent cloudEvent) { + return Mono.fromRunnable(() -> { + try { + Long message = cloudEvent.getData().getValue(); + System.out.println("Subscriber got: " + message); + messagesByTopic.compute("testinglongvalues", merge(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Receive messages using the bulk subscribe API. + * The maxBulkSubCount and maxBulkSubAwaitDurationMs are adjusted to ensure + * that all the test messages arrive in a single batch. + * + * @param bulkMessage incoming bulk of messages from the message bus. + * @return status for each message received. + */ + @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 100) + @Topic(name = "topicBulkSub", pubsubName = "pubsub") + @PostMapping(path = "/routeBulkSub") + public Mono handleMessageBulk( + @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { + return Mono.fromCallable(() -> { + System.out.println("bulkMessage: " + bulkMessage.getEntries().size()); + + if (bulkMessage.getEntries().size() == 0) { + BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>()); + responsesReceivedTestingTopicBulkSub.add(response); + System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + return response; + } + + List entries = new ArrayList<>(); + for (BulkSubscribeMessageEntry entry: bulkMessage.getEntries()) { + try { + System.out.printf("Bulk Subscriber got entry ID: %s\n", entry.getEntryId()); + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); + } catch (Exception e) { + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); + } + } + BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries); + responsesReceivedTestingTopicBulkSub.add(response); + System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + + return response; + }); + } + + private BiFunction>, List>> merge(final CloudEvent item) { + return (key, value) -> { + final List> list = value == null ? new ArrayList<>() : value; + list.add(item); + return list; + }; + } + + @GetMapping(path = "/health") + public void health() { + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java new file mode 100644 index 0000000000..2ed550cac7 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java @@ -0,0 +1,23 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.http; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class TestPubSubApplication { + public static void main(String[] args) { + SpringApplication.run(TestPubSubApplication.class, args); + } +} From 9b579257b8c522fa75b6db250ceb63aca7c167e1 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Fri, 27 Jun 2025 20:41:35 -0300 Subject: [PATCH 2/5] Adjust assertion Signed-off-by: Matheus Cruz --- .../io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java index f63b548faf..3065d3ba86 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -586,7 +586,7 @@ public void testLongValues() throws Exception { for (CloudEvent message : messages) { actual.add(message.getData()); } - assertThat(values).isEqualTo(actual); + assertThat(values).containsExactlyInAnyOrder(actual); }, 2000); } } From 63010d78dcc2afee6dc1bb1a0056c0c02b35e0af Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Fri, 27 Jun 2025 21:17:21 -0300 Subject: [PATCH 3/5] Change assert Signed-off-by: Matheus Cruz --- .../io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java index 3065d3ba86..008e22b5d4 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -586,7 +586,7 @@ public void testLongValues() throws Exception { for (CloudEvent message : messages) { actual.add(message.getData()); } - assertThat(values).containsExactlyInAnyOrder(actual); + assertThat(values).containsAll(actual); }, 2000); } } From 5fce2ad4057776add644fcb06cbc5d5e604b6f3b Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Mon, 7 Jul 2025 15:42:59 -0300 Subject: [PATCH 4/5] Apply pull request suggestions Signed-off-by: Matheus Cruz --- .../it/testcontainers/DaprClientFactory.java | 14 +++ .../pubsub/http/DaprPubSubIT.java | 98 +++++++------------ .../pubsub/http/SubscriberController.java | 27 ++--- 3 files changed, 63 insertions(+), 76 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprClientFactory.java diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprClientFactory.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprClientFactory.java new file mode 100644 index 0000000000..2a706af3c0 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprClientFactory.java @@ -0,0 +1,14 @@ +package io.dapr.it.testcontainers; + +import io.dapr.client.DaprClientBuilder; +import io.dapr.config.Properties; +import io.dapr.testcontainers.DaprContainer; + +public interface DaprClientFactory { + + static DaprClientBuilder createDaprClientBuilder(DaprContainer daprContainer) { + return new DaprClientBuilder() + .withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + daprContainer.getHttpPort()) + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + daprContainer.getGrpcPort()); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java index 008e22b5d4..dbaeaac6a2 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -12,10 +12,8 @@ */ package io.dapr.it.testcontainers.pubsub.http; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.client.DaprClient; -import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprPreviewClient; import io.dapr.client.domain.BulkPublishEntry; import io.dapr.client.domain.BulkPublishRequest; @@ -24,10 +22,10 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.Metadata; import io.dapr.client.domain.PublishEventRequest; -import io.dapr.config.Properties; import io.dapr.it.pubsub.http.PubSubIT; +import io.dapr.it.testcontainers.DaprClientFactory; +import io.dapr.serializer.CustomizableObjectSerializer; import io.dapr.serializer.DaprObjectSerializer; -import io.dapr.spring.boot.autoconfigure.client.DaprClientAutoConfiguration; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; import io.dapr.utils.TypeRef; @@ -37,6 +35,8 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; @@ -45,7 +45,6 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -62,7 +61,6 @@ import static io.dapr.it.TestUtils.assertThrowsDaprExceptionWithReason; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @SpringBootTest( @@ -75,12 +73,12 @@ @Tag("testcontainers") public class DaprPubSubIT { + private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubIT.class); private static final Network DAPR_NETWORK = Network.newNetwork(); private static final Random RANDOM = new Random(); private static final int PORT = RANDOM.nextInt(1000) + 8000; private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*"; - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final DaprObjectSerializer SERIALIZER = new CustomizableObjectSerializer(new ObjectMapper()); private static final String PUBSUB_APP_ID = "pubsub-dapr-app"; private static final String PUBSUB_NAME = "pubsub"; @@ -112,7 +110,7 @@ public class DaprPubSubIT { .withAppName(PUBSUB_APP_ID) .withNetwork(DAPR_NETWORK) .withDaprLogLevel(DaprLogLevel.DEBUG) - .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withLogConsumer(outputFrame -> LOG.info(outputFrame.getUtf8String())) .withAppChannelAddress("host.testcontainers.internal") .withAppPort(PORT); @@ -139,7 +137,7 @@ public void setUp() { public void shouldReceiveInvalidArgument() throws Exception { Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER); - try (DaprClient client = createDaprClientBuilder().build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { assertThrowsDaprExceptionWithReason( "INVALID_ARGUMENT", "INVALID_ARGUMENT: pubsub unknown pubsub is not found", @@ -151,7 +149,7 @@ public void shouldReceiveInvalidArgument() throws Exception { @Test @DisplayName("Should receive INVALID_ARGUMENT using bulk publish when the specified Pub/Sub name does not exist") public void shouldReceiveInvalidArgumentWithBulkPublish() throws Exception { - try (DaprPreviewClient client = createDaprClientBuilder().buildPreviewClient()) { + try (DaprPreviewClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).buildPreviewClient()) { assertThrowsDaprException( "INVALID_ARGUMENT", "INVALID_ARGUMENT: pubsub unknown pubsub is not found", @@ -163,11 +161,10 @@ public void shouldReceiveInvalidArgumentWithBulkPublish() throws Exception { @DisplayName("Should publish some payload types successfully") public void shouldPublishSomePayloadTypesWithNoError() throws Exception { - DaprObjectSerializer serializer = createJacksonObjectSerializer(); - try ( - DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build(); - DaprPreviewClient previewClient = createDaprClientBuilder().withObjectSerializer(serializer) + DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build(); + DaprPreviewClient previewClient = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).withObjectSerializer( + SERIALIZER) .buildPreviewClient() ) { @@ -189,10 +186,8 @@ public void shouldPublishSomePayloadTypesWithNoError() throws Exception { @DisplayName("Should publish various payload types to different topics") public void testPubSub() throws Exception { - DaprObjectSerializer serializer = createJacksonObjectSerializer(); - // Send a batch of messages on one topic - try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).withObjectSerializer(SERIALIZER).build()) { sendBulkMessagesAsText(client, TOPIC_NAME); @@ -202,10 +197,10 @@ public void testPubSub() throws Exception { PubSubIT.MyObject object = new PubSubIT.MyObject(); object.setId("123"); client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block(); - System.out.println("Published one object."); + LOG.info("Published one object."); client.publishEvent(PUBSUB_NAME, TYPED_TOPIC_NAME, object).block(); - System.out.println("Published another object."); + LOG.info("Published another object."); //Publishing a single byte: Example of non-string based content published publishOneByteSync(client, TOPIC_NAME); @@ -221,7 +216,7 @@ public void testPubSub() throws Exception { //Publishing a cloud event. client.publishEvent(new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEvent) .setContentType("application/cloudevents+json")).block(); - System.out.println("Published one cloud event."); + LOG.info("Published one cloud event."); { CloudEvent cloudEventV2 = new CloudEvent<>(); @@ -234,7 +229,7 @@ public void testPubSub() throws Exception { client.publishEvent( new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2) .setContentType("application/cloudevents+json")).block(); - System.out.println("Published one cloud event for v2."); + LOG.info("Published one cloud event for v2."); } { @@ -248,13 +243,13 @@ public void testPubSub() throws Exception { client.publishEvent( new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3) .setContentType("application/cloudevents+json")).block(); - System.out.println("Published one cloud event for v3."); + LOG.info("Published one cloud event for v3."); } Thread.sleep(2000); callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_NAME); + LOG.info("Checking results for topic " + TOPIC_NAME); List messages = client.invokeMethod( PUBSUB_APP_ID, @@ -289,7 +284,7 @@ public void testPubSub() throws Exception { }, 2000); callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_NAME + " V2"); + LOG.info("Checking results for topic " + TOPIC_NAME + " V2"); List messages = client.invokeMethod( PUBSUB_APP_ID, @@ -304,7 +299,7 @@ public void testPubSub() throws Exception { }, 2000); callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_NAME + " V3"); + LOG.info("Checking results for topic " + TOPIC_NAME + " V3"); List messages = client.invokeMethod( PUBSUB_APP_ID, @@ -319,7 +314,7 @@ public void testPubSub() throws Exception { }, 2000); callWithRetry(() -> { - System.out.println("Checking results for topic " + TYPED_TOPIC_NAME); + LOG.info("Checking results for topic " + TYPED_TOPIC_NAME); List> messages = client.invokeMethod( PUBSUB_APP_ID, @@ -338,7 +333,7 @@ public void testPubSub() throws Exception { }, 2000); callWithRetry(() -> { - System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME); + LOG.info("Checking results for topic " + ANOTHER_TOPIC_NAME); List messages = client.invokeMethod( PUBSUB_APP_ID, @@ -369,15 +364,15 @@ public void shouldPublishBinary() throws Exception { DaprObjectSerializer serializer = createBinaryObjectSerializer(); - try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).withObjectSerializer(serializer).build()) { publishOneByteSync(client, BINARY_TOPIC_NAME); } Thread.sleep(3000); - try (DaprClient client = createDaprClientBuilder().build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { callWithRetry(() -> { - System.out.println("Checking results for topic " + BINARY_TOPIC_NAME); + LOG.info("Checking results for topic " + BINARY_TOPIC_NAME); final List messages = client.invokeMethod( PUBSUB_APP_ID, "messages/binarytopic", @@ -510,7 +505,7 @@ private void validatePublishedMessages(DaprClient client) { public void testPubSubTTLMetadata() throws Exception { // Send a batch of messages on one topic, all to be expired in 1 second. - try (DaprClient client = createDaprClientBuilder().build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d on topic %s", i, TTL_TOPIC_NAME); //Publishing messages @@ -520,16 +515,16 @@ public void testPubSubTTLMetadata() throws Exception { message, Map.of(Metadata.TTL_IN_SECONDS, "1")) .block(); - System.out.printf("Published message: '%s' to topic '%s' pubsub_name '%s'%n", message, TOPIC_NAME, PUBSUB_NAME); + LOG.info("Published message: '{}' to topic '{}' pubsub_name '{}'\n", message, TOPIC_NAME, PUBSUB_NAME); } } // Sleeps for two seconds to let them expire. Thread.sleep(2000); - try (DaprClient client = createDaprClientBuilder().build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { callWithRetry(() -> { - System.out.println("Checking results for topic " + TTL_TOPIC_NAME); + LOG.info("Checking results for topic " + TTL_TOPIC_NAME); final List messages = client.invokeMethod(PUBSUB_APP_ID, "messages/" + TTL_TOPIC_NAME, null, HttpExtension.GET, List.class).block(); assertThat(messages).hasSize(0); @@ -552,10 +547,10 @@ public void testLongValues() throws Exception { values.add(val); } Iterator valuesIt = values.iterator(); - try (DaprClient client = createDaprClientBuilder().build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { for (int i = 0; i < NUM_MESSAGES; i++) { PubSubIT.ConvertToLong value = valuesIt.next(); - System.out.println("The long value sent " + value.getValue()); + LOG.info("The long value sent " + value.getValue()); //Publishing messages client.publishEvent( PUBSUB_NAME, @@ -574,9 +569,9 @@ public void testLongValues() throws Exception { } Set actual = new HashSet<>(); - try (DaprClient client = createDaprClientBuilder().build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { callWithRetry(() -> { - System.out.println("Checking results for topic " + LONG_TOPIC_NAME); + LOG.info("Checking results for topic " + LONG_TOPIC_NAME); final List> messages = client.invokeMethod( PUBSUB_APP_ID, "messages/testinglongvalues", @@ -591,31 +586,6 @@ public void testLongValues() throws Exception { } } - private static DaprClientBuilder createDaprClientBuilder() { - return new DaprClientBuilder() - .withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getHttpPort()) - .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getGrpcPort()); - } - - private DaprObjectSerializer createJacksonObjectSerializer() { - return new DaprObjectSerializer() { - @Override - public byte[] serialize(Object o) throws JsonProcessingException { - return OBJECT_MAPPER.writeValueAsBytes(o); - } - - @Override - public T deserialize(byte[] data, TypeRef type) throws IOException { - return OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType())); - } - - @Override - public String getContentType() { - return "application/json"; - } - }; - } - private @NotNull DaprObjectSerializer createBinaryObjectSerializer() { return new DaprObjectSerializer() { @Override diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java index 0fc85a2a8a..30e9204018 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java @@ -23,6 +23,8 @@ import io.dapr.client.domain.CloudEvent; import io.dapr.it.pubsub.http.PubSubIT; import io.dapr.springboot.annotations.BulkSubscribe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -44,6 +46,7 @@ public class SubscriberController { private final Map>> messagesByTopic = Collections.synchronizedMap(new HashMap<>()); + private static final Logger LOG = LoggerFactory.getLogger(SubscriberController.class); @GetMapping(path = "/messages/{topic}") public List> getMessagesByTopic(@PathVariable("topic") String topic) { @@ -80,7 +83,7 @@ public List getMessagesReceivedTestingTopicV3() { @GetMapping(path = "/messages/topicBulkSub") public List getMessagesReceivedTestingTopicBulkSub() { - System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + LOG.info("res size: " + responsesReceivedTestingTopicBulkSub.size()); return responsesReceivedTestingTopicBulkSub; } @@ -91,7 +94,7 @@ public Mono handleMessage(@RequestBody(required = false) CloudEvent envelo try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + LOG.info("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); messagesReceivedTestingTopic.add(envelope); } catch (Exception e) { throw new RuntimeException(e); @@ -106,7 +109,7 @@ public Mono handleBulkTopicMessage(@RequestBody(required = false) CloudEve try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType); + LOG.info("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType); messagesReceivedBulkPublishTopic.add(envelope); } catch (Exception e) { throw new RuntimeException(e); @@ -138,7 +141,7 @@ public Mono handleMessageV3(@RequestBody(required = false) CloudEvent enve try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + LOG.info("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); messagesReceivedTestingTopicV3.add(envelope); } catch (Exception e) { throw new RuntimeException(e); @@ -153,7 +156,7 @@ public Mono handleMessageTyped(@RequestBody(required = false) CloudEvent

handleBinaryMessage(@RequestBody(required = false) CloudEvent try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType); + LOG.info("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType); messagesByTopic.compute("binarytopic", merge(envelope)); } catch (Exception e) { throw new RuntimeException(e); @@ -182,7 +185,7 @@ public Mono handleMessageAnotherTopic(@RequestBody(required = false) Cloud return Mono.fromRunnable(() -> { try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); - System.out.println("Another topic Subscriber got message: " + message); + LOG.info("Another topic Subscriber got message: " + message); messagesByTopic.compute("anothertopic", merge(envelope)); } catch (Exception e) { throw new RuntimeException(e); @@ -195,7 +198,7 @@ public Mono handleMessageTTLTopic(@RequestBody(required = false) CloudEven return Mono.fromRunnable(() -> { try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); - System.out.println("TTL topic Subscriber got message: " + message); + LOG.info("TTL topic Subscriber got message: " + message); messagesByTopic.compute("ttltopic", merge(envelope)); } catch (Exception e) { throw new RuntimeException(e); @@ -209,7 +212,7 @@ public Mono handleMessageLongValues(@RequestBody(required = false) CloudEv return Mono.fromRunnable(() -> { try { Long message = cloudEvent.getData().getValue(); - System.out.println("Subscriber got: " + message); + LOG.info("Subscriber got: " + message); messagesByTopic.compute("testinglongvalues", merge(cloudEvent)); } catch (Exception e) { throw new RuntimeException(e); @@ -231,7 +234,7 @@ public Mono handleMessageLongValues(@RequestBody(required = false) CloudEv public Mono handleMessageBulk( @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { return Mono.fromCallable(() -> { - System.out.println("bulkMessage: " + bulkMessage.getEntries().size()); + LOG.info("bulkMessage: " + bulkMessage.getEntries().size()); if (bulkMessage.getEntries().size() == 0) { BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>()); @@ -243,7 +246,7 @@ public Mono handleMessageBulk( List entries = new ArrayList<>(); for (BulkSubscribeMessageEntry entry: bulkMessage.getEntries()) { try { - System.out.printf("Bulk Subscriber got entry ID: %s\n", entry.getEntryId()); + LOG.info("Bulk Subscriber got entry ID: %s\n", entry.getEntryId()); entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); } catch (Exception e) { entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); @@ -251,7 +254,7 @@ public Mono handleMessageBulk( } BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries); responsesReceivedTestingTopicBulkSub.add(response); - System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + LOG.info("res size: " + responsesReceivedTestingTopicBulkSub.size()); return response; }); From 0645dbd9b8be1da946c816990079c855469c263c Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Mon, 14 Jul 2025 16:10:58 -0300 Subject: [PATCH 5/5] Use custom ObjectSerializer Signed-off-by: Matheus Cruz --- .../pubsub/http/DaprPubSubIT.java | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java index dbaeaac6a2..8d7111952d 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -12,7 +12,12 @@ */ package io.dapr.it.testcontainers.pubsub.http; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.dapr.client.DaprClient; import io.dapr.client.DaprPreviewClient; import io.dapr.client.domain.BulkPublishEntry; @@ -45,6 +50,7 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -78,7 +84,7 @@ public class DaprPubSubIT { private static final Random RANDOM = new Random(); private static final int PORT = RANDOM.nextInt(1000) + 8000; private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*"; - private static final DaprObjectSerializer SERIALIZER = new CustomizableObjectSerializer(new ObjectMapper()); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String PUBSUB_APP_ID = "pubsub-dapr-app"; private static final String PUBSUB_NAME = "pubsub"; @@ -92,7 +98,6 @@ public class DaprPubSubIT { private static final String TTL_TOPIC_NAME = "ttltopic"; private static final String LONG_TOPIC_NAME = "testinglongvalues"; - private static final int NUM_MESSAGES = 10; // typeRefs @@ -164,7 +169,7 @@ public void shouldPublishSomePayloadTypesWithNoError() throws Exception { try ( DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build(); DaprPreviewClient previewClient = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).withObjectSerializer( - SERIALIZER) + createJacksonObjectSerializer()) .buildPreviewClient() ) { @@ -187,7 +192,9 @@ public void shouldPublishSomePayloadTypesWithNoError() throws Exception { public void testPubSub() throws Exception { // Send a batch of messages on one topic - try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).withObjectSerializer(SERIALIZER).build()) { + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).withObjectSerializer( + createJacksonObjectSerializer() + ).build()) { sendBulkMessagesAsText(client, TOPIC_NAME); @@ -604,4 +611,23 @@ public String getContentType() { } }; } + + private DaprObjectSerializer createJacksonObjectSerializer() { + return new DaprObjectSerializer() { + @Override + public byte[] serialize(Object o) throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsBytes(o); + } + + @Override + public T deserialize(byte[] data, TypeRef type) throws IOException { + return OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType())); + } + + @Override + public String getContentType() { + return "application/json"; + } + }; + } }