1. Introduction
With Spring Boot, sending messages to Apache Kafka is straightforward. There is little to fuss over: the spring-kafka starter brings in everything you need, and the KafkaTemplate takes care of the rest. Configure it once and watch your message payloads arrive on their topics. In this post, we will create a Spring Boot Kafka producer step by step, so read on if you are looking for an easy way in!
2. Prerequisites and Setup
You may skip this section if you only want to look at the code examples rather than follow the tutorial step by step.
If you are following this tutorial in your IDE, I will assume you already have Apache Kafka running in a Docker container. If you don't, you may want to read this guide to get Kafka running first.
2.1. Generate Project Template
To create a Spring Boot Kafka producer, we first generate a blank Spring Boot project template using Spring Initializr with all the required dependencies. Click generate and import the project into your IDE.

2.2. Add Project Configuration
Let's start by adding the Jackson dependency to the build.gradle file. It is required because we want to use the ObjectMapper class.
implementation 'com.fasterxml.jackson.core:jackson-databind'
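If the Spring for Apache Kafka dependency was not selected in Spring Initializr, it can be added to build.gradle alongside Jackson; this is the starter that provides KafkaTemplate and the auto-configured KafkaProperties used below:

```groovy
implementation 'org.springframework.kafka:spring-kafka'
```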
Next is the JsonConfig class, which registers the ObjectMapper bean in the application context so it can be used anywhere in the application. The KafkaConfig class holds the Kafka properties; we will extend it as we add more functionality.
@Configuration
public class JsonConfig {

    @Bean
    public ObjectMapper objectMapper() {
        var objectMapper = new ObjectMapper();
        objectMapper.findAndRegisterModules();
        return objectMapper;
    }
}
@Configuration
public class KafkaConfig {

    private final KafkaProperties kafkaProperties;

    public KafkaConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }
}
Also, a User class that we will use for the JSON string conversion.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    private String id;
    private String name;
    private String email;
}
3. Spring Boot Kafka Producer
We create our Spring Boot Kafka producer class with the @Service annotation, which registers it as a singleton bean by default. This class contains the logic for sending messages to an Apache Kafka topic, using the KafkaTemplate to send and the ObjectMapper to create JSON strings from Java objects.
@Service
public class UserProducer {

    private final KafkaTemplate<String, String> template;
    private final ObjectMapper objectMapper;

    public UserProducer(KafkaTemplate<String, String> template, ObjectMapper objectMapper) {
        this.template = template;
        this.objectMapper = objectMapper;
    }
}
3.1. String Message
Spring Boot allows creating Kafka topics programmatically. While this approach is not recommended in production, we will use it for this example.
So let's define a new topic bean called users. The code below creates a new Kafka topic on application startup.
@Bean
public NewTopic usersTopic() {
    return TopicBuilder
            .name("users")
            .build();
}
Add this method to our UserProducer class. It sends the string message using KafkaTemplate.
public void sendString(String userMessage) {
    template.send("users", userMessage);
}
Let's invoke the sendString producer method somewhere in the application. I will do it via the @PostConstruct annotation, which executes after dependency injection is complete. Since we are not implementing a web server or command-line runner, the application will terminate after the method finishes running.
@SpringBootApplication
public class KafkaProducerApplication {

    @Autowired
    private UserProducer producer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        producer.sendString("1,Tom,tom123@gmail.com");
        producer.sendString("2,Nick,nick1234@gmail.com");
    }
}
Since we are not creating a Kafka consumer, we can verify the inserted data by opening a shell on the broker container [1] and running the console consumer [2] to view all the messages on the topic.
[1] docker exec -it broker bash (add winpty to the start if on Windows)
[2] kafka-console-consumer --bootstrap-server broker:9092 --topic users --from-beginning
3.2. JSON Message Kafka Producer
Similarly, we add another topic to KafkaConfig that will hold the JSON representation of a user object. The sendJson method is added to the UserProducer class.
@Bean
public NewTopic jsonUsersTopic() {
    return TopicBuilder
            .name("json-users")
            .build();
}
public void sendJson(User userMessage) throws JsonProcessingException {
    var userJson = objectMapper.writeValueAsString(userMessage);
    template.send("json-users", userJson);
}
Next, we must change our postConstruct method to invoke the new method. Let’s run the application and verify that the data has been created on the topic [1].
@PostConstruct
public void postConstruct() throws JsonProcessingException {
    producer.sendJson(new User("1", "Tom", "tom123@gmail.com"));
    producer.sendJson(new User("2", "Nick", "nick1234@gmail.com"));
}
[1] kafka-console-consumer --bootstrap-server broker:9092 --topic json-users --from-beginning
3.3. Message With Key
Apache Kafka allows messages to be stored with a key. Using an identical key for multiple messages guarantees they are stored on the same partition. Insertion order is only preserved within a partition, so messages placed on different partitions will likely not be consumed in order.
Imagine placing an order with multiple items, where each item is represented by a separate message. We could use the order number as the key so that every message for that order stays together, for example to calculate the total invoice for the order.
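The key-to-partition mapping can be sketched in plain Java. Note this is a simplified stand-in: Kafka's default partitioner actually uses a murmur2 hash of the serialized key, but the principle is the same.

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner (which really uses
    // a murmur2 hash of the serialized key): the same key always maps to the
    // same partition, so messages sharing a key keep their relative order.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int first = partitionFor("order-42", 3);
        int second = partitionFor("order-42", 3);
        // Every message keyed with "order-42" lands on the same partition
        System.out.println(first == second); // true
    }
}
```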
Let’s create another topic to hold the message with a key, followed by the insertion method.
@Bean
public NewTopic jsonUsersWithKeyTopic() {
    return TopicBuilder
            .name("json-users-key")
            .build();
}
public void sendJsonWithKey(User userMessage) throws JsonProcessingException {
    var userJson = objectMapper.writeValueAsString(userMessage);
    template.send("json-users-key", userMessage.getId(), userJson);
}
Again, we update our postConstruct method; running the application should insert the data into the topic. We can verify that the users exist in the topic with the appropriate keys assigned [1].
@PostConstruct
public void postConstruct() throws JsonProcessingException {
    producer.sendJsonWithKey(new User("1", "Tom", "tom123@gmail.com"));
    producer.sendJsonWithKey(new User("2", "Nick", "nick1234@gmail.com"));
}
[1] kafka-console-consumer --bootstrap-server broker:9092 --topic json-users-key --property print.key=true --property key.separator=":" --from-beginning
3.4. Custom Kafka Producer Configuration
When the default producer configuration is insufficient, we can create a producer factory bean that injects extra producer properties. In the example below, we enable idempotence, but numerous attributes are available depending on your requirements.
@Bean
public ProducerFactory<String, String> producerFactory() {
    var properties = kafkaProperties.buildProducerProperties();
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    return new DefaultKafkaProducerFactory<>(properties);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
The new producer factory must be passed to the KafkaTemplate definition so the custom configuration takes effect. This bean will override the default one created by Spring Boot auto-configuration.
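What idempotence buys us can be sketched conceptually in plain Java. This is a simplified model of the broker-side behaviour, not Kafka code: an idempotent producer attaches a monotonically increasing sequence number per partition, and the broker drops any write whose sequence it has already seen, so network retries cannot create duplicates.

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotenceSketch {

    // Last sequence number accepted per producer id (per partition in real Kafka)
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the write is accepted, false if it is a duplicate retry. */
    boolean accept(String producerId, int sequence) {
        int last = lastSeq.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate caused by a retry; silently dropped
        }
        lastSeq.put(producerId, sequence);
        return true;
    }

    public static void main(String[] args) {
        var broker = new IdempotenceSketch();
        System.out.println(broker.accept("p1", 0)); // true  - first delivery
        System.out.println(broker.accept("p1", 0)); // false - retried duplicate
        System.out.println(broker.accept("p1", 1)); // true  - next message
    }
}
```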
4. Summary
Who knew transforming Java classes into JSON could be so easy? In this practical post, we created a Spring Boot Kafka producer and verified the results on the broker. Along the way, we learned how to use the Jackson library for JSON conversion and how to specify message keys on an Apache Kafka topic.
Daniel Barczak is a software developer with over 9 years of professional experience. He has experience with several programming languages and technologies and has worked for businesses ranging from startups to big enterprises. Daniel in his leisure time likes to experiment with new smart home gadgets and explore the realm of home automation.