1. Introduction
RabbitMQ is like a well-run mailroom – when messages are delivered, it just wants to ensure they are received and handled correctly. Spring Boot AMQP lets u send an ‘ack’ (or acknowledgement) and lets RabbitMQ know that the message was safely served up for processing by your application! In this article, we’ll take more of a close look at acknowledgements in Spring AMQP so you can ensure all those outbound messages get where they need to go without fail.
2. Acknowledgements in Spring Boot AMQP
An acknowledgement in RabbitMQ is essentially an agreement from one party (a consumer) to receive messages from another party (a publisher). It confirms that the responsibility of handling the messages has been taken on, as well as providing a valuable means of ensuring reliable messaging within RabbitMQ.
Spring Boot AMQP has excellent support for RabbitMQ. By default, message acknowledgements are automatic. When a consumer acknowledges, the message is automatically removed from the queue. In case of an exception in the listener, it will be rejected, re-queued and processed like a new message. Changing this behaviour by setting the property defaultRequeueRejected (re-queue) to false is possible. Passing the DLQ(dead letter exchange) as an argument to the processing queue allows us to route the same message to different work queues for further analysis.
Here is how different configurations will affect the handling of successful and rejected acknowledgements:
- ack – the messages are processed successfully and will be disregarded even when the task queues configure a DLQ since there are no errors.
- nack and re-queue true – default configuration; unacknowledged messages are rejected and returned to the same queue to be processed again. The odd messages could be stuck in this state without proper error handling.
- nack and re-queue false – The messages will be discarded after rejection and loss.
- nack re-queue false and DLQ – upon rejection, the message will get routed to the DLQ so it can be analysed later and reprocessed.
3. Prerequisites and Setup
You may skip this section if you do not follow this tutorial thoroughly and only want to look at code examples.
In case you are feeling baffled by RabbitMQ? I understand–it’s easy to get tangled up in tech talk. Don’t stress, though: a quick read of RabbitMQ architecture will make you hop ahead confidently!
If you try to follow this tutorial using your IDE, I will assume that you already have RabbitMQ inside the docker image. You may look at another article to get RabbitMQ running in Docker.
3.1. Generate Project Template
Next, we generate a blank Spring boot project template using Spring Initializr with all the required dependencies. Click generate and import into your IDE.
3.2. Add Initial Project Files
Let’s start with configuration by adding Jackson dependency to the gradle file.
implementation 'com.fasterxml.jackson.core:jackson-databind'
The RabbitConfiguration is a configuration class that overrides RabbitListenerContainerFactory passing the appropriate message converted. It also creates an exchange with a queue bound to receive a message payload with the specified routing key.
RabbitMQ hash (#) is the default routing key when no key is specified.
@Configuration public class RabbitConfiguration { public static final String EXCHANGE = "users.exchange"; public static final String DLE = "users.exchange.dle"; public static final String QUEUE = "users.queue"; public static final String ROUTING_KEY = "register.user"; @Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } @Bean TopicExchange usersExchange() { return new TopicExchange(EXCHANGE); } @Bean Queue usersQueue() { return QueueBuilder.durable(QUEUE) .build(); } @Bean Binding bindingUsers() { return BindingBuilder .bind(usersQueue()) .to(usersExchange()) .with(ROUTING_KEY); }
Also, the custom EmailException will be used in the RabbitMQ consumer.
public class EmailException extends Exception { public EmailException(String message) { super(message); } }
Finally, the User Java object that we are going to deserialize into. In case you don’t know, we are using Lombok annotations to generate Java boilerplate code to save us some typing.
@Data @NoArgsConstructor @AllArgsConstructor public class User { private Long id; private String name; private String email; }
4. Nack – Consumer Negative Acknowledgement
The RabbitMQ consumer verifies an email to check if an email is valid. It will throw an exception when invalid, and the listener will respond with nack. Since the exception is not defined as fatal by default, it will be returned to the queue.
@Log4j2 @Component public class Consumer { public static final Pattern VALID_EMAIL_ADDRESS_REGEX = Pattern.compile("^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$", Pattern.CASE_INSENSITIVE); @RabbitListener(queues = {RabbitConfiguration.USERS_QUEUE}) public void consume(User user) throws EmailException { log.info(String.format("Received deserialized user: %s", user)); var matcher = VALID_EMAIL_ADDRESS_REGEX.matcher(user.getEmail()); if (!matcher.find()) { throw new EmailException("Invalid email address"); } log.info(String.format("Successfully validated user: %s", user)); } }
Start the application and log in to the RabbitMQ server management console (http://localhost:15672/) with default credentials. In the exchanges tab, click on the created exchange and publish the below test message.
Since the message is enqueued back after the exception, we keep processing in an infinite loop. Let’s amend our RabbitListenerContainerFactory in the RabbitConfiguration to include the below configuration.
@Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setDefaultRequeueRejected(false); return factory; }
Restart the application. The message isn’t enqueued anymore but dropped after unsuccessful processing. We can also store the failed message by storing it on a separate queue for investigation.
Here is the updated RabbitConfiguration we have added an exchange and a queue that will hold error messages. A newly created exchange is passed as an argument to the processing queue, so RabbitMQ will use it to route messages when the exception occurs.
@Configuration public class RabbitConfiguration { public static final String EXCHANGE = "users.exchange"; public static final String DLE = "users.exchange.dle"; public static final String QUEUE = "users.queue"; public static final String DLQ = "users.queue.dlq"; public static final String ROUTING_KEY = "register.user"; public static final String ROUTING_KEY_DLQ = "register.user.dlq"; @Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setDefaultRequeueRejected(false); return factory; } @Bean TopicExchange usersExchange() { return new TopicExchange(EXCHANGE); } @Bean DirectExchange usersDeadLetterExchange() { return new DirectExchange(DLE); } @Bean Queue usersQueue() { return QueueBuilder.durable(QUEUE) .withArgument( "x-dead-letter-exchange", DLE ) .withArgument( "x-dead-letter-routing-key", ROUTING_KEY_DLQ ) .build(); } @Bean Queue usersDeadLetterQueue() { return QueueBuilder.durable(DLQ) .build(); } @Bean Binding bindingUsers() { return BindingBuilder .bind(usersQueue()) .to(usersExchange()) .with(ROUTING_KEY); } @Bean Binding bindingDLQUsers() { return BindingBuilder .bind(usersDeadLetterQueue()) .to(usersDeadLetterExchange()) .with(ROUTING_KEY_DLQ); } }
We have modified the existing queue by adding routing arguments. Next, we must remove the existing queue and exchange using the RabbitMQ server management console to recreate it from scratch. Once done, restart the application and check in the console that the updated resources have been created.
Publish the same message as previously using the script below.
{"id":1,"name":"Tom","email":"tom12gmail.com"}
This time the message is routed via the dead letter exchange to the users.queue.dlq queue. The message can be accessed through the queue from the get messages tab in the console.
5. Ack – Consumer Positive Acknowledgement
A successful ack acknowledgement will automatically tell the RabbitMQ server that a message has been appropriately handled. As a result, the message will be removed from a queue even when a dead letter queue (DLQ) is linked via an argument. The DLQ argument is only relevant when there is a processing error.
Let’s post another message in the console but in a valid format.
{"id":1,"name":"Tom","email":"tom12@gmail.com"}
The message is processed successfully and removed from the queue permanently.
5.1. Manual Acknowledgement
In rare circumstances, the acknowledgement can also be ack: ed manually. We must set the acknowledge mode to manual in the RabbitListenerContainerFactory to do this.
@Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setDefaultRequeueRejected(false); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; }
Then our consumer class must pass the channel and delivery tag. The channel is used to perform the manual ack: ing. If we have omitted channel.basicAck call in manual acknowledge mode, the message would end up unacked in the listener queue even when it was processed successfully.
@Log4j2 @Component public class Consumer { public static final Pattern VALID_EMAIL_ADDRESS_REGEX = Pattern.compile("^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$", Pattern.CASE_INSENSITIVE); @RabbitListener(queues = {RabbitConfiguration.USERS_QUEUE}) public void consume( User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws EmailException, IOException { log.info(String.format("Received deserialized user: %s", user)); var matcher = VALID_EMAIL_ADDRESS_REGEX.matcher(user.getEmail()); if (!matcher.find()) { throw new EmailException("Invalid email address"); } log.info(String.format("Successfully validated user: %s", user)); channel.basicAck(tag, false); } }
Restart the application and post the message.
{"id":1,"name":"Tom","email":"tom12@gmail.com"}
Feel free to comment out the basicAck invocation to see the message stuck unacked in the queue.
6. Reliable Publishing – Publisher Confirmation
Publisher confirmation is a RabbitMQ feature that allows work queues to ensure messages have been delivered directly to the consumer or via a new queue with multiple consumers. When enabled, the broker returns an acknowledged delivery receipt when it’s accepted by the consumer or forwarded to the new queue.
Thanks to publisher confirmation, work queues know if their message has made it out of the sending RabbitMQ instance, ensuring reliable messaging and preventing lost communications.
When publisher confirmation is enabled on the channel, messages are asynchronously confirmed by the broker. This guarantees successful handling on the server while providing a mechanism for taking actions if not.
Let’s implement this in Spring Boot and add the below into the application.yml file inside the main resources folder.
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
Next, we must set the publisher template with the appropriate message converter to deserialize the messages into Java classes.
@Bean RabbitTemplate rabbitTemplate( ConnectionFactory connectionFactory, MessageConverter messageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } @Bean MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
Finally, the publisher class sends the user message with additional correlation data. When the publisher receives confirmation, the correlation object responds with ack/nack. The correlation object must be created using a unique id to be associated with the failed message.
Since we have enabled publisher returns in the yml configuration, we can also access and take action on the returned message if, for example, the response is a nack.
@Log4j2 @Component public class Publisher { private final RabbitTemplate rabbitTemplate; public Publisher(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void publishUserMessage(User user) throws Exception { setConfirmCallback(); log.info("About to publish user message: {}", user); CorrelationData correlationData = new CorrelationData("Correlation for msg " + user.getId()); rabbitTemplate.convertAndSend( RabbitConfiguration.USERS_EXCHANGE, RabbitConfiguration.USERS_ROUTING_KEY, user, correlationData ); CorrelationData.Confirm confirm = correlationData.getFuture().get(5, TimeUnit.SECONDS); log.info("Confirm received, ack = {}", confirm.isAck()); } private void setConfirmCallback() { this.rabbitTemplate.setConfirmCallback( (correlation, ack, reason) -> { if (correlation != null) { log.info("Received {} for correlation {}", (ack ? "ack" : "nack"), correlation ); } }); } }
Lastly, the controller so we can publish some messages manually.
@RestController @RequestMapping("/users/v1") public class Controller { private final Publisher publisher; public Controller(Publisher publisher) { this.publisher = publisher; } @PostMapping("/add") public ResponseEntity<String> sendMessage( @RequestBody User user) throws Exception { publisher.publishUserMessage(user); return ResponseEntity.ok().build(); } }
Start the application, and using any command line tool or Postman, let’s send a request to the above endpoint. I will use bash and curl to send the below request. We can see in the console that we have received confirmation for our published message.
curl --request POST -H "Content-Type: application/json" -d '{"id":1,"name":"Tom","email":"tom12@gmail.com"}' http://localhost:8080/users/v1/add
7. Summary
In this post, we have looked at acknowledgements in Spring AMQP. Consumers have an array of choices when it comes to handling nacked messages. When guaranteeing your message makes its way through any pesky network problems that might prevent delivery on RabbitMQ’s end, look no further than Publisher Confirmation; reliable publishing at its best!!
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.
Leave a Reply