1. Introduction
Using a message broker like RabbitMQ to process messages in the microservices environment is common. Each service will listen for messages on a specific queue so they can be consumed as soon as they become available. Fortunately, Spring boot has excellent support for RabbitMQ, and with the power of annotations, we can get it configured in no time. This article will examine how to consume RabbitMQ messages with Spring AMQP Consumer.
2. Prerequisites and Setup
You may skip this section if you do not follow this tutorial thoroughly and only want to look at code examples.
If you try to follow this tutorial using your IDE, I will assume that you already have RabbitMQ inside the docker image. If you don’t, you may want to complete the tutorial to run RabbitMQ in a Docker container.
In case you are feeling baffled by RabbitMQ? I understand–it’s easy to get tangled up in tech talk. Don’t stress, though: a quick read about RabbitMQ architecture will have you hop ahead confidently!
2.1. Generate Project Template
Next, we generate a blank spring boot project template using Spring Initializr with all the required dependencies. Click generate and import into your IDE. One of the dependencies is Spring boot RabbitMQ which belongs to the Spring AMQP project.
3. Create RabbitMQ Resources
When RabbitMQ runs in a container, we should be able to navigate to http://localhost:15672/ and log in using the default credentials 2x guest. First, we need to add a new exchange. The exchanges tab has a section below where we can add a new exchange.
Once the exchange is created, we need to create a queue. The queues tab will show all the queues currently on RabbitMQ with a way view to add new ones.
Finally, we need to add a relationship between the two called binding. Binding is required when a queue is interested in messages from a given exchange.
Since this is a fanout exchange, all messages will be routed automatically, even when multiple consumers are without a routing key. However, for any other type of exchange, when messages are sent with routing keys, the exchange will send the message to the queues that match the binding.
The below view is available when you go to the exchanges tab and click on the created exchange.
4. Consume RabbitMQ Message
The RabbitConfiguration.class is a configuration class that contains RabbitMQ configuration. Since we will rely on the Spring AMQP (Advanced Message Queuing Protocol) default settings, we don’t need to worry about connection management and can focus on resource declaration.
Let’s start by adding a queue name that matches what we created via the management page.
@Configuration public class RabbitConfiguration { protected static final String itemsQueue = "items.queue"; }
Our consumer class can be identified by the listener class annotated with @RabbitListener. It’s necessary to annotate it with @Component; otherwise, when you start the application, nothing will get consumed, and messages will stay in a queue.
@Log4j2 @Component public class Consumer { @RabbitListener(queues = {RabbitConfiguration.itemsQueue}) public void consume(String message) { log.info(String.format("Received message: %s", message)); } }
Let’s start the spring boot RabbitMQ application and publish the message below. When consuming messages, it should also get logged into the console. The view is available when you click on the available exchange in the exchanges tab.
4.1. JSON RabbitMQ Consumer
Receiving messages in JSON format is pretty typical, but we need to add some extra configuration. In the gradle file, let’s add jackson-databind to deserialize JSON into an actual object.
implementation 'com.fasterxml.jackson.core:jackson-databind'
Define object mapper bean in the configuration package.
@Configuration public class JsonConfig { @Bean public ObjectMapper objectMapper() { var objectMapper = new ObjectMapper(); objectMapper.findAndRegisterModules(); return objectMapper; } }
A Java object that we are going to deserialize into.
@Data @NoArgsConstructor @AllArgsConstructor public class Item { private Long id; private String name; private BigDecimal price; }
Our consumer method has changed, so we can read the JSON string and build an object. The method throws exceptions when JSON cannot be deserialized properly.
@Log4j2 @Component public class Consumer { private final ObjectMapper objectMapper; public Consumer(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } @RabbitListener(queues = {RabbitConfiguration.itemsQueue}) public void consumeAsJson(String message) throws JsonProcessingException { var item = objectMapper.readValue(message, Item.class); log.info(String.format("Received json message: %s", item)); } }
Restart the spring boot application and post another message with the below payload. Again result should be displayed in the console.
{"id":1,"name":"phone","price":"799"}
4.2. Custom RabbitMQ Listener Container Factory
We can configure the RabbitListenerContainerFactory bean for more custom configurations by providing several settings that override the default settings. Here we have provided a custom message converted that internally uses ObjectMapper to deserialize when publishing and serialize when consuming messages.
The concurrency setting enables the parallel processing of messages from a queue. Initially, there are two consumers with the potential to scale up to 5 if they cannot cope with the incoming load.
@Configuration public class RabbitConfiguration { protected static final String itemsQueue = "items.queue"; @Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(2); factory.setMaxConcurrentConsumers(5); return factory; } }
Next, we need to use the RabbitListenerContainerFactory by providing the name inside the @RabbitListener annotation.
@Log4j2 @Component public class Consumer { @RabbitListener( queues = {RabbitConfiguration.itemsQueue}, containerFactory = "rabbitListenerContainerFactory") public void consumeAsJson(Item item) { log.info(String.format("Received deserialized item: %s", item.toString())); } }
Restart the spring boot application. Publish some JSON messages in the same format as before, which should be logged into the console. In the RabbitMQ management page, go to the queues tab and click on the items.queue, and you should see the number of consumers attached set to 2. When testing manually, one will likely be idle unless you provide sufficient load or change the prefetch count to something shallow, like 1.
4.3. Multiple Consumers
When multiple consumers listen on the same RabbitMQ queue, they compete for the messages, meaning that when the exchange routes messages, they are consumed without any particular order. This can be an issue when messages must be processed in the order.
5. Summary
In this article, we have looked at how to consume RabbitMQ messages with Spring AMQP Consumer. As we explored, Spring makes it simple to consume RabbitMQ messages with a few customization modifications. Our custom container listener factory bean provides an array of options, allowing us to choose which consumers get the complete treatment and how their settings should be handled.
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.
Leave a Reply