1. Introduction
No matter how smooth the ride, handling consumer errors is an inevitable part of working with Kafka. This article deals with those bumps while keeping the application moving and learning from processing failures. So strap in – this is the Spring Kafka Retry and Error Handling Guide!
2. Prerequisites and Setup
You may skip this section if you are not following along step by step and only want to look at the code examples.
If you are following this tutorial in your IDE, I will assume that you already have Apache Kafka running in a Docker container. If you don't, the official Apache Kafka quickstart will get you up and running.
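If you need a local broker, a minimal docker-compose sketch like the one below should work. This setup is an assumption on my part (it is not from the original tutorial): it uses the Confluent images and names the container broker so the docker exec commands in the next section work unchanged.

```yaml
# Assumed single-broker dev setup (not part of the original tutorial).
# The container is named "broker" to match the commands used later on.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertising localhost:9092 keeps both the host application and
      # in-container CLI tools happy for single-machine development.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

Run it with docker compose up -d before continuing.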
2.1. Create a Kafka topic with data
Since we are not using a producer application to insert the data, we will add it manually. First, let's open a shell inside the broker container [1], create a topic [2], and publish some JSON messages [3]. Finally, we can verify the sent messages by executing [4] to list everything on the topic.
[1] docker exec -it broker bash
[2] kafka-topics --bootstrap-server broker:9092 --create --topic images --partitions 1
[3] kafka-console-producer --broker-list broker:9092 --topic images
>{"name":"dog","type":"png","size":10000}
>{"name":"cat","type":"jpg","size":12000}
>{"name":"holidays","type":"tiff","size":15000}
[4] kafka-console-consumer --bootstrap-server broker:9092 --topic images --from-beginning
2.2. Generate Project Template
Now that the topic contains data, we can generate a blank Spring Boot application using the Spring Initializr template with all the required dependencies. Click Generate and import the project into your IDE.
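For reference, the dependency block below is my assumed build.gradle selection based on what the code in this tutorial uses (Spring for Apache Kafka plus Lombok for @Data and @Log4j2); adjust it to match your own Initializr choices. Jackson is added separately in the next section.

```groovy
// Assumed dependency list (not copied from the original project):
// spring-kafka for the consumer, Lombok for the annotations used below.
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
```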

2.3. Add Project Configuration
We will start by adding Gradle dependencies for Jackson inside the build.gradle file so we can deserialize the JSON payloads into POJO classes. This is followed by creating the application.yml config file inside the main resources folder.
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest
Next, we define an ObjectMapper bean so it can be injected anywhere in our application.
@Configuration
public class JsonConfig {

    @Bean
    public ObjectMapper objectMapper() {
        var objectMapper = new ObjectMapper();
        objectMapper.findAndRegisterModules();
        return objectMapper;
    }
}
Finally, we add the Image Java class that the JSON will be deserialized into.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Image {

    private String name;
    private String type;
    private long size;
}
3. Error Handling Via Global Error Handler
Providing global error handling for our Kafka consumers is relatively simple. We need to implement the handleRecord method of the CommonErrorHandler interface, which is invoked when a listener throws an exception. We have done this in the GlobalErrorHandler class below, but for simplicity, we just log the exception and the failed record instead of doing any real recovery.
@Log4j2
@Service
public class GlobalErrorHandler implements CommonErrorHandler {

    @Override
    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record,
                             Consumer<?, ?> consumer, MessageListenerContainer container) {
        log.warn("Global error handler, error message: {}, record: {}",
                thrownException.getCause().getMessage(), record.value());
    }
}
Next, we need to use the above class inside the kafkaListenerContainerFactory bean, overriding the default error handler with our custom one via the setCommonErrorHandler method.
@Configuration
public class KafkaConfig {

    private final KafkaProperties kafkaProperties;

    public KafkaConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        var properties = kafkaProperties.buildConsumerProperties();
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());
        factory.setCommonErrorHandler(new GlobalErrorHandler());
        return factory;
    }
}
Finally, let's write a Kafka consumer to exercise the GlobalErrorHandler class. The exception we throw should be propagated and handled appropriately.
@Log4j2
@Service
public class ImageConsumer {

    private final ObjectMapper objectMapper;

    public ImageConsumer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "images")
    public void listenAll(String message) throws JsonProcessingException {
        var image = objectMapper.readValue(message, Image.class);
        if (image.getType().equalsIgnoreCase("tiff")) {
            throw new IllegalArgumentException("Unsupported format type tiff");
        }
        log.info("processed image: {}", image);
    }
}
Starting the Spring Boot application should produce output similar to the one below. Two images were processed, but the one with the tiff type threw an exception, which was handled by our GlobalErrorHandler class and logged to the console as specified in the implementation.
processed image: Image(name=dog, type=png, size=10000)
processed image: Image(name=cat, type=jpg, size=12000)
Global error handler, error message: Unsupported format type tiff, record: {"name":"holidays","type":"tiff","size":15000}
4. Error Handling Via Explicit Error Handler
Sometimes we need precise error handling for selected consumers only. Again, we create a Spring bean similar to the previous example, but this time we implement ConsumerAwareListenerErrorHandler and provide the implementation for the handleError method.
@Log4j2
@Service(value = "imageErrorHandler")
public class ImageErrorHandler implements ConsumerAwareListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        log.warn("Image error handler, error message: {}, record: {}",
                exception.getMessage(), message.getPayload());
        return null;
    }
}
Also, we need to update the KafkaListener annotation to include our ImageErrorHandler.
@KafkaListener(topics = "images", errorHandler = "imageErrorHandler")
public void listenAll(String message) throws JsonProcessingException {
    var image = objectMapper.readValue(message, Image.class);
    if (image.getType().equalsIgnoreCase("tiff")) {
        throw new IllegalArgumentException("Unsupported format type tiff");
    }
    log.info("processed image: {}", image);
}
Let’s add another message to the topic [1] to see the new handler in action.
[1] kafka-console-producer --broker-list broker:9092 --topic images
>{"name":"school","type":"tiff","size":9000}
This should result in a console message similar to the one below. As you can see, the ImageErrorHandler takes priority over the GlobalErrorHandler we have previously defined.
Image error handler, error message: Unsupported format type tiff, record: {"name":"school","type":"tiff","size":9000}
5. Spring Kafka Retry – Blocking Retry
The Spring Kafka retry logic is configured similarly to the previous error handler, but this time through the container factory. This approach uses blocking retries: when an exception is thrown, further processing of the partition is blocked until the number of retries is exhausted. That is practical when messages on a partition must be processed in order, but it hurts latency and throughput. The fixed backoff specifies the retry policy applied before the failed message is sent to a dead letter topic (DLT) for further investigation. Let's add this bean to the KafkaConfig class created previously.
@Bean(name = "imageDltContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> imageDltFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        KafkaTemplate<String, String> kafkaTemplate) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
    configurer.configure(factory, consumerFactory());
    var messageRecovery = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (consumerRecord, exception) ->
                    new TopicPartition("image-dead", consumerRecord.partition()));
    var backOff = new FixedBackOff(1000, 3);
    var defaultErrorHandler = new DefaultErrorHandler(messageRecovery, backOff);
    factory.setCommonErrorHandler(defaultErrorHandler);
    return factory;
}
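To make the FixedBackOff(1000, 3) semantics concrete, here is a plain-Java sketch of the behaviour, not Spring's actual implementation: the failed record is re-attempted up to 3 more times with a 1-second pause in between, and only then handed to the recoverer, which publishes it to the dead letter topic. The class and method names below are mine, for illustration only.

```java
import java.util.function.Consumer;

// Illustrative sketch (not Spring code) of blocking retry with a fixed backoff:
// FixedBackOff(1000, 3) means 3 retries with a 1000 ms pause between attempts,
// after which the recoverer (the DLT publisher) is invoked.
public class BlockingRetrySketch {

    @FunctionalInterface
    interface RecordProcessor {
        void process(String record) throws Exception;
    }

    // Returns true if processing eventually succeeded, false once retries are
    // exhausted (at which point the recoverer receives the record for the DLT).
    static boolean processWithRetry(RecordProcessor processor, String record,
                                    long intervalMs, long maxRetries,
                                    Consumer<String> deadLetterRecoverer) {
        for (long attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                processor.process(record);
                return true;
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    deadLetterRecoverer.accept(record); // give up: send to the DLT
                    return false;
                }
                try {
                    // Blocking: nothing else on this partition is consumed meanwhile.
                    Thread.sleep(intervalMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Note that 3 retries means 4 delivery attempts in total: the original one plus three re-attempts.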
This time, we specify the container factory on the same consumer instead of an error handler.
@KafkaListener(topics = "images", containerFactory = "imageDltContainerFactory")
public void listenAll(String message) throws JsonProcessingException {
    var image = objectMapper.readValue(message, Image.class);
    if (image.getType().equalsIgnoreCase("tiff")) {
        throw new IllegalArgumentException("Unsupported format type tiff");
    }
    log.info("processed image: {}", image);
}
Finally, add another failing message [1] and start the application. The handler should retry three times before the message is moved to the image-dead topic, which we can verify by executing [2].
[1] kafka-console-producer --broker-list broker:9092 --topic images
>{"name":"friends","type":"tiff","size":10000}
[2] kafka-console-consumer --bootstrap-server broker:9092 --topic image-dead --from-beginning
6. Spring Kafka Retry – Non-blocking Retry
Spring Kafka provides a built-in @RetryableTopic annotation (available since version 2.7) that enables non-blocking retry logic for failed messages. It is convenient because we don't need to implement the retry plumbing ourselves.
The idea behind it is that failed messages are moved to a retry topic each time they fail, which allows subsequent records to be processed during retries. This improves latency and throughput when strict ordering is not required.
@RetryableTopic(
        attempts = "3",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        backoff = @Backoff(delay = 1000, maxDelay = 5_000, random = true),
        dltTopicSuffix = "dead-two")
@KafkaListener(topics = "images")
public void listenAll(String message) throws JsonProcessingException {
    var image = objectMapper.readValue(message, Image.class);
    if (image.getType().equalsIgnoreCase("tiff")) {
        throw new IllegalArgumentException("Unsupported format type tiff");
    }
    log.info("processed image: {}", image);
}
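As I understand the @Backoff(delay = 1000, maxDelay = 5_000, random = true) settings above, each retry waits a uniformly random interval between the base delay and the max delay, which spreads retries out instead of hammering the broker at fixed intervals. A rough plain-Java sketch of that calculation (my illustration, not Spring's source):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the randomized wait implied by
// @Backoff(delay = 1000, maxDelay = 5_000, random = true):
// every retry sleeps a uniformly random number of milliseconds
// between the base delay and the maximum delay.
public class JitteredBackoffSketch {

    static long nextDelayMs(long delayMs, long maxDelayMs) {
        // nextLong's upper bound is exclusive, so add 1 to include maxDelayMs itself
        return ThreadLocalRandom.current().nextLong(delayMs, maxDelayMs + 1);
    }
}
```

The jitter matters when many consumers fail at once: randomized delays avoid all of them retrying at exactly the same moment.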
While the application is stopped, add one message that will throw an exception and a few that will be processed successfully. Upon restart, you should notice that while the failing message is being retried, the other messages are processed concurrently.
7. Summary
This post has looked at ways to handle transient errors via Spring Kafka retries and global error handling. Consumer errors can be tricky to manage, but with the proper tools they are quickly tamed: define either a global or a topic-specific error handler depending on your needs, and use a container factory to implement retry logic and move poison messages to a separate topic. Together, these techniques help ensure your consumer application remains stable and reliable.
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.