1 - Introduction

Kafka Streams API

The easiest way to write mission-critical real-time applications and microservices with all the benefits of Kafka’s server-side cluster technology.


  • Write standard Java applications
  • Exactly-once processing semantics
  • No separate processing cluster required
  • Develop on Mac, Linux, Windows
  • Elastic, highly scalable, fault-tolerant
  • Deploy to containers, VMs, bare metal, cloud
  • Equally viable for small, medium, & large use cases
  • Fully integrated with Kafka security


Hello Kafka Streams

The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.

Java 8+ (using lambda expressions):

                import org.apache.kafka.common.serialization.Serdes;
                import org.apache.kafka.streams.KafkaStreams;
                import org.apache.kafka.streams.StreamsConfig;
                import org.apache.kafka.streams.kstream.KStream;
                import org.apache.kafka.streams.kstream.KStreamBuilder;
                import org.apache.kafka.streams.kstream.KTable;

                import java.util.Arrays;
                import java.util.Properties;

                public class WordCountApplication {

                    public static void main(final String[] args) throws Exception {
                        Properties config = new Properties();
                        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                        KStreamBuilder builder = new KStreamBuilder();
                        KStream<String, String> textLines = builder.stream("TextLinesTopic");
                        KTable<String, Long> wordCounts = textLines
                            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                            .groupBy((key, word) -> word)
                            .count("Counts");
                        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

                        KafkaStreams streams = new KafkaStreams(builder, config);
                        streams.start();
                    }

                }
            


Java 7:

                import org.apache.kafka.common.serialization.Serdes;
                import org.apache.kafka.streams.KafkaStreams;
                import org.apache.kafka.streams.StreamsConfig;
                import org.apache.kafka.streams.kstream.KStream;
                import org.apache.kafka.streams.kstream.KStreamBuilder;
                import org.apache.kafka.streams.kstream.KTable;
                import org.apache.kafka.streams.kstream.KeyValueMapper;
                import org.apache.kafka.streams.kstream.ValueMapper;

                import java.util.Arrays;
                import java.util.Properties;

                public class WordCountApplication {

                    public static void main(final String[] args) throws Exception {
                        Properties config = new Properties();
                        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                        KStreamBuilder builder = new KStreamBuilder();
                        KStream<String, String> textLines = builder.stream("TextLinesTopic");
                        KTable<String, Long> wordCounts = textLines
                            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                                @Override
                                public Iterable<String> apply(String textLine) {
                                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                                }
                            })
                            .groupBy(new KeyValueMapper<String, String, String>() {
                                @Override
                                public String apply(String key, String word) {
                                    return word;
                                }
                            })
                            .count("Counts");
                        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

                        KafkaStreams streams = new KafkaStreams(builder, config);
                        streams.start();
                    }

                }
            


Scala:

                import java.lang.Long
                import java.util.Properties
                import java.util.concurrent.TimeUnit

                import org.apache.kafka.common.serialization._
                import org.apache.kafka.streams._
                import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

                import scala.collection.JavaConverters.asJavaIterableConverter

                object WordCountApplication {

                    def main(args: Array[String]) {
                        val config: Properties = {
                            val p = new Properties()
                            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
                            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
                            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
                            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
                            p
                        }

                        val builder: KStreamBuilder = new KStreamBuilder()
                        val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
                        val wordCounts: KTable[String, Long] = textLines
                            .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
                            .groupBy((_, word) => word)
                            .count("Counts")
                        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")

                        val streams: KafkaStreams = new KafkaStreams(builder, config)
                        streams.start()

                        Runtime.getRuntime.addShutdownHook(new Thread(() => {
                            streams.close(10, TimeUnit.SECONDS)
                        }))
                    }

                }

See how Kafka Streams is being used

Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka and Kafka Streams. Learn More

As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a microservices architecture. Using Kafka for processing event streams enables our technical team to do near real-time business intelligence. Learn More


2 - Quick Start

Play with a Streams Application

This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. However, if you have already started Kafka and ZooKeeper, feel free to skip the first two steps.

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.

This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the [WordCountDemo](https://github.com/apache/kafka/blob/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java) example code (converted to use Java 8 lambda expressions for easy reading).

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count("Counts");

// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed “all” the input data.

As the first step, we will start Kafka (unless you already have it started) and then we will prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.

Step 1: Download the code

Download the 0.11.0.2 release and un-tar it. Note that there are multiple downloadable Scala versions and we choose to use the recommended version (2.11) here:

> tar -xzf kafka_2.11-0.11.0.2.tgz
> cd kafka_2.11-0.11.0.2

Step 2: Start the Kafka server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Now start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: Prepare input topic and start Kafka producer

Next, we create the input topic named streams-plaintext-input and the output topic named streams-wordcount-output:

> bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".

> bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output
Created topic "streams-wordcount-output".

The created topics can be described with the same kafka-topics tool:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe

Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:
    Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:
    Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

Step 4: Start the WordCount Application

The following command starts the WordCount demo application:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

The demo application will read from the input topic streams-plaintext-input, perform the computations of the WordCount algorithm on each of the read messages, and continuously write its current results to the output topic streams-wordcount-output. Hence there won’t be any STDOUT output except log entries as the results are written back into Kafka.

Now we can start the console producer in a separate terminal to write some input data to this topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Step 5: Process some data

Now let’s write a message with the console producer into the input topic streams-plaintext-input by entering a single line of text and then hitting RETURN. This will send a new message to the input topic, where the message key is null and the message value is the string-encoded text line that you just entered (in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

This message will be processed by the WordCount application and the following output data will be written to the streams-wordcount-output topic and printed by the console consumer:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	    1
streams	1
lead	1
to	    1
kafka	1

Here, the first column is the Kafka message key in java.lang.String format and represents a word that is being counted, and the second column is the message value in java.lang.Long format, representing the word’s latest count.

Now let’s continue writing one more message with the console producer into the input topic streams-plaintext-input. Enter the text line “hello kafka streams” and hit RETURN. Your terminal should look as follows:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

In your other terminal in which the console consumer is running, you will observe that the WordCount application wrote new output data:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	    1
streams	1
lead	1
to	    1
kafka	1
hello	1
kafka	2
streams	2

Here the last printed lines kafka 2 and streams 2 indicate updates to the keys kafka and streams whose counts have been incremented from 1 to 2. Whenever you write further input messages to the input topic, you will observe new messages being added to the streams-wordcount-output topic, representing the most recent word counts as computed by the WordCount application. Let’s enter one final input text line “join kafka summit” and hit RETURN in the console producer to the input topic streams-plaintext-input before we wrap up this quickstart:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
join kafka summit

The streams-wordcount-output topic will subsequently show the corresponding updated word counts (see last three lines):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	    1
streams	1
lead	1
to	    1
kafka	1
hello	1
kafka	2
streams	2
join	1
kafka	3
summit	1

As one can see, the output of the WordCount application is actually a continuous stream of updates, where each output record (i.e. each line in the original output above) is an updated count of a single word (the record key, e.g. “kafka”). For multiple records with the same key, each later record is an update of the previous one.

The two diagrams below illustrate what is essentially happening behind the scenes. The first column shows the evolution of the current state of the KTable<String, Long> that is counting word occurrences. The second column shows the change records that result from state updates to the KTable and that are being sent to the output Kafka topic streams-wordcount-output.

First the text line “all streams lead to kafka” is being processed. The KTable is being built up as each new word results in a new table entry (highlighted with a green background), and a corresponding change record is sent to the downstream KStream.

When the second text line “hello kafka streams” is processed, we observe, for the first time, that existing entries in the KTable are being updated (here: for the words “kafka” and for “streams”). And again, change records are being sent to the output topic.

And so on (we skip the illustration of how the third line is being processed). This explains why the output topic has the contents we showed above, because it contains the full record of changes.

Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
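
To make this duality concrete, here is a minimal sketch in plain Java (using the word counts from this quickstart as example data) of how replaying a changelog from beginning to end reconstructs the table:

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// The change records for "kafka" and "streams", in arrival order, as they
// appeared in the streams-wordcount-output topic above.
List<Map.Entry<String, Long>> changelog = Arrays.asList(
        new SimpleEntry<>("kafka", 1L),
        new SimpleEntry<>("streams", 1L),
        new SimpleEntry<>("kafka", 2L),
        new SimpleEntry<>("streams", 2L),
        new SimpleEntry<>("kafka", 3L));

// Replaying the changelog: later records for the same key overwrite earlier
// ones, so the map ends up holding the table's latest state.
Map<String, Long> table = new HashMap<>();
for (Map.Entry<String, Long> change : changelog) {
    table.put(change.getKey(), change.getValue());
}
// table now contains {kafka=3, streams=2} -- the latest count per word.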

Step 6: Teardown the application

You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker and the ZooKeeper server in order via Ctrl-C.


3 - Write a streams app

Write your own Streams Applications

In this guide we will set up your own project from scratch to write a stream processing application using Kafka’s Streams API. If you have not done so already, it is highly recommended to first read the quickstart on how to run a Streams application.

Setting up a Maven Project

We are going to use a Kafka Streams Maven Archetype for creating a Streams project structure with the following commands:

        mvn archetype:generate \
            -DarchetypeGroupId=org.apache.kafka \
            -DarchetypeArtifactId=streams-quickstart-java \
            -DarchetypeVersion=0.11.0.2 \
            -DgroupId=streams.examples \
            -DartifactId=streams.examples \
            -Dversion=0.1 \
            -Dpackage=myapps

You can use different values for the groupId, artifactId and package parameters if you like. Assuming the above parameter values are used, this command will create a project structure that looks like this:

        > tree streams.examples
        streams.examples
        |-- pom.xml
        |-- src
            |-- main
                |-- java
                |   |-- myapps
                |       |-- LineSplit.java
                |       |-- Pipe.java
                |       |-- WordCount.java
                |-- resources
                    |-- log4j.properties

There are already several example programs written with the Streams library under src/main/java. Since we are going to start writing such programs from scratch, we can now delete these examples:

        > cd streams.examples
        > rm src/main/java/myapps/*.java

Writing a first Streams application: Pipe

It’s coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a Java file under src/main/java. Let’s name it Pipe.java:

        package myapps;

        public class Pipe {

            public static void main(String[] args) throws Exception {

            }
        }

We are going to fill in the main function to write this pipe program. Note that we will not list the import statements as we go since IDEs can usually add them automatically. However, if you are using a text editor you need to manually add the imports, and at the end of this section we’ll show the complete code snippet with import statements for you.

The first step to write a Streams application is to create a java.util.Properties map to specify different Streams execution configuration values as defined in StreamsConfig. A couple of important configuration values you need to set are: StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, which specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster, and StreamsConfig.APPLICATION_ID_CONFIG, which gives the unique identifier of your Streams application to distinguish it from other applications talking to the same Kafka cluster:

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092

In addition, you can customize other configurations in the same map, for example, default serialization and deserialization libraries for the record key-value pairs:

        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

For a full list of configurations of Kafka Streams please refer to this table.

Next we will define the computational logic of our Streams application. In Kafka Streams this computational logic is defined as a topology of connected processor nodes. We can use a topology builder to construct such a topology:

        final KStreamBuilder builder = new KStreamBuilder();

And then create a source stream from a Kafka topic named streams-plaintext-input using this topology builder:

        KStream<String, String> source = builder.stream("streams-plaintext-input");

Now we get a KStream that is continuously generating records from its source Kafka topic streams-plaintext-input. The records are organized as String typed key-value pairs. The simplest thing we can do with this stream is to write it into another Kafka topic, say one named streams-pipe-output:

        source.to("streams-pipe-output");

Note that we can also concatenate the above two lines into a single line as:

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

We can now construct the Streams client with the two components we have just constructed above: the configuration map and the topology builder object (one can also construct a StreamsConfig object from the props map and then pass that object to the constructor; KafkaStreams has overloaded constructors that take either type).

        final KafkaStreams streams = new KafkaStreams(builder, props);

By calling its start() function we can trigger the execution of this client. The execution won’t stop until close() is called on this client. We can, for example, add a shutdown hook with a countdown latch to capture a user interrupt and close the client upon terminating this program:

        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

The complete code so far looks like this:

        package myapps;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStreamBuilder;

        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class Pipe {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                final KStreamBuilder builder = new KStreamBuilder();

                builder.stream("streams-plaintext-input").to("streams-pipe-output");

                final KafkaStreams streams = new KafkaStreams(builder, props);
                final CountDownLatch latch = new CountDownLatch(1);

                // attach shutdown handler to catch control-c
                Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                    @Override
                    public void run() {
                        streams.close();
                        latch.countDown();
                    }
                });

                try {
                    streams.start();
                    latch.await();
                } catch (Throwable e) {
                    System.exit(1);
                }
                System.exit(0);
            }
        }

If you already have the Kafka broker up and running at localhost:9092, and the topics streams-plaintext-input and streams-pipe-output created on that broker, you can run this code in your IDE or on the command line, using Maven:

        > mvn clean package
        > mvn exec:java -Dexec.mainClass=myapps.Pipe

For detailed instructions on how to run a Streams application and observe its computing results, please read the Play with a Streams Application section. We will not talk about this in the rest of this section.

Writing a second Streams application: Line Split

We have learned how to construct a Streams client with its two key components: the StreamsConfig and the KStreamBuilder. Now let’s move on to add some real processing logic by augmenting the current topology. We can create another program by copying the existing Pipe.java class:

        > cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java

And change its class name as well as the application id config to distinguish with the original program:

        public class LineSplit {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
                // ...
            }
        }

Since each of the source stream’s records is a String-typed key-value pair, let’s treat the value string as a text line and split it into words with a flatMapValues operator:

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split("\\W+"));
                    }
                });

The operator takes the source stream as its input and generates a new stream named words by processing each record from its source stream in order, breaking its value string into a list of words, and producing each word as a new record to the output words stream. This is a stateless operator that does not need to keep track of any previously received records or processed results. Note that if you are using JDK 8 you can use a lambda expression and simplify the above code as:

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));

And finally we can write the word stream back into another Kafka topic, say streams-linesplit-output. Again, these two steps can be concatenated as follows (assuming lambda expressions are used):

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

The complete code looks like this (assuming lambda expressions are used):

        package myapps;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStreamBuilder;
        import org.apache.kafka.streams.kstream.ValueMapper;

        import java.util.Arrays;
        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class LineSplit {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                final KStreamBuilder builder = new KStreamBuilder();

                KStream<String, String> source = builder.stream("streams-plaintext-input");
                source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                      .to("streams-linesplit-output");

                final KafkaStreams streams = new KafkaStreams(builder, props);
                final CountDownLatch latch = new CountDownLatch(1);

                // ... same as Pipe.java below
            }
        }

Writing a third Streams application: WordCount

Let’s now take a step further to add some “stateful” computations to the topology by counting the occurrence of the words split from the source text stream. Following similar steps let’s create another program based on the LineSplit.java class:

        public class WordCount {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
                // ...
            }
        }

In order to count the words we can first modify the flatMapValues operator to treat all of them as lower case:

        source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                    }
                });

In order to do the counting aggregation we have to first specify that we want to key the stream on the value string, i.e. the lower-cased word, with a groupBy operator. This operator generates a new grouped stream, which can then be aggregated by a count operator that generates a running count on each of the grouped keys:

        KTable<String, Long> counts =
        source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                    }
                })
              .groupBy(new KeyValueMapper<String, String, String>() {
                   @Override
                   public String apply(String key, String value) {
                       return value;
                   }
                })
              .count("Counts");

Note that the count operator has a String typed parameter Counts, which stores the running counts that keep being updated as more records are piped and processed from the source Kafka topic. This Counts store can be queried in real-time, with details described in the Developer Manual.

We can also write the counts KTable’s changelog stream back into another Kafka topic, say streams-wordcount-output. Note that this time the value type is no longer String but Long, so the default serialization classes are not viable for writing it to Kafka anymore. We need to provide overridden serialization methods for Long types, otherwise a runtime exception will be thrown:

        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

Note that in order to read the changelog stream from topic streams-wordcount-output, one needs to set the value deserialization as org.apache.kafka.common.serialization.LongDeserializer. Details of this can be found in the Play with a Streams Application section. Assuming lambda expressions from JDK 8 can be used, the above code can be simplified as:

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, value) -> value)
              .count("Counts")
              .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

The complete code looks like this (assuming lambda expressions are used):

        package myapps;

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KeyValueMapper;
        import org.apache.kafka.streams.kstream.KStreamBuilder;
        import org.apache.kafka.streams.kstream.ValueMapper;

        import java.util.Arrays;
        import java.util.Locale;
        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class WordCount {

            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                final KStreamBuilder builder = new KStreamBuilder();

                KStream<String, String> source = builder.stream("streams-plaintext-input");
                source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                      .groupBy((key, value) -> value)
                      .count("Counts")
                      .to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

                final KafkaStreams streams = new KafkaStreams(builder, props);
                final CountDownLatch latch = new CountDownLatch(1);

                // ... same as Pipe.java below
            }
        }


4 - Core Concepts

Core Concepts

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state.

Kafka Streams has a low barrier to entry: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka’s parallelism model.

Some highlights of Kafka Streams:

  • Designed as a simple and lightweight client library, which can be easily embedded in any Java application and integrated with any existing packaging, deployment and operational tools that users have for their streaming applications.
  • Has no external dependencies on systems other than Apache Kafka itself as the internal messaging layer; notably, it uses Kafka’s partitioning model to horizontally scale processing while maintaining strong ordering guarantees.
  • Supports fault-tolerant local state, which enables very fast and efficient stateful operations like windowed joins and aggregations.
  • Supports exactly-once processing semantics to guarantee that each record will be processed once and only once even when there is a failure on either Streams clients or Kafka brokers in the middle of processing.
  • Employs one-record-at-a-time processing to achieve millisecond processing latency, and supports event-time based windowing operations with late arrival of records.
  • Offers necessary stream processing primitives, along with a high-level Streams DSL and a low-level Processor API.

We first summarize the key concepts of Kafka Streams.

Stream Processing Topology

  • A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
  • A stream processor is a node in the processor topology; it represents a processing step to transform data in streams: it receives one input record at a time from its upstream processors in the topology, applies its operation to it, and may subsequently produce one or more output records to its downstream processors.

There are two special processors in the topology:

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.

Note that in normal processor nodes, other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.

Kafka Streams offers two ways to define the stream processing topology: the Kafka Streams DSL provides the most common data transformation operations such as map, filter, join and aggregations out of the box; the lower-level Processor API allows developers to define and connect custom processors as well as to interact with state stores.
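
As an illustration of the lower-level API, here is a minimal sketch of a custom processor based on the 0.11.0 Processor API (the topic names are hypothetical; the DSL-based WordCount shown earlier covers similar ground more concisely):

        import org.apache.kafka.streams.processor.Processor;
        import org.apache.kafka.streams.processor.ProcessorContext;
        import org.apache.kafka.streams.processor.TopologyBuilder;

        // A stateless processor that upper-cases each record value and forwards it downstream.
        public class UppercaseProcessor implements Processor<String, String> {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(final String key, final String value) {
                context.forward(key, value.toUpperCase());
            }

            @Override
            public void punctuate(final long timestamp) { }  // no periodic work needed

            @Override
            public void close() { }
        }

The processor would then be wired between a source node and a sink node:

        TopologyBuilder topology = new TopologyBuilder();
        topology.addSource("Source", "input-topic")
                .addProcessor("Process", UppercaseProcessor::new, "Source")
                .addSink("Sink", "output-topic", "Process");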

A processor topology is merely a logical abstraction for your stream processing code. At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see Stream Partitions and Tasks for details).

Time

A critical aspect in stream processing is the notion of time, and how it is modeled and integrated. For example, some operations such as windowing are defined based on time boundaries.
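
For example, here is a minimal sketch of a windowed aggregation with the 0.11.0 DSL (topic and store names are hypothetical): counting records per key in tumbling one-minute windows, where a record's window membership is determined by its timestamp.

        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KStreamBuilder;
        import org.apache.kafka.streams.kstream.KTable;
        import org.apache.kafka.streams.kstream.TimeWindows;
        import org.apache.kafka.streams.kstream.Windowed;

        final KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> events = builder.stream("events-topic");

        // Tumbling 1-minute windows: each record is assigned to a window based on
        // its timestamp (event time, ingestion time, or processing time, depending
        // on the time semantics discussed below).
        KTable<Windowed<String>, Long> windowedCounts = events
            .groupByKey()
            .count(TimeWindows.of(60 * 1000L), "WindowedCounts");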

Common notions of time in streams are:

  • Event time - The point in time when an event or data record occurred, i.e. was originally created “at the source”. Example: If the event is a geo-location change reported by a GPS sensor in a car, then the associated event-time would be the time when the GPS sensor captured the location change.
  • Processing time - The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing time may be milliseconds, hours, or days etc. later than the original event time. Example: Imagine an analytics application that reads and processes the geo-location data reported from car sensors to present it to a fleet management dashboard. Here, processing-time in the analytics application might be milliseconds or seconds (e.g. for real-time pipelines based on Apache Kafka and Kafka Streams) or hours (e.g. for batch pipelines based on Apache Hadoop or Apache Spark) after event-time.
  • Ingestion time - The point in time when an event or data record is stored in a topic partition by a Kafka broker. The difference to event time is that this ingestion timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created “at the source”. The difference to processing time is that processing time is when the stream processing application processes the record. For example, if a record is never processed, there is no notion of processing time for it, but it still has an ingestion time.

The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka’s configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is. Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps.

Kafka Streams assigns a timestamp to every data record via the TimestampExtractor interface. Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to stream processing applications. Developers can thus enforce different notions of time depending on their business needs. For example, per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and are leveraged by time-dependent operations such as joins.
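
As a minimal sketch of a custom extractor providing event-time semantics, assume (hypothetically) that record values are of a type Order that carries an embedded timestamp in milliseconds; the extractor is based on the 0.11.0 TimestampExtractor interface:

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.streams.processor.TimestampExtractor;

        public class OrderTimestampExtractor implements TimestampExtractor {
            @Override
            public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
                if (record.value() instanceof Order) {                  // 'Order' is a hypothetical value type
                    return ((Order) record.value()).getTimestampMs();   // embedded event-time in milliseconds
                }
                return previousTimestamp;  // fall back to the last valid timestamp
            }
        }

Such an extractor would be registered via the default.timestamp.extractor configuration (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG as of 0.11.0).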

Finally, whenever a Kafka Streams application writes records to Kafka, then it will also assign timestamps to these new records. The way the timestamps are assigned depends on the context:

  • When new output records are generated via processing some input record, for example, context.forward() triggered in the process() function call, output record timestamps are inherited from input record timestamps directly.
  • When new output records are generated via periodic functions such as punctuate(), the output record timestamp is defined as the current internal time (obtained through context.timestamp()) of the stream task.
  • For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.

States

Some stream processing applications don’t require state, which means the processing of a message is independent from the processing of all other messages. However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL.

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. This is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.

Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called Interactive Queries. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation.
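
For example, assuming the WordCount application from the quickstart is running in the same process as a KafkaStreams instance named streams, its "Counts" store could be queried like this (a minimal sketch; it returns only the data hosted by the local instance):

        import org.apache.kafka.streams.state.QueryableStoreTypes;
        import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

        // Obtain a read-only handle on the "Counts" state store ...
        ReadOnlyKeyValueStore<String, Long> counts =
            streams.store("Counts", QueryableStoreTypes.<String, Long>keyValueStore());
        // ... and look up the latest count for a word (null if not seen locally).
        Long kafkaCount = counts.get("kafka");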

Processing Guarantees

In stream processing, one of the most frequently asked questions is “does my stream processing system guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing?” Failing to guarantee exactly-once stream processing is a deal-breaker for many applications that cannot tolerate any data loss or data duplicates, and in that case a batch-oriented framework is usually used in addition to the stream processing pipeline, known as the Lambda Architecture.

Prior to 0.11.0.0, Kafka only provided at-least-once delivery guarantees, and hence any stream processing system that leveraged it as the backend storage could not guarantee end-to-end exactly-once semantics. In fact, even for those stream processing systems that claim to support exactly-once processing, as long as they are reading from / writing to Kafka as the source / sink, their applications cannot actually guarantee that no duplicates will be generated throughout the pipeline. Since the 0.11.0.0 release, Kafka has added support to allow its producers to send messages to different topic partitions in a transactional and idempotent manner, and Kafka Streams has hence added end-to-end exactly-once processing semantics by leveraging these features. More specifically, it guarantees that for any record read from the source Kafka topics, its processing results will be reflected exactly once in the output Kafka topic as well as in the state stores for stateful operations.

Note the key difference between the Kafka Streams end-to-end exactly-once guarantee and other stream processing frameworks’ claimed guarantees: Kafka Streams tightly integrates with the underlying Kafka storage system and ensures that commits on the input topic offsets, updates on the state stores, and writes to the output topics are completed atomically, instead of treating Kafka as an external system that may have side effects. To read more details on how this is done inside Kafka Streams, readers are recommended to read KIP-129. In order to achieve exactly-once semantics when running Kafka Streams applications, users can simply set the processing.guarantee config value to exactly_once (default value is at_least_once). More details can be found in the Kafka Streams Configs section.
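
Concretely, enabling exactly-once processing is a single configuration change:

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;

        Properties props = new Properties();
        // The default is StreamsConfig.AT_LEAST_ONCE ("at_least_once").
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);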


5 - Architecture

Architecture

Kafka Streams simplifies application development by building on the Kafka producer and consumer libraries and leveraging the native capabilities of Kafka to offer data parallelism, distributed coordination, fault tolerance, and operational simplicity. In this section, we describe how Kafka Streams works underneath the covers.

The picture below shows the anatomy of an application that uses the Kafka Streams library. Let’s walk through some details.

Stream Partitions and Tasks

The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it. In both cases, this partitioning is what enables data locality, elasticity, scalability, high performance, and fault tolerance. Kafka Streams uses the concepts of partitions and tasks as logical units of its parallelism model based on Kafka topic partitions. There are close links between Kafka Streams and Kafka in the context of parallelism:

  • Each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition.
  • A data record in the stream maps to a Kafka message from that topic.
  • The keys of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.

An application’s processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of their assigned partitions and process messages one-at-a-time from these record buffers. As a result, stream tasks can be processed independently and in parallel without manual intervention.

Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by the maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 application instances. These instances will collaboratively process the topic’s data. If you run a larger number of app instances than partitions of the input topic, the “excess” app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former’s work.

It is important to understand that Kafka Streams is not a resource manager, but a library that “runs” anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine, or spread across multiple machines and tasks can be distributed automatically by the library to those running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be automatically restarted on other instances and continue to consume from the same stream partitions.

The following diagram shows two tasks each assigned with one partition of the input streams.

Threading Model

Kafka Streams allows the user to configure the number of threads that the library can use to parallelize processing within an application instance. Each thread can execute one or more tasks with their processor topologies independently. For example, the following diagram shows one stream thread running two stream tasks.
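
For example, a sketch of running two stream threads per application instance (reusing the configuration map from the earlier examples):

        // Default is one stream thread per instance; each thread runs one or more tasks.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);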

Starting more stream threads or more instances of the application merely amounts to replicating the topology and having it process a different subset of Kafka partitions, effectively parallelizing processing. It is worth noting that there is no shared state amongst the threads, so no inter-thread coordination is necessary. This makes it very simple to run topologies in parallel across the application instances and threads. The assignment of Kafka topic partitions amongst the various stream threads is transparently handled by Kafka Streams leveraging Kafka’s coordination functionality.

As we described above, scaling your stream processing application with Kafka Streams is easy: you merely need to start additional instances of your application, and Kafka Streams takes care of distributing partitions amongst tasks that run in the application instances. You can start as many threads of the application as there are input Kafka topic partitions so that, across all running instances of an application, every thread (or rather, the tasks it runs) has at least one input partition to process.

Local State Stores

Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, which is an important capability when implementing stateful operations. The Kafka Streams DSL, for example, automatically creates and manages such state stores when you are calling stateful operators such as join() or aggregate(), or when you are windowing a stream.

Every stream task in a Kafka Streams application may embed one or more local state stores that can be accessed via APIs to store and query data required for processing. Kafka Streams offers fault-tolerance and automatic recovery for such local state stores.

The following diagram shows two stream tasks with their dedicated local state stores.

Fault Tolerance

Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available even if the application fails and needs to re-process it. Tasks in Kafka Streams leverage the fault-tolerance capability offered by the Kafka consumer client to handle failures. If a task runs on a machine that fails, Kafka Streams automatically restarts the task in one of the remaining running instances of the application.

In addition, Kafka Streams makes sure that the local state stores are robust to failures, too. For each state store, it maintains a replicated changelog Kafka topic in which it tracks any state updates. These changelog topics are partitioned as well so that each local state store instance, and hence the task accessing the store, has its own dedicated changelog topic partition. Log compaction is enabled on the changelog topics so that old data can be purged safely to prevent the topics from growing indefinitely. If tasks run on a machine that fails and are restarted on another machine, Kafka Streams guarantees to restore their associated state stores to the content before the failure by replaying the corresponding changelog topics prior to resuming the processing on the newly started tasks. As a result, failure handling is completely transparent to the end user.

Note that the cost of task (re)initialization typically depends primarily on the time for restoring the state by replaying the state stores’ associated changelog topics. To minimize this restoration time, users can configure their applications to have standby replicas of local states (i.e. fully replicated copies of the state). When a task migration happens, Kafka Streams then attempts to assign a task to an application instance where such a standby replica already exists in order to minimize the task (re)initialization cost. See num.standby.replicas in the Kafka Streams Configs section.
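
For example, a sketch of configuring one standby replica per state store (again reusing the configuration map from the earlier examples):

        // Default is 0, i.e. no standby replicas; each standby adds storage and network cost.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);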


6 - Upgrade Guide

Upgrade Guide & API Changes

If you want to upgrade from 0.10.2.x to 0.11.0 you don’t need to do any code changes as the public API is fully backward compatible. However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades. See below a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.

If you want to upgrade from 0.10.1.x to 0.11.0, also see the Upgrade Section for 0.10.2. It highlights incompatible changes that you need to consider when upgrading your code and application. Below is a complete list of 0.10.2 and 0.11.0 API and semantic changes that let you advance your application and/or simplify your code base, including the use of new features.

Upgrading from 0.10.0.x to 0.11.0.x directly is also possible. Note that brokers must be on version 0.10.1 or higher to run a Kafka Streams application of version 0.10.1 or higher. See Streams API changes in 0.10.1, Streams API changes in 0.10.2, and Streams API changes in 0.11.0 for a complete list of API changes. Upgrading to 0.11.0.3 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible.

  • prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for the new version 0.11.0.3 (see the sketch after this list)
  • bounce each instance of your application once
  • prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
  • bounce each instance of your application once more to complete the upgrade
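A minimal sketch of the configuration change behind the two bounce phases (shown with the literal config name used above; all other settings are application-specific):

                Properties config = new Properties();
                // phase 1: let new 0.11.0.3 instances cooperate with still-running
                // 0.10.0 instances
                config.put("upgrade.from", "0.10.0");
                // ... first rolling bounce of every instance ...

                // phase 2: remove the setting and bounce every instance once more
                config.remove("upgrade.from");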

Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (a rolling bounce upgrade is not supported):

  • stop all old (0.10.0.x) application instances
  • update your code and swap old code and jar file with new code and new jar file
  • restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application instances

Streams API changes in 0.11.0.0

Updates in StreamsConfig:

  • new configuration parameter processing.guarantee is added (see the sketch after this list)
  • configuration parameter key.serde was deprecated and replaced by default.key.serde
  • configuration parameter value.serde was deprecated and replaced by default.value.serde
  • configuration parameter timestamp.extractor was deprecated and replaced by default.timestamp.extractor
  • method #keySerde() was deprecated and replaced by #defaultKeySerde()
  • method #valueSerde() was deprecated and replaced by #defaultValueSerde()
  • new method #defaultTimestampExtractor() was added
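A sketch of the new and renamed parameters in use (the extractor class here is just one possible choice; see the TimestampExtractor changes under 0.10.2 below):

                import java.util.Properties;

                import org.apache.kafka.streams.StreamsConfig;
                import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;

                Properties config = new Properties();
                // new in 0.11.0: opt into exactly-once processing semantics
                config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                // timestamp.extractor is deprecated; use default.timestamp.extractor instead
                config.put("default.timestamp.extractor", FailOnInvalidTimestamp.class.getName());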

New methods in TopologyBuilder:

  • added overloads for #addSource() that allow defining a TimestampExtractor per source node
  • added overloads for #addGlobalStore() that allow defining a TimestampExtractor per source node associated with the global store

New methods in KStreamBuilder:

  • added overloads for #stream() that allow defining a TimestampExtractor per input stream (see the sketch after this list)
  • added overloads for #table() that allow defining a TimestampExtractor per input table
  • added overloads for #globalTable() that allow defining a TimestampExtractor per global table
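As a sketch of the per-stream variant (the exact overload and parameter order should be verified against the 0.11.0 Javadocs; LogAndSkipOnInvalidTimestamp is one of the extractor classes listed under the 0.10.2 changes below):

                // assumes an overload KStreamBuilder#stream(TimestampExtractor, String...)
                KStream<String, String> orders =
                    builder.stream(new LogAndSkipOnInvalidTimestamp(), "orders-topic");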

Deprecated methods in KTable:

  • void foreach(final ForeachAction<? super K, ? super V> action)
  • void print()
  • void print(final String streamName)
  • void print(final Serde<K> keySerde, final Serde<V> valSerde)
  • void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)
  • void writeAsText(final String filePath)
  • void writeAsText(final String filePath, final String streamName)
  • void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)
  • void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)

The above methods have been deprecated in favor of using the Interactive Queries API. If you want to query the current content of the state store backing the KTable, use the following approach:

  • Make a call to KafkaStreams.store(final String storeName, final QueryableStoreType<T> queryableStoreType)
  • Then make a call to ReadOnlyKeyValueStore.all() to iterate over the key-value pairs of the KTable (see the sketch after this list)
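Putting the two calls together, a minimal sketch (assuming a running KafkaStreams instance named streams and a hypothetical store named "Counts"):

                import org.apache.kafka.streams.KeyValue;
                import org.apache.kafka.streams.state.KeyValueIterator;
                import org.apache.kafka.streams.state.QueryableStoreTypes;
                import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

                // "Counts" is a hypothetical name of the state store backing the KTable
                ReadOnlyKeyValueStore<String, Long> store =
                    streams.store("Counts", QueryableStoreTypes.<String, Long>keyValueStore());
                // iterate over all key-value pairs currently in the store
                try (KeyValueIterator<String, Long> all = store.all()) {
                    while (all.hasNext()) {
                        KeyValue<String, Long> entry = all.next();
                        System.out.println(entry.key + ": " + entry.value);
                    }
                }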

If you want to view the changelog stream of the KTable then you could call KTable.toStream().print().

Metrics using exactly-once semantics:

If exactly-once processing is enabled via the processing.guarantee parameter, Streams internally switches from a producer-per-thread to a producer-per-task runtime model. To distinguish the different producers, the producer’s client.id additionally encodes the task-ID in this case. Because the producer’s client.id is used to report JMX metrics, tools that consume those metrics may need to be updated.

Producer’s client.id naming schema:

  • at-least-once (default): [client.Id]-StreamThread-[sequence-number]
  • exactly-once: [client.Id]-StreamThread-[sequence-number]-[taskId]

[client.Id] is either set via Streams configuration parameter client.id or defaults to [application.id]-[processId] ([processId] is a random UUID).

Notable changes in 0.10.2.1

Parameter updates in StreamsConfig:

  • The default config values of embedded producer’s retries and consumer’s max.poll.interval.ms have been changed to improve the resiliency of a Kafka Streams application

Streams API changes in 0.10.2.0

New methods in KafkaStreams:

  • set a listener to react to application state changes via #setStateListener(StateListener listener) (see the sketch after this list)
  • retrieve the current application state via #state()
  • retrieve the global metrics registry via #metrics()
  • apply a timeout when closing an application via #close(long timeout, TimeUnit timeUnit)
  • specify a custom indent when retrieving Kafka Streams information via #toString(String indent)
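A short sketch combining a few of these methods (the listener output and the timeout value are illustrative):

                import java.util.concurrent.TimeUnit;

                KafkaStreams streams = new KafkaStreams(builder, config);
                // get notified of application state transitions
                streams.setStateListener((newState, oldState) ->
                    System.out.println("state changed from " + oldState + " to " + newState));
                streams.start();
                System.out.println("current state: " + streams.state());
                // ...
                // bound how long close() may block during shutdown
                streams.close(10, TimeUnit.SECONDS);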

Parameter updates in StreamsConfig:

  • parameter zookeeper.connect was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol (cf. KIP-4, Section “Topic Admin Schema”)
  • added many new parameters for metrics, security, and client configurations

Changes in StreamsMetrics interface:

  • removed methods: #addLatencySensor()
  • added methods: #addLatencyAndThroughputSensor(), #addThroughputSensor(), #recordThroughput(), #addSensor(), #removeSensor()

New methods in TopologyBuilder:

  • added overloads for #addSource() that allow defining an auto.offset.reset policy per source node
  • added method #addGlobalStore() to add global StateStores

New methods in KStreamBuilder:

  • added overloads for #stream() and #table() that allow defining an auto.offset.reset policy per input stream/table (see the sketch after this list)
  • added method #globalTable() to create a GlobalKTable
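A brief sketch of both additions (topic and store names are made up):

                import org.apache.kafka.streams.kstream.GlobalKTable;
                import org.apache.kafka.streams.kstream.KStream;
                import org.apache.kafka.streams.kstream.KStreamBuilder;
                import org.apache.kafka.streams.processor.TopologyBuilder;

                KStreamBuilder builder = new KStreamBuilder();
                // per-source offset reset policy instead of one global consumer setting
                KStream<String, String> clicks =
                    builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, "clicks-topic");
                // a GlobalKTable is fully replicated to every application instance
                GlobalKTable<String, String> users =
                    builder.globalTable("users-topic", "users-store");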

New joins for KStream:

  • added overloads for #join() to join with KTable
  • added overloads for #join() and leftJoin() to join with GlobalKTable
  • note that join semantics were improved in 0.10.2, so you might see different results compared to 0.10.0.x and 0.10.1.x (cf. Kafka Streams Join Semantics in the Apache Kafka wiki)

Aligned null-key handling for KTable joins:

  • like all other KTable operations, KTable-KTable joins do not throw an exception on null key records anymore, but drop those records silently

New window type Session Windows:

  • added class SessionWindows to specify session windows
  • added overloads for KGroupedStream methods #count(), #reduce(), and #aggregate() to allow session window aggregations (see the sketch after this list)
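A session-window count might look like the following sketch (the input stream, store name, and five-minute inactivity gap are hypothetical):

                import java.util.concurrent.TimeUnit;

                import org.apache.kafka.streams.kstream.KTable;
                import org.apache.kafka.streams.kstream.SessionWindows;
                import org.apache.kafka.streams.kstream.Windowed;

                // records with the same key that arrive within 5 minutes of each other
                // are merged into one session; the result is a count per (key, session)
                KTable<Windowed<String>, Long> sessionCounts = clicks
                    .groupByKey()
                    .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), "ClicksPerSession");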

Changes to TimestampExtractor:

  • method #extract() now has a second parameter (see the sketch after this list)
  • new default timestamp extractor class FailOnInvalidTimestamp (it provides the same behavior as the old, and since removed, default extractor ConsumerRecordTimestampExtractor)
  • new alternative timestamp extractor classes LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp
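A minimal sketch of a custom extractor using the two-argument #extract() (class name and fallback policy are illustrative):

                import org.apache.kafka.clients.consumer.ConsumerRecord;
                import org.apache.kafka.streams.processor.TimestampExtractor;

                public class PayloadOrPreviousTimestampExtractor implements TimestampExtractor {
                    @Override
                    public long extract(final ConsumerRecord<Object, Object> record,
                                        final long previousTimestamp) {
                        final long ts = record.timestamp();
                        // fall back to the previously extracted timestamp for invalid records
                        return ts >= 0 ? ts : previousTimestamp;
                    }
                }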

Relaxed type constraints of many DSL interfaces, classes, and methods (cf. KIP-100).

Streams API changes in 0.10.1.0

Stream grouping and aggregation split into two methods:

  • old: KStream#aggregateByKey(), #reduceByKey(), and #countByKey()
  • new: KStream#groupByKey() plus KGroupedStream#aggregate(), #reduce(), and #count()
  • Example: stream.countByKey() changes to stream.groupByKey().count()

Auto Repartitioning:

  • a call to through() after a key-changing operator and before an aggregation/join is no longer required
  • Example: stream.selectKey(…).through(…).countByKey() changes to stream.selectKey(…).groupByKey().count()

TopologyBuilder:

  • methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) were simplified to #sourceTopics() and #topicGroups()

DSL: new parameter to specify state store names:

  • The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter “operator/window name” is now the storeName)
  • KStreamBuilder#table(String topic) changes to #table(String topic, String storeName)
  • KTable#through(String topic) changes to #through(String topic, String storeName)
  • KGroupedStream#aggregate(), #reduce(), and #count() require an additional parameter “String storeName”
  • Example: stream.countByKey(TimeWindows.of(“windowName”, 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), “countStoreName”)

Windowing:

  • Windows are not named anymore: TimeWindows.of(“name”, 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names)
  • JoinWindows has no default size anymore: JoinWindows.of(“name”).within(1000) changes to JoinWindows.of(1000)


7 - Streams Developer Guide