1. Introduction
When you work with Kafka, message processing is of paramount importance. You want to ensure that your messages are consumed and processed successfully by the consumers. One way to achieve this is through acknowledgements, a crucial part of Kafka's message delivery process. Kafka acknowledgements let a producer confirm that the broker has received a message, and let a consumer inform the broker that a message has been received and successfully processed. This enables more fine-grained control over message consumption and offset commits in a Spring Boot application, and the ability to handle errors or delays in processing.
In this article, we'll discuss Kafka acknowledgements in Spring Boot and how they can help you manage message delivery and improve the overall reliability of your Kafka-based applications.
2. Understanding Kafka Acknowledgements
Acknowledgement is a mechanism in Apache Kafka that guarantees reliable message delivery between producers and consumers. Both automatic and manual acknowledgements are available.
Automatic acknowledgement is the default behaviour for consumers. Whenever a message is consumed, the consumer immediately notifies the broker that the message has been received by committing the offset. This mode is appropriate when high throughput matters more than the risk of message loss.
With manual acknowledgement, however, the consumer must explicitly acknowledge each message after processing it. This kind of acknowledgement offers more control over message delivery and is useful when message loss is unacceptable. The trade-off is increased latency and reduced throughput, because the offset is committed only after the message has been fully processed.
2.1. Producer Acknowledgements
In the producer configuration, acknowledgements are controlled with the acks parameter. It specifies how many acknowledgements the producer must receive from the broker before a message is considered successfully sent. There are three possible values for acks:
2.1.1. Potential data loss (Acks=0)
The producer transmits the data without waiting for an acknowledgement from the broker before considering a message as sent. In that case, there's a chance of data loss: if the broker goes down after the data is sent, the producer will never know, because with acks=0 no acknowledgement is returned and the data is lost.
This option provides the highest throughput but offers no message delivery guarantee because the broker never replies to the producer.
2.1.2. Limited data loss (Acks=1)
The producer waits for the leader broker to acknowledge that it has received the message before continuing; replication is not guaranteed. Replication happens in the background, but the leader does not wait for it before responding. If no acknowledgement is received, the producer falls back on its retry mechanism.
This choice strikes a balance between reliability and throughput.
2.1.3. No data loss (Acks=all)
The producer waits for acknowledgements from all in-sync replicas before considering a message as sent; otherwise, the retry logic is invoked. This option provides the strongest delivery guarantee but comes at the cost of reduced throughput.
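For illustration, the desired level is requested through the standard acks producer property. The snippet below is only a sketch of where the setting would go in a Java client configuration; acks=all and the retry count are example choices, not values prescribed by this article:

// Sketch: requesting the strongest delivery guarantee from the producer
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.ACKS_CONFIG, "all");   // wait for all in-sync replicas
config.put(ProducerConfig.RETRIES_CONFIG, 3);    // retry if no acknowledgement arrives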
3. Prerequisites and Setup
You may skip this section if you only want to look at the code examples rather than follow the tutorial step by step.
I'll assume that Apache Kafka is already running in a Docker container if you're following this tutorial in your IDE. If it isn't, you might wish to read how to run Kafka.
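If you need a quick local setup, the docker-compose sketch below starts ZooKeeper and a single broker using the Confluent images. The container name broker matches the commands used later in this article, but the image versions, ports, and listener layout are assumptions you may need to adjust for your environment:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1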
3.1. Generate Project Template
To create a Spring Boot Kafka project, we use Spring Initializr to generate a blank template with the necessary dependencies. Click generate and import the project into your IDE.
3.2. Add Project Configuration
To configure our Spring Boot application, we first need to add the Jackson Databind and Spring Boot Starter Web dependencies to our build.gradle file:
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.springframework.boot:spring-boot-starter-web'
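For reference, the full dependencies block might end up looking like the sketch below. The spring-kafka dependency is assumed to come from the Spring Initializr template, Lombok is included because the later examples use @Slf4j and @AllArgsConstructor, and version numbers are left to the Spring Boot dependency management plugin:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}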
The ProducerConfiguration class defines the key and value serializers, the KafkaTemplate bean, and the Kafka broker address. The broker is identified by the bootstrap.servers property in the ProducerConfiguration.
We instruct Kafka on the message's serialisation by setting the key and value serializer classes in the ProducerConfiguration. Common data types like strings and integers have built-in serializers in Kafka, and Spring Kafka provides a JsonSerializer for JSON. However, a custom serializer must be created if the message key or value is a custom data type that cannot be handled by these.
import static org.apache.kafka.clients.producer.ProducerConfig.*;

@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory<String, Person> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Person> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
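The examples throughout this article reference a Person type that is never shown. A minimal sketch of such a model class, assuming just a name and an age, could look like this:

public class Person {

    private String name;
    private int age;

    // A no-args constructor is needed so the JsonDeserializer can instantiate the object
    public Person() {
    }

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}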
Similarly, the ConsumerConfiguration in Apache Kafka is a crucial element that tells the application how to consume Kafka messages. One of its most important components is the deserializer, which transforms the message's byte array into an object that the application can use.
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

@Configuration
public class ConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, Person> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(GROUP_ID_CONFIG, "test");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Person> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
4. Kafka Consumer Acknowledgement
We must create a producer service to send messages to the Kafka topic. The producer service typically consists of a method that accepts a message as input and uses the KafkaTemplate bean to send it to a particular Kafka topic. Simple strings or custom objects are serialised using the designated key and value serializers:
@Service
@AllArgsConstructor
public class PersonProducer {

    private KafkaTemplate<String, Person> kafkaTemplate;

    public void sendMessage(Person personMessage) {
        kafkaTemplate.send("ack_topic", personMessage);
    }
}
For testing purposes, we will expose an endpoint in a controller that can handle requests to submit messages to a Kafka topic using the above producer class. In this scenario, a /person/insert endpoint will receive requests to publish new people to the Kafka broker:
@RestController
@RequestMapping("/person")
@AllArgsConstructor
public class PersonController {

    private PersonProducer personProducer;

    @PostMapping(value = "/insert", produces = "application/json")
    public void insertPerson(@RequestBody Person person) {
        personProducer.sendMessage(person);
    }
}
The application must define a consumer class that handles the logic for receiving and processing messages from the Kafka topic. Here, that class is PersonConsumer. It uses the @KafkaListener annotation to specify the topic it subscribes to and the method that should be called when a new message arrives. The method accepts the message as an argument and acts on the information it contains:
@Slf4j
@Service
public class PersonConsumer {

    @KafkaListener(topics = "ack_topic", groupId = "test")
    public void listen(Person personMessage) {
        log.info(String.format(
                "Received person: %s, age: %d",
                personMessage.getName(), personMessage.getAge())
        );
    }
}
Start the application and send a request to the person controller we have created. Here I am using Postman, but you can use any HTTP client you want:
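For example, an equivalent request with curl might look like the following; the JSON fields and the default 8080 port are assumptions based on the Person model sketched above and a standard Spring Boot setup:

curl -X POST http://localhost:8080/person/insert \
  -H "Content-Type: application/json" \
  -d '{"name": "John", "age": 30}'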
The application console should display a log message confirming that the message was received. You can also verify that it exists on the topic by executing the following commands in order:
docker exec -it broker bash   # prefix with winpty if on Windows
kafka-console-consumer --bootstrap-server=localhost:9092 --topic ack_topic --from-beginning
By default, the Kafka consumer acknowledgement mode is automatic, which causes the offset to be auto-committed as soon as the message is received. Failed messages won't be reprocessed even if the consumer throws an exception during processing.
We can check the consumer offset by running the kafka-consumer-groups tool. The CURRENT-OFFSET should match the LOG-END-OFFSET, meaning the consumer is up to date with its processing:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test
5. Manual Acknowledgement
Manual acknowledgement is a mode in which the consumer explicitly acknowledges the successful processing of a message. Unlike automatic acknowledgement, which auto-commits the offset immediately after the message is received, manual acknowledgement requires the consumer to commit the offset after processing the message.
To enable manual acknowledgements in our application, we need to set ENABLE_AUTO_COMMIT_CONFIG to false in our ConsumerFactory and set the ack mode to MANUAL_IMMEDIATE:
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

@Configuration
public class ConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, Person> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(GROUP_ID_CONFIG, "test");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Person> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
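If you rely on Spring Boot's auto-configured consumer factory rather than defining the beans yourself, the same behaviour can be expressed through configuration properties. This is a sketch assuming the standard spring.kafka property names in application.yml:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate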
Restart the application and send another message. Even though the application has processed it, the current offset won't be committed, and it won't match the log-end offset, because we now need to acknowledge the message manually:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test
To acknowledge manually, we need to add an Acknowledgment parameter to the listener method and call it after processing. Update the PersonConsumer:
@Slf4j
@Service
public class PersonConsumer {

    @KafkaListener(topics = "ack_topic", groupId = "test")
    public void listen(Person personMessage, Acknowledgment ack) {
        log.info(String.format(
                "Received person: %s, age: %d",
                personMessage.getName(), personMessage.getAge())
        );
        ack.acknowledge();
    }
}
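Because the offset is only committed when acknowledge() is called, a common pattern is to skip the acknowledgement when processing fails so the record can be redelivered later. The sketch below is one way to do this; the process method and the handling in the catch block are illustrative assumptions, and in practice you would pair this with an error handler or a retry strategy:

@KafkaListener(topics = "ack_topic", groupId = "test")
public void listen(Person personMessage, Acknowledgment ack) {
    try {
        process(personMessage);   // hypothetical business logic that may throw
        ack.acknowledge();        // commit the offset only after successful processing
    } catch (Exception e) {
        log.error("Failed to process person {}", personMessage.getName(), e);
        // Not acknowledging leaves the offset uncommitted, so the message can be
        // redelivered (for example after a restart, rebalance, or via a retry strategy)
    }
}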
Restart the application again; the same message will be processed, but this time the consumer's offset will be committed.
6. Conclusion
Finally, Apache Kafka provides several types of acknowledgements for producers and consumers that can be configured based on the needs of the use case. The acknowledgement level for producers can be set to Acks=0, Acks=1, or Acks=all, depending on the desired tolerance for data loss. While Acks=0 offers the highest throughput, it also carries the greatest risk of data loss. Acks=all, on the other hand, offers the strongest guarantee against data loss but the lowest throughput.
For consumers, Kafka uses automatic acknowledgement by default, which means that the offset is committed automatically once the message is received. Manual acknowledgement, on the other hand, might be utilised when additional control over message processing and delivery is desired. It requires the consumer to commit the offset manually after processing the message, giving the application finer control over when a message is considered handled.
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.