1. Introduction
Working with Apache Kafka data doesn’t have to be daunting! The Kafka Streams API has powerful built-in methods for manipulating data as it moves between topics. A Kafka Streams application acts as both a consumer and a producer, which makes it straightforward to process data at scale with Spring Boot. This article looks at Kafka Streams operations with Spring Boot.
2. Overview of Kafka Streams

Kafka Streams is a stateful stream processing library for building robust, elastic, and fault-tolerant streaming applications. It keeps application state in a state store, which the business logic can read and update as records flow from input topics to output topics.
By running Kafka Streams as a library embedded in your application, you can build processing pipelines that read from multiple input topics and write to multiple output topics.
A crucial element of Kafka Streams operations is the stream processor topology, which describes how multiple event streams interact. Records enter the topology through source nodes, which read from Kafka topics and pass the records down to processor nodes for custom operations. Sink nodes then write the processed records to a new Kafka topic.
Additionally, the state held in state stores is backed up to changelog topics, which increases fault tolerance and resilience.
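The source, processor, and sink roles described above can be modelled in a few lines of plain Java. This is only an illustrative sketch with no Kafka dependency: the class name TopologySketch and the run helper are hypothetical, and lists stand in for topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of a linear topology: source -> filter -> map -> sink.
// Lists stand in for Kafka topics; nothing here uses the Kafka Streams API.
final class TopologySketch {

    static List<String> run(List<String> sourceTopic,
                            Predicate<String> filterNode,
                            Function<String, String> mapNode) {
        List<String> sinkTopic = new ArrayList<>();
        for (String record : sourceTopic) {        // source node: read each record
            if (!filterNode.test(record)) {        // processor node 1: drop unwanted records
                continue;
            }
            String transformed = mapNode.apply(record); // processor node 2: transform
            sinkTopic.add(transformed);            // sink node: write to the output topic
        }
        return sinkTopic;
    }

    public static void main(String[] args) {
        List<String> out = run(
                List.of("phone", "", "case"),
                value -> !value.isEmpty(),
                String::toUpperCase);
        System.out.println(out); // [PHONE, CASE]
    }
}
```

In a real application the same shape is expressed with the Kafka Streams DSL, as the later sections of this article show.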
2.1. Named State Store in Kafka Streams

State stores are essential to a Kafka Streams topology and provide persistent storage for the intermediate state produced during stream processing. Data from a state store can be written back to a Kafka topic, making it available to any downstream service.
State stores allow developers to build stateful streaming applications that handle data at scale in a fault-tolerant and resilient way.
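To make the fault-tolerance idea concrete, here is a minimal plain-Java sketch of a key-value store whose writes are also appended to a changelog, so a fresh instance can rebuild its state by replaying that log. It is a simplified model under stated assumptions, not how Kafka Streams implements its stores: the class ChangelogStoreSketch and its methods are hypothetical, and an in-memory list stands in for the changelog topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a state store backed by a changelog.
// Every write is appended to the changelog so the store can be rebuilt after a crash.
final class ChangelogStoreSketch {

    private final Map<String, Long> store = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog; // stands in for the changelog topic

    ChangelogStoreSketch(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
    }

    void put(String key, Long value) {
        store.put(key, value);                  // update local state
        changelog.add(Map.entry(key, value));   // record the change for later recovery
    }

    Long get(String key) {
        return store.get(key);
    }

    // Rebuild a fresh store by replaying the changelog from the beginning.
    // Later entries for the same key overwrite earlier ones.
    static ChangelogStoreSketch restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogStoreSketch restored = new ChangelogStoreSketch(new ArrayList<>(changelog));
        for (Map.Entry<String, Long> change : changelog) {
            restored.store.put(change.getKey(), change.getValue());
        }
        return restored;
    }
}
```

After writing "order-1" twice and "order-2" once, a store restored from the log returns the latest value for each key, which is the property a replacement application instance relies on.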
3. Kafka Streams Operations With Spring Boot Examples
Our guide will explore Kafka Streams operations such as joining, grouping, and filtering events. We will also look at conditional branching, time-based processing driven by the consumer record timestamp, and merging multiple Kafka streams.
3.1. Prerequisites and Setup
You may skip this section if you only want to look at the code examples rather than follow the tutorial step by step.
If you want to follow along in your IDE, I will assume you already have a Kafka broker running in a Docker container. You may want to read this to get Kafka running if you don’t.
Before diving into the practical code examples, make sure to brush up on your knowledge of the Kafka platform.
3.2. Generate Project Template
First, we generate a simple Spring Boot application template using Spring Initializr with all the required dependencies. Click generate and import into your IDE.

3.3. Configuration for Kafka Streams with Spring Boot
Let’s start with configuration by adding the Jackson dependency to the Gradle file. The Kafka Streams application needs it to create objects from JSON messages.
implementation 'com.fasterxml.jackson.core:jackson-databind'
We also need to provide the KafkaStreamsConfiguration Spring bean. It holds the settings for the example application, such as the application ID, which Kafka Streams also uses as the consumer group ID, and the broker address.
The configuration class below is the minimum we need to enable Kafka Streams support in the Spring Boot application. Spring uses this configuration to create the KafkaStreams client and manage its lifecycle.
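import static org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME;

import java.util.HashMap;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

    @Bean(name = DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig() {
        var props = new HashMap<String, Object>();
        // The application ID also serves as the consumer group ID
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-item");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes, used wherever a stream operation does not specify its own
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}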
Also, we will provide an Item class, which we will use to deserialize a JSON string.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Item {
    private Long id;
    private Long order;
    private String name;
    private Status status;
    private BigDecimal price;

    enum Status {
        PURCHASED, REFUNDED
    }
}
3.4. Create Kafka Input Topic With Data
First, we need to insert some data into the Kafka input topic. Using any command line tool, open a shell on the broker container and create a new input topic. Then insert some data using the Kafka console producer; successful insertion can be verified using the Kafka console consumer. Execute the lines below in order.
docker exec -it broker bash
kafka-topics --bootstrap-server broker:9092 --create --topic order-items --partitions 1
kafka-console-producer --broker-list broker:9092 --topic order-items
{"id":1,"order":1,"name":"phone","status":"PURCHASED","price":"800.00"}
{"id":2,"order":1,"name":"case","status":"PURCHASED","price":"20.00"}
{"id":3,"order":2,"name":"mac book","status":"PURCHASED","price":"2400.00"}
kafka-console-consumer --bootstrap-server broker:9092 --topic order-items --from-beginning
4. Kafka Streams Operations with Spring Boot
Kafka Streams makes maintaining application state easier with its named state stores. Operations such as counting, aggregating, reducing, and windowing all require this kind of state tracking.
With Kafka Streams, you can quickly and easily materialize information into a reliable storage system! The state store data is automatically backed up to a changelog Kafka topic for fault tolerance.
By materializing state into a named store, your Spring Boot application can look up that store later. This is quite powerful, as you can query data structures inside Kafka Streams programs much as you would query a database.
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {
    private Long id;
    private int totalItems;
    private BigDecimal totalPrice;
}
@Configuration
public class KafkaItemStreamOne {

    @Bean
    public KStream<Long, Order> kStreamItem(StreamsBuilder builder) {
        var longSerde = Serdes.Long();
        var jsonSerde = new JsonSerde<>(Item.class);
        var orderSerde = new JsonSerde<>(Order.class);

        var purchasedItemStream = builder
            .stream("order-items", Consumed.with(longSerde, jsonSerde))
            // Re-key each item by its order ID so items of the same order are grouped together
            .map((key, value) -> KeyValue.pair(value.getOrder(), value))
            .groupByKey(Grouped.with(longSerde, jsonSerde))
            .aggregate(
                // Initial aggregate: an empty order
                () -> new Order(0L, 0, BigDecimal.ZERO),
                // Count the item and add its price to the running total
                (key, value, agg) -> Order.builder()
                    .id(key)
                    .totalItems(agg.getTotalItems() + 1)
                    .totalPrice(agg.getTotalPrice().add(value.getPrice()))
                    .build(),
                Materialized.with(longSerde, orderSerde))
            .toStream();

        purchasedItemStream.to(
            "purchased-order-items", Produced.with(longSerde, orderSerde));
        return purchasedItemStream;
    }
}
In the above example, the aggregate function applies a stateful transformation to the order items, calculating the item count and the total price of all items in the same order. Each updated order is sent to the purchased-order-items topic.
We verify the above code by running the application and checking the output topic for the aggregation result. The order with ID 1 should have two items and a total price of 820.00.
kafka-console-consumer --bootstrap-server broker:9092 --topic purchased-order-items --from-beginning
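The expected totals can be checked by hand with a small plain-Java sketch that mirrors the group-by-order and aggregate steps on the three sample records. It does not use Kafka Streams: the class OrderAggregationSketch and its simplified Item and Order records are hypothetical stand-ins for the classes above, and a map plays the role of the state store.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "group items by order ID, then aggregate count and total price".
final class OrderAggregationSketch {

    record Item(long id, long order, String name, BigDecimal price) {}

    record Order(long id, int totalItems, BigDecimal totalPrice) {}

    static Map<Long, Order> aggregate(List<Item> items) {
        Map<Long, Order> store = new LinkedHashMap<>(); // stands in for the state store
        for (Item item : items) {
            // Start from an empty order the first time an order ID is seen
            Order agg = store.getOrDefault(item.order(), new Order(0L, 0, BigDecimal.ZERO));
            store.put(item.order(), new Order(
                    item.order(),
                    agg.totalItems() + 1,
                    agg.totalPrice().add(item.price())));
        }
        return store;
    }

    public static void main(String[] args) {
        Map<Long, Order> totals = aggregate(List.of(
                new Item(1, 1, "phone", new BigDecimal("800.00")),
                new Item(2, 1, "case", new BigDecimal("20.00")),
                new Item(3, 2, "mac book", new BigDecimal("2400.00"))));
        System.out.println(totals.get(1L)); // Order[id=1, totalItems=2, totalPrice=820.00]
        System.out.println(totals.get(2L)); // Order[id=2, totalItems=1, totalPrice=2400.00]
    }
}
```

One difference worth keeping in mind: the sketch only returns the final totals, whereas the stream emits an updated order as records arrive, so the output topic may also contain intermediate results for the same order ID.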
5. Branching Kafka Streams
Branching in Kafka Streams is a routing mechanism that sends messages along different routes based on a predicate. Execute the lines below to add more records to the topic in the order provided.
kafka-console-producer --broker-list broker:9092 --topic order-items
{"id":4,"order":1,"name":"tracksuit","status":"REFUNDED","price":"99.00"}
{"id":5,"order":1,"name":"gaming mouse","status":"REFUNDED","price":"59.00"}
Since we are working on the same Spring application, we need to change the Kafka Streams application ID so that processing starts again from the beginning of the topic; a new application ID has no committed offsets. Also, the Spring @Configuration annotation needs to be removed from KafkaItemStreamOne so that its topology is no longer registered.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-item-two");
We named this Kafka Streams configuration KafkaItemStreamTwo and extracted the transformation logic into a separate method named branched.
The branch method takes a predicate that matches items with a status of REFUNDED and routes those messages to the refunded branch; all other messages go to the default branch. For scenarios with multiple conditions, additional branches can be added as needed.
@Configuration
public class KafkaItemStreamTwo {

    @Bean
    public static KStream<String, Item> kStreamItem(StreamsBuilder builder) {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(Item.class);
        var orderSerde = new JsonSerde<>(Order.class);

        var itemStream = builder
            .stream("order-items", Consumed.with(stringSerde, jsonSerde));

        itemStream.split()
            // Refunded items are aggregated into the refunded-orders topic
            .branch(
                (key, item) -> item.getStatus() == Item.Status.REFUNDED,
                branched(orderSerde, jsonSerde, "refunded-orders"))
            // Everything else is aggregated into the paid-orders topic
            .defaultBranch(branched(orderSerde, jsonSerde, "paid-orders"));

        return itemStream;
    }

    private static Branched<String, Item> branched(
            JsonSerde<Order> orderSerde,
            JsonSerde<Item> itemSerde,
            String outputTopic) {
        return Branched.withConsumer(items -> items
            .map((key, item) -> KeyValue.pair(item.getOrder(), item))
            .groupByKey(Grouped.with(Serdes.Long(), itemSerde))
            .aggregate(
                () -> new Order(0L, 0, BigDecimal.ZERO),
                (key, item, agg) -> Order.builder()
                    .id(key)
                    .totalItems(agg.getTotalItems() + 1)
                    .totalPrice(agg.getTotalPrice().add(item.getPrice()))
                    .build(),
                Materialized.with(Serdes.Long(), orderSerde))
            .toStream()
            // Explicit serdes are needed here: the configured defaults are String serdes,
            // which cannot serialize Long keys and Order values
            .to(outputTopic, Produced.with(Serdes.Long(), orderSerde)));
    }
}
Restart the application; the orders should be routed to different topics based on the item refund status. This can be verified by executing the commands below on the Kafka broker container.
kafka-console-consumer --bootstrap-server broker:9092 --topic paid-orders --from-beginning
kafka-console-consumer --bootstrap-server broker:9092 --topic refunded-orders --from-beginning
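The routing rule itself is easy to model without Kafka: each record goes to the first branch whose predicate matches, and to the default branch if none does. The sketch below is a plain-Java illustration of that rule only; the class BranchSketch, its split helper, and the trimmed-down Item record are hypothetical, and the branch map is assumed to be insertion-ordered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of predicate-based branching with a default branch.
final class BranchSketch {

    enum Status { PURCHASED, REFUNDED }

    record Item(long id, long order, Status status) {}

    // Routes each item to the first branch whose predicate matches,
    // or to the default branch when no predicate matches.
    // The branches map is expected to preserve insertion order (e.g. LinkedHashMap).
    static Map<String, List<Item>> split(List<Item> items,
                                         Map<String, Predicate<Item>> branches,
                                         String defaultBranch) {
        Map<String, List<Item>> routed = new LinkedHashMap<>();
        branches.keySet().forEach(name -> routed.put(name, new ArrayList<>()));
        routed.put(defaultBranch, new ArrayList<>());

        for (Item item : items) {
            String target = defaultBranch;
            for (Map.Entry<String, Predicate<Item>> branch : branches.entrySet()) {
                if (branch.getValue().test(item)) {
                    target = branch.getKey();
                    break; // first match wins; later predicates are not evaluated
                }
            }
            routed.get(target).add(item);
        }
        return routed;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Item>> branches = new LinkedHashMap<>();
        branches.put("refunded-orders", item -> item.status() == Status.REFUNDED);

        Map<String, List<Item>> routed = split(
                List.of(new Item(1, 1, Status.PURCHASED), new Item(4, 1, Status.REFUNDED)),
                branches,
                "paid-orders");
        System.out.println(routed.get("refunded-orders").size()); // 1
        System.out.println(routed.get("paid-orders").size());     // 1
    }
}
```

With the five sample records from this article, items 4 and 5 would land in refunded-orders and items 1 to 3 in paid-orders.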
6. Combine Kafka Streams
Combining Kafka Streams is pretty simple; we can use the merge method to connect two streams.
var refundedOrderStream = builder
    .stream("refunded-orders", Consumed.with(Serdes.Long(), orderSerde));
var paidOrderStream = builder
    .stream("paid-orders", Consumed.with(Serdes.Long(), orderSerde));

// Explicit serdes on the output, since the configured defaults are String serdes
refundedOrderStream
    .merge(paidOrderStream)
    .to("orders-all", Produced.with(Serdes.Long(), orderSerde));
A different approach is to use the StreamsBuilder stream method, which can accept a collection of topic names. While this approach is shorter, it may not always be appropriate if, for example, some transformation logic is required for only one of the topics.
var orderAllStream = builder
    .stream(List.of("refunded-orders", "paid-orders"),
        Consumed.with(Serdes.Long(), orderSerde));

orderAllStream.to("orders-all-two", Produced.with(Serdes.Long(), orderSerde));
7. Time-Based Record Processing
So far, we have been processing all available records on the topic. However, it is also possible to have time-based processing using a consumer record timestamp.
Let’s add the dependency below to the Gradle file so Jackson can process Java date-time objects. We have also added the purchaseDate attribute to our Item class and removed @Configuration from the previous Kafka Streams configuration classes.
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
private LocalDateTime purchaseDate;
Create a new topic and insert the data below. Please note you may need to adjust the dates if you are trying this locally. The intention is for the first two messages to be dated today, whereas the last message is dated a few days in the past.
kafka-topics --bootstrap-server broker:9092 --create --topic order-items-two --partitions 1
kafka-console-producer --broker-list broker:9092 --topic order-items-two
{"id":1,"order":1,"name":"phone","status":"PURCHASED","price":"800.00","purchaseDate":"2022-11-05T10:00:00"}
{"id":2,"order":1,"name":"case","status":"PURCHASED","price":"20.00","purchaseDate":"2022-11-05T10:00:00"}
{"id":3,"order":2,"name":"mac book","status":"PURCHASED","price":"2400.00","purchaseDate":"2022-11-03T10:00:00"}
TimestampExtractor is an interface that allows the extraction of timestamps from a consumer record. Our implementation returns the purchaseDate attribute of the message as epoch milliseconds and falls back to the record timestamp when no purchase date is available.
@NoArgsConstructor
public class ItemPurchaseDateExtractor implements TimestampExtractor {

    @Override
    public long extract(
            ConsumerRecord<Object, Object> record, long partitionTime) {
        var item = (Item) record.value();
        // Fall back to the record's own timestamp if the payload or its date is missing
        if (item == null || item.getPurchaseDate() == null) {
            return record.timestamp();
        }
        // purchaseDate has no zone, so it is interpreted in the JVM's default time zone
        return item.getPurchaseDate()
            .atZone(ZoneId.systemDefault())
            .toInstant()
            .toEpochMilli();
    }
}
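The date-to-milliseconds conversion is worth seeing in isolation, because the result depends on the time zone used to interpret the zoneless purchaseDate value. The following sketch uses only java.time; the class PurchaseTimestampSketch and its toEpochMillis helper are hypothetical, and the zone is passed in explicitly instead of relying on the system default.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Converts a zoneless purchase date into epoch milliseconds for a given zone.
final class PurchaseTimestampSketch {

    static long toEpochMillis(String purchaseDate, ZoneId zone) {
        return LocalDateTime.parse(purchaseDate) // ISO format, e.g. 2022-11-05T10:00:00
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // The same wall-clock time maps to different instants in different zones
        System.out.println(toEpochMillis("2022-11-05T10:00:00", ZoneOffset.UTC));        // 1667642400000
        System.out.println(toEpochMillis("2022-11-05T10:00:00", ZoneOffset.ofHours(1))); // 1667638800000
    }
}
```

Because the extractor in this article uses ZoneId.systemDefault(), the same message can produce different timestamps on machines configured with different time zones, which matters when records sit close to a window boundary.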
KafkaItemStreamFour is similar to what we created in the previous examples, but it also uses the extractor class for time-based processing.
The windowLength defines the size of each time window, and TimeWindows.ofSizeWithNoGrace(windowLength) groups messages into one-day windows with no grace period for late records. The windowSerde serializes the windowed keys written to the output topic. We use itemPurchaseDateExtractor so that each consumer record takes its timestamp from the purchaseDate attribute.
This timestamp decides which window a message belongs to and whether that window has already closed, in which case the message is skipped.
@Configuration
public class KafkaItemStreamFour {

    @Bean
    public KStream<Windowed<Long>, Order> kStreamItem(StreamsBuilder builder) {
        var longSerde = Serdes.Long();
        var jsonSerde = new JsonSerde<>(Item.class);
        var orderSerde = new JsonSerde<>(Order.class);
        var itemPurchaseDateExtractor = new ItemPurchaseDateExtractor();
        var windowLength = Duration.ofDays(1L);
        // Serde for the windowed key (order ID plus window) written to the output topic
        var windowSerde = WindowedSerdes
            .timeWindowedSerdeFrom(Long.class, windowLength.toMillis());
        // Take record timestamps from purchaseDate instead of the Kafka record timestamp
        var consumed = Consumed.with(longSerde, jsonSerde)
            .withTimestampExtractor(itemPurchaseDateExtractor);

        var purchasedItemStream = builder
            .stream("order-items-two", consumed)
            .map((key, value) -> KeyValue.pair(value.getOrder(), value))
            .groupByKey(Grouped.with(longSerde, jsonSerde))
            // One-day windows; records for an already closed window are dropped
            .windowedBy(TimeWindows.ofSizeWithNoGrace(windowLength))
            .aggregate(
                () -> new Order(0L, 0, BigDecimal.ZERO),
                (key, value, agg) -> Order.builder()
                    .id(key)
                    .totalItems(agg.getTotalItems() + 1)
                    .totalPrice(agg.getTotalPrice().add(value.getPrice()))
                    .build(),
                Materialized.with(longSerde, orderSerde))
            .toStream();

        purchasedItemStream.to(
            "daily-order-items", Produced.with(windowSerde, orderSerde));
        return purchasedItemStream;
    }
}
Run the application; the first two messages should be aggregated into a single order, while the third message is skipped because it falls outside the time window. The console should show a log message like: skipping record for the expired window.
Let’s verify records on the output topic by executing the command below.
kafka-console-consumer --bootstrap-server broker:9092 --topic daily-order-items --from-beginning
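Why the third message is dropped can be reasoned through with a small plain-Java model of fixed-size windows. It is a simplified sketch under stated assumptions, not the Kafka Streams implementation: windows are assumed to be aligned to the epoch, "stream time" is taken to be the highest timestamp seen so far, and a record is rejected once the end of its window plus the grace period is no longer ahead of stream time. The class TumblingWindowSketch and its methods are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java model of epoch-aligned tumbling windows with a grace period.
final class TumblingWindowSketch {

    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // highest record timestamp seen so far

    TumblingWindowSketch(Duration size, Duration grace) {
        this.sizeMs = size.toMillis();
        this.graceMs = grace.toMillis();
    }

    // Start of the window that contains the given timestamp
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Returns true if the record is accepted, false if its window has already closed
    boolean accept(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowEnd = windowStart(timestampMs) + sizeMs;
        return windowEnd + graceMs > streamTime;
    }

    public static void main(String[] args) {
        var windows = new TumblingWindowSketch(Duration.ofDays(1), Duration.ZERO);
        long nov5 = Instant.parse("2022-11-05T10:00:00Z").toEpochMilli();
        long nov3 = Instant.parse("2022-11-03T10:00:00Z").toEpochMilli();

        System.out.println(windows.accept(nov5)); // true
        System.out.println(windows.accept(nov5)); // true, same window as the first record
        System.out.println(windows.accept(nov3)); // false, its window closed before stream time
    }
}
```

In this model, a grace period of two days or more would let the 3 November record through even though it arrives after the 5 November records, which is the trade-off between tolerating late data and closing windows promptly.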
8. Joining Kafka Streams
Kafka Streams also supports joining. A KStream can be joined with other consumer record processing interfaces such as a KTable, a GlobalKTable, or another KStream. Inner, outer, and left joins are supported.
The below image presents which processing interfaces can be joined when using a specific join type.

We are doing a left join on two streams in our code. As part of the join, we must specify a method that combines the values from both Kafka streams. In this case, the combineOrders method combines paid and refunded orders that share the same order ID.
Since we are doing a left join, a matching record from the refunded order stream is optional, so combineOrders handles a missing refund with a null check.
The joinWindow specifies the maximum time difference between two records with the same key for them to be joined.
@Bean
public KStream<Long, Order> kStreamOrder(StreamsBuilder builder) {
    var orderSerde = new JsonSerde<>(Order.class);
    var longSerde = Serdes.Long();
    var totalOrderSerde = new JsonSerde<>(TotalOrder.class);
    // Records join only if their timestamps are at most one hour apart; no grace period
    var maxDifference = Duration.ofHours(1);
    var afterWindowEnd = Duration.ofMillis(0L);
    var joinWindow = JoinWindows
        .ofTimeDifferenceAndGrace(maxDifference, afterWindowEnd);

    var refundedOrderStream = builder
        .stream("refunded-orders", Consumed.with(Serdes.Long(), orderSerde));
    var paidOrderStream = builder
        .stream("paid-orders", Consumed.with(Serdes.Long(), orderSerde));

    paidOrderStream.leftJoin(
            refundedOrderStream,
            this::combineOrders,
            joinWindow,
            StreamJoined.with(longSerde, orderSerde, orderSerde))
        .to("combined-orders", Produced.with(longSerde, totalOrderSerde));

    return paidOrderStream;
}

private TotalOrder combineOrders(Order paid, Order refund) {
    var totalOrder = new TotalOrder();
    totalOrder.setId(paid.getId());
    totalOrder.setTotalCost(paid.getTotalPrice());
    // Left join: refund is null when no refunded order matched within the window
    if (refund != null) {
        totalOrder.setTotalRefund(refund.getTotalPrice());
    }
    return totalOrder;
}
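The matching rule of a windowed left join can be illustrated with a plain-Java batch model: a paid order is combined with every refunded order that has the same key and a timestamp within the maximum difference, and it is still emitted, with no refund, when nothing matches. This is a deliberately simplified sketch, not the streaming implementation: the class LeftJoinSketch and its records are hypothetical, the window bounds are assumed to be inclusive, the refund total is assumed to come from the refunded order, and the real join emits results incrementally rather than over complete lists.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java batch model of a windowed left join between paid and refunded orders.
final class LeftJoinSketch {

    record OrderEvent(long orderId, BigDecimal totalPrice, long timestampMs) {}

    record TotalOrder(long id, BigDecimal totalCost, BigDecimal totalRefund) {}

    static List<TotalOrder> leftJoin(List<OrderEvent> paid,
                                     List<OrderEvent> refunded,
                                     Duration maxDifference) {
        List<TotalOrder> joined = new ArrayList<>();
        for (OrderEvent p : paid) {
            boolean matched = false;
            for (OrderEvent r : refunded) {
                boolean sameKey = p.orderId() == r.orderId();
                boolean inWindow =
                        Math.abs(p.timestampMs() - r.timestampMs()) <= maxDifference.toMillis();
                if (sameKey && inWindow) {
                    joined.add(new TotalOrder(p.orderId(), p.totalPrice(), r.totalPrice()));
                    matched = true;
                }
            }
            if (!matched) {
                // Left join: keep the paid order even without a matching refund
                joined.add(new TotalOrder(p.orderId(), p.totalPrice(), null));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        long halfHour = Duration.ofMinutes(30).toMillis();
        List<TotalOrder> result = leftJoin(
                List.of(new OrderEvent(1, new BigDecimal("820.00"), 0L),
                        new OrderEvent(2, new BigDecimal("2400.00"), 0L)),
                List.of(new OrderEvent(1, new BigDecimal("158.00"), halfHour)),
                Duration.ofHours(1));
        result.forEach(System.out::println);
    }
}
```

In the example, order 1 is joined with its refund because the two events are 30 minutes apart, while order 2 is emitted with a null refund.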
9. Summary
In this article, we have dived into the world of different Kafka Streams Operations with Spring Boot! Our analysis reveals a vast array of operations to process and transform topic records, from branches that redirect data like an old-school switchboard operator to merges & joins across multiple topics. What’s your stream going to do today?
Daniel Barczak is a software developer with over 9 years of professional experience. He has experience with several programming languages and technologies and has worked for businesses ranging from startups to big enterprises. Daniel in his leisure time likes to experiment with new smart home gadgets and explore the realm of home automation.