1. Introduction
Integration testing is essential to building robust and dependable applications, especially when working with distributed systems such as Apache Kafka and frameworks like Spring Boot. This article examines Kafka integration testing inside a Spring Boot application to ensure your components communicate properly.
To ensure that your Kafka and Spring Boot applications work properly, you must design dependable, self-contained integration tests that do not rely on an external Kafka server. By utilising embedded Kafka and the features of the Spring Kafka Test library, you will be able to write extensive integration tests and confidently evaluate your application's behaviour.
2. Prerequisites and Setup
You should skip this section if you only want to look at code examples.
If you are trying to follow this tutorial in your IDE, I will assume you already have Apache Kafka running in a Docker container. You may want to read up on how to run Kafka if you don't.
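If you need a quick local broker, one option (a suggestion on my part, not covered by this article) is the official apache/kafka image, which ships with a default single-node KRaft configuration:

docker run -d --name kafka -p 9092:9092 apache/kafka:latest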
2.1. Generate Project Template
To create the Spring Boot Kafka project, we must generate a blank Spring Boot project template using Spring Initializr with all the required dependencies. Click generate and import the project into your IDE.
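The exact dependency list isn't spelled out here, but assuming Gradle (which the Testcontainers section later uses), a typical dependencies block for this setup might look like the following sketch:

dependencies {
    implementation "org.springframework.kafka:spring-kafka"
    compileOnly "org.projectlombok:lombok"
    annotationProcessor "org.projectlombok:lombok"
    testImplementation "org.springframework.boot:spring-boot-starter-test"
    testImplementation "org.springframework.kafka:spring-kafka-test"
}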
3. Create Producer and Consumer Application
First, we need to add the application.yml configuration. In Apache Kafka, the auto.offset.reset setting determines where a consumer group starts consuming from a partition when it has no previously committed offset for that partition, for example when it reads from it for the first time.
If auto.offset.reset is set to earliest, the consumer group will begin consuming messages from the beginning of the partition. If it is set to latest, the consumer group will start from the latest offset, which means it will only consume messages produced after the consumer group starts.
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: integration
A consumer group ID is a unique identifier for a group of consumers reading messages from the same topic and partition(s) in Kafka. Each consumer in the group is assigned a subset of the partitions to consume from. Kafka uses the group ID to coordinate partition assignment across the group, ensuring that each message is processed by only one consumer within the group.
Next, we need to define a simple producer class that will be responsible for sending string messages to the Kafka topic:
@Service
@AllArgsConstructor
public class VehicleProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendStringMessage(String vehicleMessage) {
        kafkaTemplate.send("test_topic", vehicleMessage);
    }
}
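As a side note not covered by the original example: send() is asynchronous. In spring-kafka 3.x it returns a CompletableFuture<SendResult<K, V>> (earlier versions return a ListenableFuture), so a hypothetical variant could attach a delivery callback:

// Hypothetical variant with a delivery callback (spring-kafka 3.x);
// assumes the class is also annotated with @Slf4j for the log field.
public void sendWithCallback(String vehicleMessage) {
    kafkaTemplate.send("test_topic", vehicleMessage)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to send vehicle message", ex);
            } else {
                log.info("Sent to partition {}",
                    result.getRecordMetadata().partition());
            }
        });
}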
A consumer in Apache Kafka is a client application that reads data from Kafka topics. It subscribes to one or more topics and consumes messages from one or more partitions of those topics.
When a consumer reads messages from a partition, it keeps track of its offset, the position of the most recently read message. The consumer can commit its offset to Kafka periodically, allowing it to resume reading from the same point if it is restarted or fails.
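With Spring Boot, this commit behaviour can be tuned through standard configuration properties. As an illustration only (these settings are not part of this article's setup), enabling plain Kafka auto-commit every five seconds would look like this:

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 5s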
Here is a consumer class that will listen to Kafka test_topic:
@Slf4j
@Service
@Data
public class VehicleConsumer {

    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @KafkaListener(topics = "test_topic", groupId = "integration")
    public void listen(String vehicleMessage) {
        log.info(String.format("Received vehicle: %s", vehicleMessage));
        countDownLatch.countDown();
    }
}
4. Kafka Integration Testing with Embedded Kafka Node
For the first integration test, we are using the @EmbeddedKafka annotation, which provides an in-memory Kafka broker instance with additional configuration specifying the number of partitions, broker host, and port.
The CountDownLatch defined in the consumer waits for the countDown() invocation that signals successful message processing, while @ExtendWith(OutputCaptureExtension.class) enables us to analyse the application output and scan for the log message once the message has been received.
Finally, run the test to verify that everything is working as expected.
@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092"
    })
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(OutputCaptureExtension.class)
public class IntegrationTest {

    @Autowired
    private VehicleConsumer vehicleConsumer;

    @Autowired
    private VehicleProducer vehicleProducer;

    @Test
    public void shouldProduceAndConsumeStringMessage(CapturedOutput output) throws InterruptedException {
        String vehicle = "Mercedes E Class 2020";

        vehicleProducer.sendStringMessage(vehicle);

        assertTrue(vehicleConsumer.getCountDownLatch().await(2, TimeUnit.SECONDS));
        assertTrue(output.getOut().contains("Received vehicle: Mercedes E Class 2020"));
    }
}
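As a variation not shown above, @EmbeddedKafka can also run on a random port; the broker address is then exposed through the spring.embedded.kafka.brokers property, which a sketch like the following could wire into the Spring Boot configuration:

// Sketch: let the embedded broker pick its own port and pass the address on.
@SpringBootTest(properties =
        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1)
public class RandomPortIntegrationTest {
    // test methods as in IntegrationTest above
}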
5. Kafka Integration Test Using Testcontainers
An embedded in-memory Kafka broker is a lightweight option that runs within the same JVM as your test code. This eliminates the need to set up a separate Kafka cluster or network, allowing your tests to execute quickly.
On the other hand, running Kafka in Docker via Testcontainers is a more robust solution for integration testing. It lets you run the Kafka broker in a separate container, which provides a more realistic testing environment and allows you to test your code against a Kafka setup similar to the one in production.
To replicate the above test using Testcontainers for Java, we first need to add the libraries below; the first provides JUnit 5 (Jupiter) integration, while the second enables downloading the Kafka Docker image and creating a container for the duration of the test:
testImplementation "org.testcontainers:junit-jupiter:1.18.3"
testImplementation "org.testcontainers:kafka:1.18.3"
Here is our test class. Since the KafkaContainer uses dynamic port allocation, we need to override the default consumer and producer configuration. However, because the port is only allocated when the container is created, for simplicity we define the configuration class inside the test itself:
@Testcontainers
@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
@Import(IntegrationTestTwo.KafkaTestContainersConfig.class)
public class IntegrationTestTwo {

    private static final DockerImageName imageName =
            DockerImageName.parse("confluentinc/cp-kafka:7.4.1");

    @Container
    public static final KafkaContainer kafkaContainer = new KafkaContainer(imageName);

    @Autowired
    private VehicleConsumer vehicleConsumer;

    @Autowired
    private VehicleProducer vehicleProducer;

    @Test
    public void shouldProduceAndConsumeStringMessage(CapturedOutput output) throws InterruptedException {
        String vehicle = "Mercedes E Class 2020";

        vehicleProducer.sendStringMessage(vehicle);

        assertTrue(vehicleConsumer.getCountDownLatch().await(2, TimeUnit.SECONDS));
        assertTrue(output.getOut().contains("Received vehicle: Mercedes E Class 2020"));
    }

    @TestConfiguration
    static class KafkaTestContainersConfig {

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> config = new HashMap<>();
            config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
            config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
            config.put(GROUP_ID_CONFIG, "integration");
            return config;
        }

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
            config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(config);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
}
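A leaner alternative, offered here as a suggestion rather than part of the original example, is Spring's @DynamicPropertySource (Spring Framework 5.2.5+). It feeds the container's address into the standard Spring Boot properties, so the auto-configured producer, consumer, and KafkaTemplate can be used and the whole @TestConfiguration class becomes unnecessary:

// Sketch of the @DynamicPropertySource approach; the hypothetical
// IntegrationTestThree would reuse the same producer, consumer, and assertions.
@Testcontainers
@SpringBootTest
class IntegrationTestThree {

    @Container
    static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1"));

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    // test methods as in IntegrationTestTwo above
}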
Run the test case and wait for the results. The first run may take longer since Testcontainers needs to download the Docker image required to start the container.
Once the test completes, the container is stopped and destroyed.
6. Conclusion
In summary, there are two widely used ways to write integration tests for Kafka-based systems. The first is Embedded Kafka, a lightweight solution that runs Kafka within the same JVM as the test code; this approach is suitable for small applications and unit-style tests. The second uses Kafka Docker Testcontainers, which provides a more realistic testing environment by running Kafka in its own container; this method is better suited for bigger applications and end-to-end testing. Both approaches have advantages and disadvantages, and the testing scenario's specific needs determine which to choose.
Daniel Barczak
Daniel Barczak is a software developer with a solid 9-year track record in the industry. Outside the office, Daniel is passionate about home automation. He dedicates his free time to tinkering with the latest smart home technologies and engaging in DIY projects that enhance and automate the functionality of living spaces, reflecting his enthusiasm and passion for smart home solutions.