AMQP Support

Erich Jagomägis edited this page Nov 14, 2023 · 2 revisions

AMQP auto-configuration sets up a JSON message converter for both serialization and deserialization. If Trace ID propagation is enabled, it is also auto-configured for AMQP messages. By default, error handling drops a message when the listener throws an exception. The recommended way to avoid data loss is to configure a Dead Letter Exchange.
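The Dead Letter Exchange recommendation could be sketched with Spring AMQP's declarative beans, roughly as below. The queue, exchange, and routing key names are hypothetical. The sketch also assumes that a dropped message is rejected without being requeued, which is the condition under which RabbitMQ routes it to the dead letter exchange; this page does not confirm how the library rejects messages.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfiguration {

    // Hypothetical names, chosen for illustration only.
    static final String WORK_QUEUE = "orders.requests";
    static final String DEAD_LETTER_EXCHANGE = "orders.dlx";
    static final String DEAD_LETTER_QUEUE = "orders.requests.dead";

    // Queue the listener consumes from; rejected messages are forwarded to the DLX.
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable(WORK_QUEUE)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUE)
                .build();
    }

    // Exchange that receives dead-lettered messages.
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // Queue where failed messages are parked for inspection or replay.
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    // Routes dead-lettered messages from the exchange into the parking queue.
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_QUEUE);
    }
}
```

With a setup like this, a message that fails in the listener ends up in the dead letter queue instead of being lost, so it can be inspected or republished later.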

Enabling feature

To enable the feature, add the following property to the application.properties file:

ee.bitweb.core.amqp.auto-configuration=true

The library does not include any transitive dependencies, so the following dependency must be added to the runtime classpath:

	// https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp
	implementation 'org.springframework.boot:spring-boot-starter-amqp'

If org.springframework.boot:spring-boot-starter-web is not used, an additional dependency is needed:

	// https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
	implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jacksonVersion}"

AmqpService

The helper class ee.bitweb.core.amqp.AmqpService is available for sending messages to queues.

amqpService.sendMessage(
        REQUEST_QUEUE,
        responseQueue,
        request,
        message -> {
            message.getMessageProperties().setHeader(AmqpConstants.VERSION_HEADER_NAME, version.toString());
            message.getMessageProperties().setHeader(AmqpConstants.PROCESSLOCK_HEADER_NAME, processLockName);

            return null;
        }
);

Interceptors

Interceptors let you process a message before it is passed to the listener. They can also be used for error handling. If auto-configuration is enabled, interceptors are added in the order defined by the @Order annotation. If the Trace ID feature is enabled via auto-configuration, the Trace ID propagation interceptor is loaded and forced to be the first interceptor in the list.

To add a custom interceptor, define a bean of a class that implements the interface ee.bitweb.core.amqp.AmqpListenerInterceptor.
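The ordering rule described above can be modeled in plain Java. This is only an illustrative sketch, not the library's implementation: the `Registered` record, the bean names, and the order values are hypothetical, and it assumes Spring's usual convention that a lower @Order value runs earlier.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class InterceptorOrderingSketch {

    /** Hypothetical stand-in for an interceptor bean and its @Order value. */
    record Registered(String name, int order, boolean traceInterceptor) {}

    /**
     * Puts the trace interceptor first regardless of its order value,
     * then sorts the remaining interceptors by ascending order value.
     */
    static List<String> resolveOrder(List<Registered> beans) {
        List<Registered> sorted = new ArrayList<>(beans);
        sorted.sort(Comparator
                // false sorts before true, so the trace interceptor leads the list
                .comparing((Registered r) -> !r.traceInterceptor())
                .thenComparingInt(Registered::order));
        return sorted.stream().map(Registered::name).toList();
    }

    public static void main(String[] args) {
        List<Registered> beans = List.of(
                new Registered("auditInterceptor", 20, false),
                new Registered("traceIdInterceptor", 100, true),
                new Registered("validationInterceptor", 10, false));

        // prints [traceIdInterceptor, validationInterceptor, auditInterceptor]
        System.out.println(resolveOrder(beans));
    }
}
```

In this model the trace interceptor runs first even though it has the highest order value, so later interceptors and the listener can rely on the Trace ID already being set.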

Before publish message processors

Before publish message processors modify each message before it is sent to the queue, for example to add a custom header or to log the message. If auto-configuration is enabled, processors are added in the order defined by the @Order annotation. If the Trace ID feature is enabled via auto-configuration, the Trace ID propagation processor adds a Trace ID header to every message leaving the application.

To add a custom processor, define a bean of a class that implements the interface ee.bitweb.core.amqp.AmqpBeforePublishMessageProcessor.

Example

@Slf4j
@RequiredArgsConstructor
public class AmqpTraceBeforePublishMessageProcessor implements AmqpBeforePublishMessageProcessor {

    private final AmqpTraceProperties properties;
    private final TraceIdContext context;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        log.debug("Adding trace id {} to AMQP message header {}", context.get(), properties.getHeaderName());

        message.getMessageProperties().setHeader(properties.getHeaderName(), context.get());

        return message;
    }
}
