Kafka Streams - Lessons Learned



I recently had to work with Kafka Streams for the first time and, as usual when encountering a new technology, I made some mistakes. In this post I want to share what I learned along the way.

Without going into too much detail, I would like to sketch my use case. I had to consume a single topic that contained many different kinds of user events. For one particular event type, I had to collect all events belonging to the same "resource" (e.g. a webpage), bring them into a time-based order, and check whether a continuous interaction happened. If the interaction reached a certain threshold (e.g. the user spent a certain amount of time on a webpage), I had to forward a single event, but no more than one. I tried to sketch it in the following picture:

This is the event stream for a single user. The events arrive out of order and are mixed. I am only interested in the square events. Additionally, I have to wait until at least two of the same colour arrive; that's my threshold. More than two yellow squares should not lead to more output.

Since the requirement involved some grouping, my initial idea was to use a state store, but...

StateStore is primarily local

You can use a StateStore to store data and compare it, for example, to other incoming events. The stores are backed by a Kafka topic (a topic with the -changelog suffix) for fault tolerance and for recreating the store after a restart (see also the good explanation here). But although this changelog topic is replicated in the Kafka cluster, the store itself is primarily local: each instance only holds the data for its own partitions, on local disk. I started with this code:

        streamsBuilder
            .addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(STATE_STORE_NAME),
                    Serdes.String(),
                    valueSerde
                )
            )
            .stream<String, GenericRecord>(topic)
            .filter { _, value -> ... }
            .process(::MyProcessor, STATE_STORE_NAME)
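
The processor itself is not shown above. Just to give an idea, a minimal sketch could look like the following; the threshold logic is simplified and the names are assumptions, my real processor did more:

        import org.apache.avro.generic.GenericRecord
        import org.apache.kafka.streams.processor.api.Processor
        import org.apache.kafka.streams.processor.api.ProcessorContext
        import org.apache.kafka.streams.processor.api.Record
        import org.apache.kafka.streams.state.KeyValueStore

        class MyProcessor : Processor<String, GenericRecord, String, GenericRecord> {

            private lateinit var context: ProcessorContext<String, GenericRecord>
            private lateinit var store: KeyValueStore<String, GenericRecord>

            override fun init(context: ProcessorContext<String, GenericRecord>) {
                this.context = context
                // the store registered via addStateStore() is looked up by its name
                store = context.getStateStore(STATE_STORE_NAME)
            }

            override fun process(record: Record<String, GenericRecord>) {
                val previous = store.get(record.key())
                if (previous == null) {
                    // first event for this key: remember it and wait for a related one
                    store.put(record.key(), record.value())
                } else {
                    // threshold of two related events reached: forward an event
                    context.forward(record)
                }
            }
        }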

In my case, however, this didn't work because I needed a global view of all the stored events, so a state store was the wrong choice. And it wasn't only the local StateStore that made this problematic, but also my source. Before we talk about that, a note on load.

If you filter incorrectly, you might end up duplicating your source topic

As already mentioned, my application consumed a rather large topic, and because I used the StateStore wrongly, we ended up nearly duplicating the incoming messages (the grouping that was part of the requirement did not work). The broken grouping wasn't a simple coding error (there were integration tests covering that); it was a logical error, caused by my lack of Kafka knowledge and directly connected to the next problem: message order. How could you avoid this? I would say there is no general solution, but testing more thoroughly in Preprod would at least have caught some of the problems before they reached Prod.

Check your source's message order/key

This was a big beginner's mistake. When Kafka distributes messages across partitions, it does so based on the message key. I knew this when we started, but somehow assumed that the source topic was keyed in a way that all relevant messages would arrive in the order I needed (in this case: all messages for one user arriving, in order, on the same partition). Assuming this was a huge mistake. Our source used random keys, i.e. the messages were distributed randomly across all partitions. The providing team's goal was to spread the load evenly, not to group messages.
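
To make the relationship between key and partition concrete: for a non-null key, the default partitioner essentially hashes the key bytes and maps the hash onto a partition, roughly like this (a simplified sketch, ignoring null keys and custom partitioners):

        import org.apache.kafka.common.utils.Utils

        // same key -> same hash -> same partition; random keys scatter across all partitions
        fun partitionFor(key: String, numPartitions: Int): Int =
            Utils.toPositive(Utils.murmur2(key.toByteArray())) % numPartitions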

Once I found out how the upstream topic was keyed, it was also clear that this wouldn't change: the other team couldn't and wouldn't adapt their partitioning. Luckily, repartitioning with Kafka Streams is not a problem. We can use a group-by for this:

        streamsBuilder
            .stream<String, GenericRecord>(topic)
            .filter { _, value -> ... }
            .groupBy({ _, value -> """${value.some}${value.thing}""" }, Grouped.`as`("group-by-something"))

Still, ordering is a problem. Under the hood, the group-by makes Kafka Streams write the records to an internal repartition topic keyed by the new key, so related messages end up on the same partition again, but simply repartitioning does not bring the messages back into order.

Ordering out-of-order messages is hard

For my use case, I had two choices to get the messages back in order:

  1. Store the messages locally in a StateStore and push them out in order from time to time (see ReorderIntegrationTest in this PR; a rough sketch follows after this list)

  2. Use a session window to group together events based on their timestamp
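
I did not go down road number one, but a rough sketch of the idea could look like this: buffer the events in a state store whose keys are prefixed with the zero-padded event timestamp, and flush the buffer in key order on a wall-clock schedule. The store name, flush interval and key layout are made up for illustration:

        import java.time.Duration
        import org.apache.avro.generic.GenericRecord
        import org.apache.kafka.streams.processor.PunctuationType
        import org.apache.kafka.streams.processor.api.Processor
        import org.apache.kafka.streams.processor.api.ProcessorContext
        import org.apache.kafka.streams.processor.api.Record
        import org.apache.kafka.streams.state.KeyValueStore

        class ReorderProcessor : Processor<String, GenericRecord, String, GenericRecord> {

            private lateinit var buffer: KeyValueStore<String, GenericRecord>

            override fun init(context: ProcessorContext<String, GenericRecord>) {
                buffer = context.getStateStore("reorder-buffer")
                // every 5 minutes (wall clock), push out everything buffered so far in key order
                context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME) { _ ->
                    buffer.all().use { iterator ->
                        iterator.forEach { entry ->
                            val (timestamp, originalKey) = entry.key.split(":", limit = 2)
                            context.forward(Record(originalKey, entry.value, timestamp.toLong()))
                            buffer.delete(entry.key)
                        }
                    }
                }
            }

            override fun process(record: Record<String, GenericRecord>) {
                // zero-pad the timestamp so the store's lexicographic order equals time order
                val paddedTimestamp = record.timestamp().toString().padStart(19, '0')
                buffer.put("$paddedTimestamp:${record.key()}", record.value())
            }
        }

The "reorder-buffer" store would have to be registered via addStateStore, just like in the first snippet, and the processor wired in with .process(::ReorderProcessor, "reorder-buffer").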

Luckily, the session window perfectly fit my use case:

        streamsBuilder
            // add the TimestampExtractor
            .stream<String, GenericRecord>(topic, Consumed.with(timestampExtractor))
            .filter { _, value -> ... }
            .groupBy({ _, value -> """${value.some}${value.thing}""" }, Grouped.`as`("group-by-something"))
            .windowedBy(
                // Define the session window according to the use-case
                SessionWindows.ofInactivityGapAndGrace(
                    Duration.ofSeconds(eventIntervalSeconds),
                    Duration.ofSeconds(gracePeriodSeconds)
                )
            )
            // aggregate all related events in some intermediate format
            .aggregate(
                // initial
                ::AggregationRoot,
                // aggregation
                { _, value, aggregationRoot ->
                    AggregationRoot(
                        aggregationRoot.events + value
                    )
                },
                // merge
                { _, left, right ->
                    left + right
                },
                Materialized.with(Serdes.String(), JsonSerde(...))
            )
            // only pass on finished sessions, not intermediate results
            .suppress(untilWindowCloses(unbounded()))
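
The chain above ends with the suppressed table of closed sessions. Just to complete the picture, a hypothetical continuation (the output topic, the threshold check and the mapping are assumptions, not my actual code) could turn it back into a stream and emit a single event per qualifying session:

            .toStream()
            // keep only sessions that reached the threshold (here: at least two events)
            .filter { _, aggregationRoot -> aggregationRoot.events.size >= 2 }
            // emit one event per session, keyed by the original grouping key
            .map { windowedKey, aggregationRoot ->
                KeyValue(windowedKey.key(), aggregationRoot.events.first())
            }
            .to("output-topic", Produced.with(Serdes.String(), valueSerde))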

Session windows end when a message for a new window arrives

This was a little bit tricky to find out but could be replicated in an integration test. A session window contains all events that are no more than a certain duration apart. When using session windows you can provide a TimestampExtractor so that Kafka Streams knows which timestamp to use for session assignment. But for Kafka Streams to consider a session closed, another event has to arrive whose timestamp pushes stream time past the session's inactivity gap (plus grace period). This is because Kafka Streams doesn't use a wall clock for this decision; it relies purely on the event timestamps.
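
Such an extractor is quite small. A minimal sketch, assuming the event time lives in a long field called "timestamp" on the Avro record (the field name is an assumption):

        import org.apache.avro.generic.GenericRecord
        import org.apache.kafka.clients.consumer.ConsumerRecord
        import org.apache.kafka.streams.processor.TimestampExtractor

        class EventTimestampExtractor : TimestampExtractor {
            override fun extract(record: ConsumerRecord<Any, Any>, partitionTime: Long): Long {
                // use the event time from the payload; fall back to the current partition time
                val value = record.value() as? GenericRecord
                return (value?.get("timestamp") as? Long) ?: partitionTime
            }
        }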

Now that I had a way to group and aggregate my events, I ran into another problem.

Message size during aggregation

When you aggregate session windows and collect all messages, the aggregate can grow considerably large. Raising max.request.size can help, but considering that a session can grow as large as a typical user interaction, it is better to reduce the number of messages in either the aggregation or the merge step. In my case, the merge of two sessions contained a lot of duplicates, so it helped to use a set.
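
A minimal sketch of that idea, with a simplified event type (the real aggregate held the Avro records from the topic): backing the aggregate with a Set means that adding an event twice, or merging two overlapping sessions, does not grow it unnecessarily.

        // simplified stand-in for the real event type
        data class UserEvent(val id: String, val occurredAt: Long)

        data class AggregationRoot(val events: Set<UserEvent> = emptySet()) {
            // used in the aggregation step: add a single event
            operator fun plus(event: UserEvent) = AggregationRoot(events + event)
            // used in the merge step: duplicates between the two sessions collapse
            operator fun plus(other: AggregationRoot) = AggregationRoot(events + other.events)
        }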

Final thoughts

These were the problems I encountered. That said, after fixing them the Kafka Streams application has been running smoothly and performs very well, and the fluent stream definition is quite readable. So don't be afraid to use Kafka Streams if the use case fits.
