1. Introduction
What if I told you that creating Apache Kafka consumers with Spring Boot is very simple? With the power of annotations, we can provide advanced functionality in only a few lines of code. The Spring Boot framework has excellent support for Apache Kafka. This article will look at various ways to create a Spring Boot Kafka consumer.
2. Prerequisites and Setup
You may skip this section if you only want to browse the different Apache Kafka consumer examples in this Spring Boot application rather than follow along.
If you want to follow this tutorial in your IDE, I will assume you already have an Apache Kafka server running inside a Docker container. If you don't, you may want to follow this to get the Kafka server running.
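If it helps, a minimal docker-compose.yml along the lines of the Confluent quickstart should work. This is only a sketch, and the image versions are an assumption, but the Kafka service must be named broker to match the commands below:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1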
2.1. Create Kafka Topics with Data
First, we must enter the Apache Kafka server container via any Unix shell by executing [1], where broker is the name we have given to Kafka in the docker-compose file. Next, we will create a topic with data so that multiple consumers can process it. Running [2] will create a topic called word-processor with a single partition. Finally, the Kafka producer script [3] adds data to the partition. We need to enter one element per line, separating them with the Enter key. Exit typing mode with Ctrl+C. Ultimately, we can verify successful insertion by running [4].
[1] docker exec -it broker bash
[2] kafka-topics --bootstrap-server broker:9092 --create --topic word-processor --partitions 1
[3] kafka-console-producer --broker-list broker:9092 --topic word-processor
>Car
>Animal
>Robot
[4] kafka-console-consumer --bootstrap-server broker:9092 --topic word-processor --from-beginning
2.2. Generate Project
Now that we have the topic with data, we can generate a blank Spring Boot application using Spring Initializr with the required dependencies. Click generate and import the project into your IDE.
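At a minimum, the examples in this article rely on Spring for Apache Kafka and Lombok; in a Gradle build, the relevant entries would look something like this (an assumption based on the annotations used later, not an exhaustive list):

implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'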
3. Kafka Consumer in Spring Boot Application
First, we must add the application.yml file to the resources folder with the following configuration. Each Spring Kafka consumer needs to be assigned a group id since a Kafka partition can only be consumed by one consumer per group. Setting auto-offset-reset to earliest makes the consumer process messages from the beginning of the topic partition when no committed offset exists.
spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest
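The examples assume the broker is reachable on localhost:9092, which is the Spring Boot default. If your broker runs elsewhere, the address can be set explicitly in the same file:

spring:
  kafka:
    bootstrap-servers: localhost:9092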
Next is our consumer class, whose listener method is identified by the @KafkaListener annotation with the topic name it listens on. Create the following file and run the project; you should see a log entry per string message in the application console.
import lombok.extern.log4j.Log4j2;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Log4j2
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "word-processor")
    public void listenAll(String message) {
        log.info("Processing word : {}", message);
    }
}
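With the sample data inserted earlier, the console output should contain log entries similar to:

Processing word : Car
Processing word : Animal
Processing word : Robot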
Imagine that we need another consumer to process all the messages from the same topic but filter out words that start with an A. We must override the consumer group id defined globally in the application.yml because the existing group has already processed each string message.
@KafkaListener(topics = "word-processor", groupId = "filtered-consumer")
public void listenAndFilter(String message) {
if (!message.startsWith("A")) {
log.info("Processing word : {}", message);
}
}
Running the above code should log two of the three string messages, ignoring Animal based on the custom filtering.
As you may have noticed, the previously consumed messages are reprocessed regardless of the prior consumer because offsets are maintained independently for each consumer group. Each Kafka topic also has a message retention policy that preserves the data for a specific time.
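You can inspect each group's committed offsets inside the broker container with the kafka-consumer-groups tool, for example:

kafka-consumer-groups --bootstrap-server broker:9092 --describe --group default-spring-consumer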
3.1. Parallel Processing Spring Boot Kafka Consumer
It's possible to enable parallel processing by providing a concurrency setting in the @KafkaListener annotation. The concurrency parameter sets the number of consumer threads that consume messages in parallel. The listener must be thread-safe, so if shared state cannot be avoided, it must be protected by locks.
@KafkaListener(
        topics = "word-processor",
        concurrency = "3",
        groupId = "parallel-consumer")
public void consume(ConsumerRecord<String, String> consumerRecord) {
    log.info("Partition : {}, Msg: {}",
            consumerRecord.partition(), consumerRecord.value());
}
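To illustrate the thread-safety point, here is a hypothetical variant that shares a counter across the listener threads; a thread-safe type such as java.util.concurrent.atomic.AtomicLong avoids explicit locks in this simple case (a sketch, not part of the tutorial code):

// Hypothetical shared state across the three consumer threads
private final AtomicLong processedCount = new AtomicLong();

@KafkaListener(topics = "word-processor", concurrency = "3", groupId = "counting-consumer")
public void consumeAndCount(String message) {
    // incrementAndGet is atomic, so no explicit locking is needed
    log.info("Word: {}, total processed: {}", message, processedCount.incrementAndGet());
}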
Currently, our word-processor topic has a single partition, so with the above setup, two consumers would be idle and only one would be consuming the sent messages. Let's add two more partitions to the topic.
kafka-topics --bootstrap-server broker:9092 --alter --topic word-processor --partitions 3
We can verify the above command result by executing the below command.
kafka-topics --bootstrap-server broker:9092 --describe --topic word-processor
Starting the Spring Boot app should produce logging similar to the output below. As we can see, each partition has been assigned a single consumer, which is now ready to process messages.
[Consumer clientId=consumer-parallel-consumer-1, groupId=parallel-consumer] Adding newly assigned partitions: word-processor-0
[Consumer clientId=consumer-parallel-consumer-2, groupId=parallel-consumer] Adding newly assigned partitions: word-processor-1
[Consumer clientId=consumer-parallel-consumer-3, groupId=parallel-consumer] Adding newly assigned partitions: word-processor-2
3.2. Parallel Processing From Multiple Topics
Parallel processing is not limited to a single topic. With a single listener, we can easily consume messages from multiple topics. First, let's create another topic with some data.
Execute the following command on the Kafka broker image to create a new topic.
kafka-topics --bootstrap-server broker:9092 --create --topic word-processor-two --partitions 1
We send messages to the newly created topic using the Kafka producer script. Each message has to be typed on its own line. To exit write mode in the command line, use Ctrl+C.
kafka-console-producer --broker-list broker:9092 --topic word-processor-two
>Bike
>Computer
>Jacket
Finally, our listener class is configured to consume messages from two different topics. Start the application and verify that everything works as expected.
@KafkaListener(
        topics = {"word-processor", "word-processor-two"},
        concurrency = "2",
        groupId = "multi-topic-parallel-consumer")
public void consume(ConsumerRecord<String, String> consumerRecord) {
    log.info("Partition : {}, Msg: {}",
            consumerRecord.partition(), consumerRecord.value());
}
4. JSON Spring Boot Kafka Consumer
Up to this point, we have dealt with simple strings, but with some extra Java configuration, we can process more complex messages like JSON. We can make use of Jackson to parse JSON into custom objects. Let's add the Jackson dependencies to the Gradle build file.
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
We will add a new topic named numbers inside the Kafka broker container [1]. Next, insert each JSON message separately, including a key identifier for each, using the Kafka producer script [2].
[1] kafka-topics --bootstrap-server broker:9092 --create --topic numbers --partitions 1
[2] kafka-console-producer --broker-list broker:9092 --topic numbers --property parse.key=true --property key.separator=,
even,{"number":10}
odd,{"number":11}
even,{"number":20}
odd,{"number":21}
Exit the view with Ctrl+C and verify the inserted data by executing the following.
kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic numbers --from-beginning
The data in the topic is ready, so now we need to add the NumberObj.class model into which the JSON value is deserialised, and the JsonConfig.class to register an ObjectMapper instance as a Spring Boot bean.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NumberObj {
    private int number;
}

@Configuration
public class JsonConfig {

    @Bean
    public ObjectMapper objectMapper() {
        var objectMapper = new ObjectMapper();
        objectMapper.findAndRegisterModules();
        return objectMapper;
    }
}
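As a quick sanity check, the mapper can deserialise one of the sample payloads on its own; the following standalone sketch (not part of the tutorial code) prints 10:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSanityCheck {
    public static void main(String[] args) throws Exception {
        var mapper = new ObjectMapper();
        // Deserialise one of the payloads we produced earlier
        var parsed = mapper.readValue("{\"number\":10}", NumberObj.class);
        System.out.println(parsed.getNumber());
    }
}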
Finally, inside our KafkaConsumer.class, we autowire the ObjectMapper and create a new @KafkaListener. When retrieving the message as a ConsumerRecord, we can access its key, value, partition, and more.
@Autowired
private ObjectMapper objectMapper;

@KafkaListener(topics = "numbers")
public void process(ConsumerRecord<String, String> record)
        throws JsonProcessingException {
    var number = objectMapper.readValue(record.value(), NumberObj.class);
    log.info(" Key: {}, Partition : {}, Msg: {}",
            record.key(), record.partition(), number);
}
Verify the results by running the Spring Boot application.
4.1. Custom Consumer Filtering Strategy
With the power of Spring Boot, we can create a Spring Kafka listener container factory that contains a custom filtering strategy. In this example, we will filter out even numbers while odd ones are passed through to the listener. Here is a code sample showing how it can be accomplished.
@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        var properties = kafkaProperties.buildConsumerProperties();
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean(name = "evenNumFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object>
            evenNumFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());
        // Returning true discards the record, so even numbers are filtered out
        factory.setRecordFilterStrategy(consumerRecord -> {
            try {
                var numberValue = consumerRecord.value().toString();
                var number = objectMapper.readValue(numberValue, NumberObj.class);
                return number.getNumber() % 2 == 0;
            } catch (JsonProcessingException e) {
                // Keep unparseable records rather than silently dropping them
                return false;
            }
        });
        return factory;
    }
}
Ultimately, we need to update our consumer's @KafkaListener annotation to reference the container factory we have created in the Kafka configuration class.
@KafkaListener(topics = "numbers", containerFactory = "evenNumFactory")
The initial offset must be reset to reprocess the existing messages using the new filtering strategy [1].
Start the application; only the odd numbers should be logged, while the even numbers are filtered out of processing.
[1] kafka-consumer-groups --bootstrap-server broker:9092 --group default-spring-consumer --reset-offsets --to-earliest --topic numbers --execute
4.2. Error Handling Using Retry Logic
Sometimes things may not go as expected, and some messages may cause exceptions in the consumer. While this post is not dedicated to error handling, let's look at how to send failed messages to a separate Kafka topic for later investigation.
Below, we have created a container factory that uses DefaultErrorHandler to send failed messages to a numbers-dead Kafka topic.
The FixedBackOff specifies the number of retries and the interval between them. The message is sent to the dead-letter topic once the retries have been exhausted.
@Bean(name = "numbersDltContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> numbersDltFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        KafkaTemplate<String, String> kafkaTemplate) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());
    // Route failed records to the numbers-dead topic, keeping the original partition
    var messageRecovery = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (consumerRecord, exception) ->
                    new TopicPartition("numbers-dead", consumerRecord.partition()));
    // Retry every second, up to three attempts, before publishing to the dead-letter topic
    var backOff = new FixedBackOff(1000, 3);
    var defaultErrorHandler = new DefaultErrorHandler(messageRecovery, backOff);
    factory.setCommonErrorHandler(defaultErrorHandler);
    return factory;
}
The modified Spring Kafka consumer throws an exception whenever the number it processes is odd.
@KafkaListener(
        topics = "numbers",
        containerFactory = "numbersDltContainerFactory",
        groupId = "error-handling-consumer")
public void process(ConsumerRecord<String, String> record)
        throws JsonProcessingException {
    var number = objectMapper.readValue(record.value(), NumberObj.class);
    if (number.getNumber() % 2 != 0) {
        throw new IllegalArgumentException("Can only process even numbers");
    }
    log.info(" Key: {}, Partition : {}, Msg: {}",
            record.key(), record.partition(), number);
}
Run the Spring Boot application and check that the numbers-dead topic contains the messages with odd numbers.
kafka-console-consumer --bootstrap-server broker:9092 --topic numbers-dead --from-beginning
5. Summary
In this article, we have explored how to create a Spring Boot Kafka consumer. Have you ever wondered how to bring simplicity and concurrency to consuming Kafka? Look no further than the @KafkaListener annotation in Spring Boot. By setting a custom consumer group, you can easily define parameters for parallel processing within one simple Spring Boot bean. Plus, thanks to record filtering strategies, getting just the data you need has never been easier.
Leverage the container factory method to keep your consumer application steadfast and dependable. The built-in retry logic and the ability to store errors away on a distinct topic help guarantee service stability.