1. Introduction
While messaging via Kafka is fantastic, there’s nothing fun about processing the same event twice when something goes wrong! In this article, we will create a Kafka Idempotent Consumer in Spring to protect you from the mayhem of duplicate event processing. That way, an exception or a network hiccup won’t saddle you with repeated messages. No one wants that headache!
2. Idempotence
Idempotence means that an operation can be applied multiple times without changing the result beyond the initial application; no matter how many times it runs, there are no additional side effects. Producing double output is therefore not idempotent.
In the Spring Kafka Idempotent Consumer pattern, the application should be able to receive duplicate messages without producing side effects that cause discrepancies in the data. We will apply a filtering strategy based on a unique identifier, which can be generated if the message doesn’t already carry one.
This identifier will be used as a lookup key to determine whether the same message was processed previously.
Since this lookup has to be stored somewhere, we can use persistent storage such as a database, or temporary storage such as a cache, depending on individual circumstances. Several factors determine the appropriate storage (a minimal sketch of the lookup contract follows the list):
- The period in which duplicates may occur
- Whether performance is a consideration
- The typical application throughput
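Whichever storage we choose, the lookup contract stays the same. As a minimal sketch, it can sit behind a small interface so the backing store remains swappable; the IdempotencyStore interface below is hypothetical and not part of the example built later in this article:
// Hypothetical abstraction: the example later in this article uses a
// Caffeine cache directly, but a database-backed implementation would
// satisfy the same contract.
public interface IdempotencyStore {

    // true if this message id has already been processed
    boolean alreadyProcessed(Long messageId);

    // record the id so future duplicates can be detected
    void markProcessed(Long messageId);
}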
2.1. Delivery Semantics
Delivery semantics in Apache Kafka refers to the guarantees about how reliably a message or event is delivered between two or more systems.
2.1.1. At Most Once Semantics
With at-most-once delivery, the offset is committed immediately after the message is received, so duplicate messages won’t be a problem. If something goes wrong during processing, however, the message will be lost and not reprocessed.
This is the default behaviour adopted by Kafka consumers and should only be used when losing a message is acceptable while high throughput and low latency are needed.
2.1.2. At Least Once Semantics
With at-least-once delivery, the offset commit happens after the message is delivered and processed; if something goes wrong, the consumer will reprocess the message. This requires an idempotent consumer (reprocessing an already processed message won’t impact the system). While throughput and latency may take a hit from reprocessing, no message will be lost, as we can manually commit the consumer offset after successful processing.
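As a brief sketch of the manual-commit approach with Spring Kafka, assuming the listener ack mode is set to manual (for example, spring.kafka.listener.ack-mode: manual):
// Minimal sketch of at-least-once processing with a manual offset commit;
// assumes spring.kafka.listener.ack-mode is set to "manual".
@Log4j2
@Service
public class AtLeastOnceConsumer {

    @KafkaListener(topics = "orders")
    public void listen(String message, Acknowledgment ack) {
        process(message);  // if this throws, the offset is never committed
        ack.acknowledge(); // commit the offset only after successful processing
    }

    private void process(String message) {
        log.info("Processing message: {}", message);
    }
}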
2.1.3. Exactly Once Semantics
With exactly-once delivery, the message is processed exactly once and never lost. Consumers for this semantic are hard to build because, in addition to idempotency, we need atomicity in the event of an exception: either everything happens or nothing does, similar to the commit-rollback strategy in a database.
Exactly-once semantics also come at the cost of lower throughput and higher latency than the other two.
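As a rough sketch only: in Spring Kafka, exactly-once style read-process-write pipelines are usually built on Kafka transactions, where the producer has a transactional id and downstream consumers read only committed records. The processed-orders topic name below is an illustrative assumption:
// Illustrative sketch of a transactional write; assumes
// spring.kafka.producer.transaction-id-prefix is configured and downstream
// consumers use isolation.level=read_committed.
@Service
public class ExactlyOnceForwarder {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ExactlyOnceForwarder(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void forward(String processedMessage) {
        kafkaTemplate.executeInTransaction(operations -> {
            // The send is committed atomically with the transaction or aborted
            operations.send("processed-orders", processedMessage);
            return true;
        });
    }
}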
3. Prerequisites and Setup
You may skip this section if you are not following along with this tutorial and only want to look at the code examples.
If you are following along in your IDE, I will assume that you already have Kafka running inside a Docker container. If you don’t, you may want to follow this guide to get Kafka running.
3.1. Create Kafka Topics with Data
First, we need to insert some data into a Kafka topic. Using any command-line tool, we get onto the broker container, create a new topic, insert some JSON data, and verify the insertion was successful.
This can be achieved by executing the commands below:
docker exec -it broker bash
kafka-topics --bootstrap-server broker:9092 --create --topic orders --partitions 1
kafka-console-producer --broker-list broker:9092 --topic orders
>{"id":1,"name":"computer","price":1000,"amount":1}
>{"id":2,"name":"console","price":599,"amount":1}
>{"id":3,"name":"earphones","price":99,"amount":2}
kafka-console-consumer --bootstrap-server broker:9092 --topic orders --from-beginning
3.2. Generate Project Template
Now that we have a topic with data, we can use Spring Initializr to generate a blank project template with all the required dependencies. Click Generate and import the project into your IDE.
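Assuming you selected the Spring for Apache Kafka and Lombok dependencies in Spring Initializr (the code in this article relies on both), the generated build.gradle should contain dependencies along these lines:
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'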
4. Idempotent Spring Kafka Consumer Instance
We will start by adding Gradle dependencies for Jackson and the Caffeine cache, followed by the application.yml config file inside the resources folder.
In the configuration, we specify the consumer offset reset strategy and the group id.
implementation 'com.github.ben-manes.caffeine:caffeine'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest
Next, we define the ObjectMapper Spring bean for parsing JSON messages into Java classes.
@Configuration
public class JsonConfig {

    @Bean
    public ObjectMapper objectMapper() {
        var objectMapper = new ObjectMapper();
        objectMapper.findAndRegisterModules();
        return objectMapper;
    }
}
Furthermore, we need to add a cache configuration. For this example, we will assume that duplicate messages cannot arrive more than 120 minutes apart, so this is the expiration time for our cached entries. Here we are using the Caffeine caching library, but Spring provides generic support for different caching implementations, so you can use a caching solution you are already familiar with.
@Configuration
public class CacheConfig {

    public static final String CACHE_NAME = "orderCache";

    @Bean(name = CACHE_NAME)
    public Cache<Long, Order> orderCache() {
        return Caffeine
            .newBuilder()
            .expireAfterWrite(120, TimeUnit.MINUTES)
            .build();
    }
}
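As a quick, hypothetical illustration of the two Caffeine calls we will rely on (using the Order class defined in the next step), entries written with put can be looked up with getIfPresent until they expire:
// Standalone illustration of the Caffeine calls used later in the consumer
Cache<Long, Order> cache = Caffeine.newBuilder()
        .expireAfterWrite(120, TimeUnit.MINUTES)
        .build();

cache.put(1L, new Order(1L, "computer", "1000", 1)); // mark order 1 as seen
Order seen = cache.getIfPresent(1L);   // non-null until the entry expires
Order unseen = cache.getIfPresent(2L); // null, order 2 was never cached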
We also need the Order class that we will parse the JSON messages into.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private Long id;
    private String name;
    private String price;
    private int amount;
}
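As a hypothetical sanity check, here is how the ObjectMapper turns one of our sample JSON messages into an Order (note that Jackson coerces the numeric price into the String field by default):
// Hypothetical sanity check, not part of the application code
Order parseSample(ObjectMapper objectMapper) throws JsonProcessingException {
    String json = "{\"id\":1,\"name\":\"computer\",\"price\":1000,\"amount\":1}";
    // Produces Order(id=1, name=computer, price=1000, amount=1)
    return objectMapper.readValue(json, Order.class);
}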
Finally, let’s create the Kafka consumer. The logic is as follows: if an order already exists in the cache, it has been processed previously, so further processing is skipped; otherwise, we process the order and store the result in the cache. The order id acts as a key that is unique per message.
@Log4j2
@Service
public class IdempotentConsumer {

    private final ObjectMapper objectMapper;
    private final Cache<Long, Order> cache;

    public IdempotentConsumer(ObjectMapper objectMapper,
                              @Qualifier(CACHE_NAME) Cache<Long, Order> cache) {
        this.objectMapper = objectMapper;
        this.cache = cache;
    }

    @KafkaListener(topics = "orders")
    public void listenAll(String message) throws JsonProcessingException {
        var order = objectMapper.readValue(message, Order.class);
        // Process only if this order id has not been seen before
        if (cache.getIfPresent(order.getId()) == null) {
            processOrder(order);
        }
        log.info("Finished processing order: {}", order);
    }

    public void processOrder(Order order) {
        log.info("Order: {} not processed previously", order);
        cache.put(order.getId(), order);
    }
}
Running the application, we should see the log messages below in the console. Order messages will be cached for the next 120 minutes.
Order(id=1, name=computer, price=1000, amount=1) not processed previously
Finished processing order: Order(id=1, name=computer, price=1000, amount=1)
Order(id=2, name=console, price=599, amount=1) not processed previously
Finished processing order: Order(id=2, name=console, price=599, amount=1)
Order(id=3, name=earphones, price=99, amount=2) not processed previously
Finished processing order: Order(id=3, name=earphones, price=99, amount=2)
Let’s try adding another order that will be a duplicate. In the container, execute:
kafka-console-producer --broker-list broker:9092 --topic orders
>{"id":1,"name":"computer","price":1000,"amount":1}
In the application console, we should only see the “Finished processing order” message because, after the cache lookup, we never enter the processOrder method. We have therefore avoided double processing with a cache lookup, and our consumer is idempotent.
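To verify this behaviour without a running broker, a minimal JUnit-style sketch (hypothetical, assuming JUnit 5 on the test classpath) can call the listener directly with a duplicate message:
@Test
void duplicateMessageIsOnlyProcessedOnce() throws JsonProcessingException {
    // Real cache and mapper, no broker required: we invoke the listener directly
    Cache<Long, Order> cache = Caffeine.newBuilder()
            .expireAfterWrite(120, TimeUnit.MINUTES)
            .build();
    var consumer = new IdempotentConsumer(new ObjectMapper(), cache);

    String message = "{\"id\":1,\"name\":\"computer\",\"price\":1000,\"amount\":1}";
    consumer.listenAll(message);
    consumer.listenAll(message); // duplicate is filtered by the cache lookup

    assertEquals(1, cache.estimatedSize()); // only one order was cached/processed
}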
5. Summary
With this piece, we dove into idempotency and, in Spring, built a resilient Kafka Idempotent Consumer to tame double output, equipping it with a cache to recognise duplicate messages. Hence, no surprise extra orders will arrive at your doorstep.
Daniel Barczak