1. Introduction
RabbitMQ is a flexible message broker agnostic to the type of messages it can hold on all the queues. The consumer application must be able to parse those messages accordingly, so we must pass on additional information about the message. This article will look at handling multiple message types in Spring AMQP.
2. Prerequisites and Setup
You may skip this section if you do not follow this tutorial thoroughly and only want to look at code examples.
In case you are feeling baffled by RabbitMQ? I understand–it’s easy to get tangled up in tech talk. Don’t stress, though: a quick read about RabbitMQ architecture will make you hop ahead confidently!
Also, If you try to follow this tutorial using your IDE, I will assume you already have RabbitMQ inside the docker image. You may want to complete the tutorial to get RabbitMQ running in Docker if you don’t.
2.1. Generate Project Template
Let’s look at a practical example. First, we must generate a blank spring boot project template using Spring Initializr with all the required dependencies. The Spring AMQP project is officially supported by Spring as part of the spring boot starter amqp dependency.
Click generate and import into your IDE.
2.2. Add Project Configuration
Let’s start with configuration by adding Jackson dependency to the gradle file. It will be required for ObjectMapper definition.
implementation 'com.fasterxml.jackson.core:jackson-databind'
Next, we define our ObjectMapper bean, which can be injected anywhere in the spring boot application.
@Configuration public class JsonConfig { @Bean public ObjectMapper objectMapper() { var objectMapper = new ObjectMapper(); objectMapper.findAndRegisterModules(); return objectMapper; } }
In Spring Boot, we can use beans to define RabbitMQ resources.
The RabbitConfiguration contains a topic exchange and a queue bound together using a binding key (routing key). The same routing key needs to match when publishing messages onto the topic exchange; otherwise, the queue won’t receive messages. Those RabbitMQ resources will be created using Spring upon application startup.
With Spring Boot, we don’t need to worry about connection management and defining RabbitMQ access as long as we are happy to use the default values that point to your local machine.
@Configuration public class RabbitConfiguration { public static final String USERS_EXCHANGE = "users.exchange"; public static final String USERS_QUEUE = "users.queue"; public static final String USERS_ROUTING_KEY = "users"; @Bean TopicExchange usersExchange() { return new TopicExchange(USERS_EXCHANGE); } @Bean Queue usersQueue() { return QueueBuilder.durable(USERS_QUEUE) .build(); } @Bean Binding bindingUsers() { return BindingBuilder .bind(usersQueue()) .to(usersExchange()) .with(USERS_ROUTING_KEY); } }
Finally, the two user objects represent the old and new formats.
@Data @NoArgsConstructor @AllArgsConstructor public class OldFormatUser { private Long id; private String name; private String email; }
@Data @NoArgsConstructor @AllArgsConstructor public class NewFormatUser { private Long id; private String fullName; private String phoneNumber; private String email; }
3. Handle Message Types in Spring AMQP
RabbitMQ message broker allows users to store multiple message types on a single exchange. Different routing keys can route the messages from the publisher to exchange bound queues, allowing different messages to be sent and received without conflicts.
The consumer applications can use message headers or encoding in the message body to correctly deserialize the messages, which can be handled accordingly based on that information.
3.1. Message Headers Handling
We can use the message headers to determine the object type for deserialization. Below is a consumer class that retrieves the header responsible for the type determination, which is used in a switch statement to deserialize the RabbitMQ message into a particular object type. The routing key specified during the queue exchange binding determines which messages will be received by the consumer.
@Log4j2 @Component public class Consumer { private final ObjectMapper objectMapper; public Consumer(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } @RabbitListener(queues = {RabbitConfiguration.USERS_QUEUE}) public void consume(Message message) throws IOException { Object user; String type = message .getMessageProperties() .getHeaders() .get("type") .toString(); switch (type) { case "old": user = objectMapper .readValue(message.getBody(), OldFormatUser.class); break; case "new": user = objectMapper .readValue(message.getBody(), NewFormatUser.class); break; default: user = null; break; } log.info("Successfully deserialized user: {}", user); } }
Start the application and navigate to the RabbitMQ management page. In the exchanges tab, select the created exchange and send a message in the publish message section.
The above message should be deserialized into OldFormatUser. If we post another JSON message while amending the header type to new, we will get the NewFormatUser object.
This is because we have used message headers to determine the message type in the application.
{ "id":2, "fullName":"Tom Smith", "phoneNumber":"0000000", "email":"tom12@gmail.com" }
One of the main advantages of using the message headers approach is the possibility of using the header value as a routing key to publish the messages to the headers exchange. While the headers exchange performs considerably slower than other exchange types, it is the only one that works directly with message headers.
3.2. Encode Type in Message Body
We need to add a dependency to the gradle file as we will be parsing the message into JSON.
implementation 'org.json:json:20220924'
Consumer two is somehow similar to the previous consumer, but instead, we are extracting the message type from the content of the body. The same message is converted to JSON, and after the type is known, we deserialize it into the object.
@Log4j2 @Component public class ConsumerTwo { private final ObjectMapper objectMapper; public ConsumerTwo(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } @RabbitListener(queues = {RabbitConfiguration.USERS_QUEUE}) public void consume(Message message) throws IOException { Object user; JSONObject userWithType = new JSONObject(new String(message.getBody())); String type = userWithType.optString("type"); String userString = userWithType.getJSONObject("user").toString(); switch (type) { case "old": user = objectMapper .readValue(userString, OldFormatUser.class); break; case "new": user = objectMapper .readValue(userString, NewFormatUser.class); break; default: user = null; break; } log.info("Successfully deserialized user: {}", user); } }
Before the application was restarted, I disabled the consumer from the previous example.
Publish the following JSON String message on the exchange using the RabbitMQ console. The user is deserialized into the expected type. This is because we are passing an extra attribute in the message payload.
Send another message onto the topic exchange with a different type than before.
{ "type":"new", "user":{ "id":2, "fullName":"Tom Smith", "phoneNumber":"0000000000", "email":"tom12@gmail.com" } }
4. Summary
In this article, we have looked at processing multiple message types in Spring AMQP. Spring AMQP allows us to handle various message types easily, but deciding on the best approach can be challenging. For example, if you’re using header exchange, things like headers might make perfect sense – though it requires extra work when parsing JSON messages. However, if such a property exists on the message, the body encoding could be a perfect choice!
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.
Leave a Reply