1. Introduction
Kafka’s exactly-once processing is a powerful feature that ensures messages are delivered from producers to consumers without duplication or loss. This is critical for applications that require high reliability, data integrity, and accuracy. Integrating it with Spring Boot simplifies the implementation of end-to-end stream processing, making it more convenient for developers to build and maintain their applications.
Using exactly-once semantics with Kafka and Spring Boot helps your application keep its data consistent and accurate throughout its lifecycle. This is particularly valuable for complex data-processing tasks that demand high precision and fault tolerance.
2. Prerequisites and Setup
You may skip this section if you only want to look at the code examples rather than follow the tutorial step by step.
If you are following this tutorial in your IDE, I will assume that you already have Apache Kafka running in a Docker container. You may want to read how to run Kafka if you don’t.
2.1. Generate Project Template
To create a Spring Boot Kafka project, we must first use Spring Initializr to create a Spring Boot project template with all the required dependencies. We can import the template into our IDE and continue the development process after it has been generated.

3. Exactly-Once Processing in Kafka
In Kafka, delivering messages with exactly-once semantics is important for the consistency and correctness of your data processing. Exactly-once processing means that each message is delivered and processed exactly once: it is neither lost nor duplicated.
A common misconception is that “exactly once” refers only to the delivery of a single message. In Kafka, the guarantee applies to the whole consume-process-produce sequence: each step may be retried after a failure, but the committed result reflects each input message only once.
Kafka provides several mechanisms to achieve this, such as the Transactional API, isolation level, and idempotent producers.
The key components that enable exactly-once processing are:
- Transactional API: The transactional API lets a producer write a group of messages, together with the offsets of the messages it consumed, as a single atomic unit. Atomicity means the transaction is treated as one unit of work: either every write in it is committed or none is. Consistency means that the topics are left in a valid state before and after the transaction, with no partial results visible to consumers that read only committed data.
- Idempotence: An idempotent producer guarantees that sending the same message more than once has the same effect as sending it once. This matters because, in a distributed system, a network error can cause the producer to retry a send that had in fact already succeeded. With idempotence enabled, the broker recognizes the retry and keeps only one copy of the message in the partition, so retries do not create duplicates.
- Isolation level: The isolation level controls whether a consumer sees messages from transactions that have not been committed. It supports two settings, read_committed and read_uncommitted. With read_committed, the consumer only receives messages from committed transactions (plus non-transactional messages). With read_uncommitted, which is the default, it also receives messages from transactions that are still open or were aborted.
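To make the idempotence idea more concrete, here is a toy in-memory model of how a log can discard a retried write by remembering the last sequence number it accepted from each producer. It is a simplified sketch, not Kafka’s broker code, and the names IdempotenceSketch, PartitionLog, and append are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: all names here are hypothetical, not Kafka classes.
class IdempotenceSketch {

    /** A single partition log that remembers the last sequence number per producer. */
    static class PartitionLog {
        private final List<String> records = new ArrayList<>();
        private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();

        /** Appends the record unless this producer already wrote this sequence number. */
        boolean append(long producerId, int sequence, String value) {
            int last = lastSequenceByProducer.getOrDefault(producerId, -1);
            if (sequence <= last) {
                return false; // retry of a record that was already written: ignore it
            }
            lastSequenceByProducer.put(producerId, sequence);
            records.add(value);
            return true;
        }

        List<String> records() {
            return List.copyOf(records);
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append(1L, 0, "alice");
        log.append(1L, 0, "alice"); // simulated retry of the same send
        log.append(1L, 1, "bob");
        System.out.println(log.records()); // prints [alice, bob]
    }
}
```

The retried send carries the same sequence number as the original, so the log keeps a single copy even though append was called twice.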
3.1. Implementing Exactly-Once Processing in Kafka
To get exactly-once semantics in your Kafka application, you must:
- Configure the producer to be idempotent via ProducerFactory configuration.
- Use the Transactional API to send and consume messages.
- Set the isolation level of consumers to read committed via ConsumerFactory.
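As a rough sketch, these steps map onto a handful of Kafka client settings. The snippet below only builds plain java.util.Properties objects using the standard Kafka configuration key names, so it runs without a broker. The class name ExactlyOnceSettings and its method names are illustrative assumptions, and the person-transaction ID is simply the value used in this article.

```java
import java.util.Properties;

// Illustrative helper: the class and method names are hypothetical.
class ExactlyOnceSettings {

    /** Producer side: idempotence plus a transactional ID, which transactions require. */
    static Properties producerSettings(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("transactional.id", transactionalId);
        return props;
    }

    /** Consumer side: read only committed records and do not auto-commit offsets. */
    static Properties consumerSettings() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerSettings("person-transaction"));
        System.out.println(consumerSettings());
    }
}
```

The same keys are exposed as constants on the Kafka client classes, for example ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG and ConsumerConfig.ISOLATION_LEVEL_CONFIG, which is how the Spring configuration in this article refers to them.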
4. Setting Up Spring Boot Kafka Project
To begin, set up your Spring Boot application for Kafka integration. First, adjust your build.gradle file to include the Jackson Databind dependency:
implementation 'com.fasterxml.jackson.core:jackson-databind'
Next, we create a ProducerConfiguration class that defines a ProducerFactory bean, which holds the configuration for Kafka producers. To make the producer idempotent, we set ENABLE_IDEMPOTENCE_CONFIG to true; TRANSACTIONAL_ID_CONFIG gives it the transactional ID that Kafka requires for transactions. Finally, we configure a KafkaTransactionManager, which binds a Kafka producer to the thread for transactional operations and so enables exactly-once processing.
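    import static org.apache.kafka.clients.producer.ProducerConfig.*;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;
    import org.springframework.kafka.transaction.KafkaTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;

    @Configuration
    public class ProducerConfiguration {

        @Bean
        public ProducerFactory<String, Person> producerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            // Idempotence stops retries from writing duplicates.
            config.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
            // A transactional ID is required for transactions.
            config.put(TRANSACTIONAL_ID_CONFIG, "person-transaction");
            return new DefaultKafkaProducerFactory<>(config);
        }

        @Bean
        public KafkaTemplate<String, Person> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        @Bean
        public PlatformTransactionManager transactionManager() {
            return new KafkaTransactionManager<>(producerFactory());
        }
    }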
To configure a Kafka consumer, we define a ConsumerConfiguration class. Setting the consumer’s ISOLATION_LEVEL_CONFIG property to read_committed ensures it only reads committed messages, while setting ENABLE_AUTO_COMMIT_CONFIG to false and using AckMode.MANUAL_IMMEDIATE lets us acknowledge each message manually after it has been processed successfully.
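    import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @Configuration
    public class ConsumerConfiguration {

        @Bean
        public ConsumerFactory<String, Person> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            // Only read messages from committed transactions.
            config.put(ISOLATION_LEVEL_CONFIG, "read_committed");
            // Offsets are committed by manual acknowledgment instead.
            config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
            config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            return new DefaultKafkaConsumerFactory<>(config);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Person> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties()
                    .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }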
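To illustrate what read_committed changes for a consumer, here is a toy in-memory model of a log whose entries carry the state of the transaction that wrote them. It is a simplified sketch under stated assumptions, not how Kafka stores transactions, and the names IsolationLevelSketch, Log, Entry, and TxState are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: all names here are hypothetical, not Kafka classes.
class IsolationLevelSketch {

    enum TxState { OPEN, COMMITTED, ABORTED }

    /** A log entry plus the state of the transaction that wrote it. */
    static class Entry {
        final String value;
        TxState state = TxState.OPEN;

        Entry(String value) {
            this.value = value;
        }
    }

    static class Log {
        private final List<Entry> entries = new ArrayList<>();

        /** Writes a record inside a new, still-open transaction. */
        Entry write(String value) {
            Entry entry = new Entry(value);
            entries.add(entry);
            return entry;
        }

        /** Returns the values visible to a consumer with the given isolation level. */
        List<String> read(boolean readCommitted) {
            List<String> visible = new ArrayList<>();
            for (Entry entry : entries) {
                if (!readCommitted) {
                    visible.add(entry.value); // read_uncommitted: everything is visible
                } else if (entry.state == TxState.OPEN) {
                    break; // read_committed: stop at the first open transaction
                } else if (entry.state == TxState.COMMITTED) {
                    visible.add(entry.value); // aborted entries are skipped
                }
            }
            return visible;
        }
    }

    public static void main(String[] args) {
        Log log = new Log();
        log.write("alice").state = TxState.COMMITTED;
        log.write("bob").state = TxState.ABORTED;
        log.write("carol"); // transaction still open
        System.out.println(log.read(true));  // prints [alice]
        System.out.println(log.read(false)); // prints [alice, bob, carol]
    }
}
```

In this model a read_committed reader never sees the aborted record and waits at the open transaction, while a read_uncommitted reader sees all three records.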
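Finally, the Person class is the message payload that is serialized to and deserialized from JSON. Its Lombok annotations generate the getters, setters, and constructors.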
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Person {
        private String name;
        private int age;
    }
5. Creating Kafka Producer and a Consumer
Creating a transactional Kafka producer with Spring Boot is simple: we annotate the sending method with Spring’s @Transactional. If sendMessage throws an exception after kafkaTemplate.send has been called, the Kafka transaction is aborted instead of committed, so consumers that use read_committed never see the message.
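    import lombok.AllArgsConstructor;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    @Service
    @AllArgsConstructor
    public class PersonProducer {

        private final KafkaTemplate<String, Person> kafkaTemplate;

        @Transactional
        public void sendMessage(String topic, Person personMessage) {
            kafkaTemplate.send(topic, personMessage);
        }
    }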
The PersonConsumer uses @KafkaListener to listen for messages with the Acknowledgment argument that can be used for a manual acknowledgement after successful processing.
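    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Service;

    @Slf4j
    @Service
    public class PersonConsumer {

        @KafkaListener(topics = "exactly_once_topic", groupId = "test")
        public void listen(Person personMessage, Acknowledgment acknowledgment) {
            log.info("Received person: {}, age: {}",
                    personMessage.getName(), personMessage.getAge());
            // Commit the offset only after successful processing.
            acknowledgment.acknowledge();
        }
    }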
Running the application should insert two Person messages into the exactly_once_topic topic.
To fully understand what is going on, please try the following:
- Throw an exception in the sendMessage method of the PersonProducer class. Because of the @Transactional annotation, the transaction is aborted wherever in the method the exception is thrown, and the consumer never receives the message.
- Remove acknowledgment.acknowledge() from PersonConsumer. The message will still be processed and logged, but the consumer offset won’t be committed because the ack mode is set to manual. After a restart, the same message is processed again.
If you have a Kafka broker running locally, you can verify the consumer offsets with the following commands:
    # 1. Open a shell in the broker container (prefix with winpty on Windows)
    docker exec -it broker bash
    # 2. Describe the consumer group
    kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test
The CURRENT-OFFSET should have a different value than the LOG-END-OFFSET.
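The effect of removing the acknowledgment can also be sketched with a toy in-memory model of a partition and its committed offset. This is a simplification for illustration only: the names ManualAckSketch, Partition, and runConsumer are hypothetical, and the model ignores details of a real log such as transaction markers, which also occupy offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: all names here are hypothetical, not Kafka or Spring classes.
class ManualAckSketch {

    /** A partition with its records and the offset committed by one consumer group. */
    static class Partition {
        private final List<String> records = new ArrayList<>();
        private int committedOffset = 0;

        void append(String value) { records.add(value); }
        String get(int offset) { return records.get(offset); }
        int logEndOffset() { return records.size(); }
        int committedOffset() { return committedOffset; }
        void commit(int nextOffset) { committedOffset = nextOffset; }
    }

    /** One consumer run: starts at the committed offset and commits only when acknowledging. */
    static List<String> runConsumer(Partition partition, boolean acknowledge) {
        List<String> processed = new ArrayList<>();
        for (int offset = partition.committedOffset(); offset < partition.logEndOffset(); offset++) {
            processed.add(partition.get(offset));
            if (acknowledge) {
                partition.commit(offset + 1); // the next offset to read after a restart
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Partition partition = new Partition();
        partition.append("alice");
        partition.append("bob");

        System.out.println(runConsumer(partition, false)); // prints [alice, bob]
        System.out.println(partition.committedOffset());   // prints 0
        System.out.println(runConsumer(partition, false)); // prints [alice, bob]
        System.out.println(runConsumer(partition, true));  // prints [alice, bob]
        System.out.println(partition.committedOffset());   // prints 2
        System.out.println(runConsumer(partition, true));  // prints []
    }
}
```

Without an acknowledgment the committed offset stays behind the end of the log, so every restart reprocesses the same records; once the records are acknowledged, a restart finds nothing left to read.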
6. Conclusion
Exactly-once delivery and processing in Kafka is an important building block for scalable, dependable distributed systems. By combining an idempotent, transactional producer with a consumer that reads only committed messages and acknowledges them manually, you can avoid double processing and keep your system consistent. With these tools, you can build distributed systems that handle high concurrency and scale to meet your company’s demands.
Daniel Barczak is a software developer with over 9 years of professional experience. He has experience with several programming languages and technologies and has worked for businesses ranging from startups to big enterprises. Daniel in his leisure time likes to experiment with new smart home gadgets and explore the realm of home automation.