1. Introduction
JsonSerializer, shipped with the Spring Kafka library, makes it easy to integrate Kafka with Spring Boot applications. With it, developers keep the simplicity and familiarity of the Spring framework while the library handles serialization and deserialization of Java objects to and from JSON, which makes message data in Kafka easier to work with.
One of the main benefits of using Spring Boot Kafka JsonSerializer is its seamless integration with the Spring Kafka library. The JsonSerializer and JsonDeserializer classes provided by the library allow a Java object to be converted to and from JSON when sending and receiving messages through Kafka. This simplifies working with JSON data, eliminating the need to manually convert a Java object to JSON strings and vice versa.
Additionally, the provided JSON serializer and deserializer can be customized to suit the specific requirements of your application.
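As a rough illustration of that customization, the sketch below collects a few of the `spring.json.*` properties that JsonSerializer and JsonDeserializer read from the client configuration. It uses plain string keys so it runs without any Kafka dependency. The class name `JsonSerdeOptions` and the package `com.example.model` are hypothetical and only serve the example. It assumes the serializer and deserializer are configured by class name, so that they pick these properties up from the config map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: gathers optional JSON (de)serializer settings as plain maps.
final class JsonSerdeOptions {

    // Producer side: ask JsonSerializer not to add type information headers.
    static Map<String, Object> producerOverrides() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("spring.json.add.type.headers", false);
        return props;
    }

    // Consumer side: trust a package, set a fallback target type,
    // and ignore type headers sent by the producer.
    static Map<String, Object> consumerOverrides(String modelPackage, String valueType) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("spring.json.trusted.packages", modelPackage);
        props.put("spring.json.value.default.type", valueType);
        props.put("spring.json.use.type.headers", false);
        return props;
    }

    public static void main(String[] args) {
        // Package and class names here are assumptions for illustration only.
        System.out.println(producerOverrides());
        System.out.println(consumerOverrides("com.example.model", "com.example.model.Person"));
    }
}
```

Entries like these would be merged into the config map handed to the producer or consumer factory. Disabling type headers on both sides is one way to decouple the consumer's model class from the producer's class name.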
2. Prerequisites and Setup
You can skip this section if you only want to look at the code examples rather than follow the tutorial step by step.
If you are following this tutorial in your IDE, I will assume that you already have Apache Kafka running in a Docker container. You may want to read how to run Kafka if you don't.
2.1. Generate Project Template
To create a Spring Boot Kafka project, generate a blank Spring Boot project template using Spring Initializr with all the required dependencies. Click Generate and import the project into your IDE.
3. Spring Boot Kafka Json Serializer & Deserializer
Integrating Spring Boot with Kafka is simple, thanks to Spring Boot's Kafka support, which lets developers produce and consume JSON messages easily. In the following paragraphs, we'll explore how to configure a JsonSerializer and JsonDeserializer for your Kafka application.
First, let's set up the dependencies for your Spring Boot project. Add the following to your build file; I am using Gradle, so I will amend build.gradle:
implementation 'com.fasterxml.jackson.core:jackson-databind'
To start using the JsonSerializer and JsonDeserializer, configure your Spring Boot application with the following properties in the application.properties or application.yml file:
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Next, we create ProducerConfiguration, which will pass the key and value serializers to the KafkaTemplate via DefaultKafkaProducerFactory:
import static org.apache.kafka.clients.producer.ProducerConfig.*;

@Configuration
public class ProducerConfiguration {

    @Bean
    public ProducerFactory<String, Person> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Person> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
The ConsumerConfiguration supplies the deserialization config so the Kafka consumer knows how to deserialize incoming JSON messages into Person objects:
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

@Configuration
public class ConsumerConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Person> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Person> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, Person> consumerFactory() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        conf.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        conf.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        conf.put(GROUP_ID_CONFIG, "my_group");
        return new DefaultKafkaConsumerFactory<>(
                conf,
                new StringDeserializer(),
                new JsonDeserializer<>(Person.class)
        );
    }
}
Now you can produce JSON messages to a Kafka topic using the KafkaTemplate. Let's add the Person class, the Java model object that is serialized on the producer side and used as the deserialization target on the consumer side:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
    private String name;
    private int age;
}
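To make the wire format concrete, here is a minimal, dependency-free sketch of the kind of payload a JSON serializer produces for this model. It builds the JSON by hand instead of using Jackson, so it is only an approximation of what JsonSerializer does. The class `PersonJsonSketch` and its nested `Person` stand-in are hypothetical names introduced for the example.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: hand-rolled JSON for a Person, not the Jackson-based JsonSerializer.
final class PersonJsonSketch {

    // Plain-Java stand-in for the Lombok-annotated Person model.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Renders the two fields in declaration order as a JSON object.
    static String toJson(Person p) {
        return "{\"name\":\"" + escape(p.name) + "\",\"age\":" + p.age + "}";
    }

    // Escapes only backslashes and double quotes; a real JSON library covers far more cases.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // A Kafka record value is a byte array, so the JSON text is encoded as UTF-8.
    static byte[] serialize(Person p) {
        return toJson(p).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new Person("Jack", 20));
        // Decoding the bytes shows the text a console consumer would display.
        System.out.println(new String(payload, StandardCharsets.UTF_8));
        // prints {"name":"Jack","age":20}
    }
}
```

The point of the sketch is that the topic holds ordinary JSON text encoded as bytes, which is why any JSON-aware consumer can read these messages, not only a Spring one.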
4. Producing and Consuming Messages
To send a Person Java object, we create a PersonProducer that has a KafkaTemplate<String, Person> injected and calls its send method:
@Service
@AllArgsConstructor
public class PersonProducer {

    private KafkaTemplate<String, Person> kafkaTemplate;

    public void sendPerson(Person person) {
        kafkaTemplate.send("person-topic", person);
    }
}
The producer is injected into the application class, and a @PostConstruct method uses it to send two messages during startup, once the dependencies have been injected:
@SpringBootApplication
public class KafkaJsonSerializerApplication {

    @Autowired
    private PersonProducer personProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaJsonSerializerApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        personProducer.sendPerson(new Person("Jack", 20));
        personProducer.sendPerson(new Person("Tom", 25));
    }
}
On the consumer side, we have PersonConsumer with a @KafkaListener method to receive Person objects:
@Log
@Service
public class PersonConsumer {

    @KafkaListener(topics = "person-topic")
    public void listen(Person person) {
        log.info(String.format(
                "Received person: %s, age: %d",
                person.getName(),
                person.getAge()
        ));
    }
}
Following these steps, you've successfully integrated Spring Boot with Kafka and configured JsonSerializer and JsonDeserializer for messages. Start the application, and after a while, the logs should display messages similar to these:
com.tech4gods.com.PersonConsumer: Received person: Jack, age: 20
com.tech4gods.com.PersonConsumer: Received person: Tom, age: 25
If running Kafka locally, we can open a shell in the Docker container to check what exists in the topic at any given time. First, we open a shell in the container, then run the console consumer command to list messages from the beginning:
# On Windows, put winpty at the start of this command
docker exec -it broker bash
kafka-console-consumer --bootstrap-server=localhost:9092 --topic person-topic --from-beginning
5. Conclusion
In this article, you've learned how to use Spring Boot Kafka JsonSerializer for handling JSON data in your Kafka-based applications. Utilizing JsonSerializer and JsonDeserializer simplifies serializing and deserializing Java objects to and from JSON, making your code more readable and maintainable.
Remember that when working with JsonSerializer in Spring Boot Kafka, you should define a data model class such as Person. JsonSerializer converts instances of it to a JSON byte array before they are sent to Kafka, and JsonDeserializer needs the same model on the consuming side to turn those bytes back into objects.
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.