1. Introduction
While the error rate on well-designed applications may be relatively low, the number of errors may grow exponentially when processing data at scale. Handling errors gracefully is an essential aspect of any commercial application. This article will look at how to handle errors in Spring AMQP.
2. Prerequisites and Setup
You may skip this section if you are not following this tutorial thoroughly and only want to look at code examples.
In case you are feeling baffled by RabbitMQ? I understand–it’s easy to get tangled up in tech talk. Don’t stress, though: a quick read about RabbitMQ architecture will have you hop ahead confidently!
Also, If you try to follow this tutorial using your IDE, I will assume you already have RabbitMQ inside the docker image. If you don’t, please follow another tutorial to get RabbitMQ running in a Docker container.
2.1. Generate Project Template
Next, we generate a blank spring boot project template using Spring Initializr with all the required dependencies. Click generate and import into your IDE.
2.2. Add Project Configuration
Let’s start with the basic configuration by adding Jackson dependency to the gradle file. It will be required for Jackson’s message converter to create objects from JSON messages.
implementation 'com.fasterxml.jackson.core:jackson-databind'
In our RabbitConfiguration configuration class, we created RabbitMQ resources programmatically using Spring bean definitions. On application startup, any queues, exchanges and bindings are created application startup automatically. We are using a routing key to bind an exchange with a queue.
The rabbit listener uses Jackson’s message converted to deserialise JSON into Java Objects. As this configuration is set globally for the entire project, all RabbitMQ consumers will inherit this message converter by default.
@Configuration public class RabbitConfiguration { public static final String USERS_EXCHANGE = "users.exchange"; public static final String USERS_QUEUE = "users.queue"; public static final String USERS_ROUTING_KEY = "register.user"; @Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } @Bean TopicExchange usersExchange() { return new TopicExchange(USERS_EXCHANGE); } @Bean Queue usersQueue() { return QueueBuilder.durable(USERS_QUEUE) .build(); } @Bean Binding bindingUsers() { return BindingBuilder .bind(usersQueue()) .to(usersExchange()) .with(USERS_ROUTING_KEY); } }
The User.class will be used in deserialisation.
@Data @NoArgsConstructor @AllArgsConstructor public class User { private Long id; private String name; private String email; }
3. Spring AMQP Default Requeue Logic
The created consumer consumes users’ objects and tries to validate their email addresses using pattern matching. The application responds with an email exception when the email address is invalid. As a result, the processing will be stopped. Start the spring boot application after adding the consumer and email exception classes.
public class EmailException extends Exception { public EmailException(String message) { super(message); } }
@Log4j2 @Component public class Consumer { public static final Pattern VALID_EMAIL_ADDRESS_REGEX = Pattern.compile("^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$", Pattern.CASE_INSENSITIVE); @RabbitListener(queues = {RabbitConfiguration.USERS_QUEUE}) public void consume(User user) { log.info(String.format("Received deserialized user: %s", user)); var matcher = VALID_EMAIL_ADDRESS_REGEX.matcher(user.getEmail()); if (!matcher.find()) { throw new EmailException("invalid email address"); } log.info(String.format("Successfully validated user: %s", user)); } }
To test our consumer in the RabbitMQ management console (http://localhost:15672/), log in using the default credentials and in the exchanges tab, click on the exchange that our application has created. The Bellow section should be visible, add the content and click publish message.
Since the email address was valid, this should have been processed successfully. The relevant logs are visible in the console. Let’s publish another message with an invalid email address.
{"id":1,"name":"Daniel","email":"daniel1234gmail.com"}
Now the rabbit message listener failed, and we are getting an exception. The failed message was expected as this is the behaviour we have defined in the consumer.
After unsuccessful processing, by default, the message is enqueued back to the queue and processed again and again, like in an infinite loop. Let’s take a look at how we can change that.
4. Handle Errors With Global Error Handler
In the SimpleRabbitListenerContainerFactory, we can provide a custom implementation of a handler that will be available globally for error handling. It will contain explicit handling for our custom application exceptions. The global error handler internally uses ConditionalRejectingErrorHandler, which decides whether a message should be rejected if it is defined as fatal. The rejected message will not be re-queued.
Let’s start by adding CustomExceptionStrategy.class that extends ConditionalRejectingErrorHandler, where we provide our implementation of the email exception we have created. We will ensure that any email exceptions thrown won’t be reprocessed, and custom error handling can be included inside the method. For this example will just be logged into the console.
@Log4j2 public class CustomExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { @Override public boolean isFatal(Throwable throwable) { if (throwable.getCause() instanceof EmailException) { log.error("Exception has occurred : {} ", throwable.getMessage()); return true; } return false; } }
Next, we must extend our RabbitConfiguration by specifying the handler in the SimpleRabbitListenerContainerFactory implementation.
@Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setErrorHandler(errorHandler()); return factory; }
@Bean public ErrorHandler errorHandler() { return new ConditionalRejectingErrorHandler(customExceptionStrategy()); } @Bean FatalExceptionStrategy customExceptionStrategy() { return new CustomExceptionStrategy(); }
Restart the application and publish another message with an invalid email address. The message is rejected, so it won’t be reprocessed because in our CustomExceptionStrategy we have defined the email exception as fatal.
{"id":1,"name":"Daniel","email":"daniel1234gmail.com"}
5. Dead Letter Queue
A dead letter queue is a queue that holds failed messages. It allows us to analyse the message content, understand the failure reason, and, if possible, develop a strategy for reprocessing. At the same time doesn’t degrade consumer performance through endless reprocessing.
It’s time to amend our RabbitConfiguration.class to add a new x dead letter exchange and queue bound together using the dead letter routing key. The setDefaultRequeueRejected param is false, and we no longer need to pass the handler. Additionally, the user queue arguments to include dead letter exchange details will ensure that failed messages can be routed accordingly.
At this point, I have deleted all the resources manually via the management console because I want to create everything from scratch using the updated configuration file.
@Configuration public class RabbitConfiguration { public static final String EXCHANGE = "users.exchange"; public static final String DLE = "users.exchange.dle"; public static final String QUEUE = "users.queue"; public static final String DEAD_LETTER_QUEUE = "users.queue.dlq"; public static final String KEY = "register.user"; public static final String KEY_DLQ = "register.user.dlq"; @Bean("rabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setErrorHandler(errorHandler()); factory.setDefaultRequeueRejected(false); factory.setErrorHandler(errorHandler()); return factory; } @Bean TopicExchange usersExchange() { return new TopicExchange(EXCHANGE); } @Bean DirectExchange usersDeadLetterExchange() { return new DirectExchange(DLE); } @Bean Queue usersQueue() { return QueueBuilder.durable(QUEUE) .withArgument("x-dead-letter-exchange", DLE) .withArgument("x-dead-letter-routing-key", KEY_DLQ) .build(); } @Bean Queue usersDeadLetterQueue() { return QueueBuilder.durable(DEAD_LETTER_QUEUE) .build(); } @Bean Binding bindingUsers() { return BindingBuilder .bind(usersQueue()) .to(usersExchange()) .with(KEY); } @Bean Binding bindingDLQUsers() { return BindingBuilder .bind(usersDeadLetterQueue()) .to(usersDeadLetterExchange()) .with(KEY_DLQ); } }
Start the application and ensure the above resources are created in the RabbitMQ console. Publish the below JSON message with an invalid email onto the users’ exchange. After the exception, the message is placed on the user’s dead letter queue, and the application can perform the processing without performance degradation.
{"id":1,"name":"Daniel","email":"daniel1234gmail.com"}
6. Summary
This article has looked at how to handle errors in Spring AMQP. Spring AMQP is designed to handle errors with finesse, providing a custom error handler that allows users to dictate how exceptions are managed explicitly. For those pesky messages you can’t seem to shake off – don’t worry! The dead letter queue or exchange approach comes to the rescue, offering an extra set of hands for reprocessing and investigating any trouble-making messages.
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.
Leave a Reply