1 - Writing a Streams Application

Table of Contents

  • Libraries and Maven artifacts
  • Using Kafka Streams within your application code

Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application. The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges).

You can define the processor topology with the Kafka Streams APIs:

Kafka Streams DSL A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs. Processor API A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).

Libraries and Maven artifacts

This section lists the Kafka Streams related libraries that are available for writing your Kafka Streams applications.

You can define dependencies on the following libraries for your Kafka Streams applications.

Group ID | Artifact ID | Version | Description
org.apache.kafka | kafka-streams | 1.1.0 | (Required) Base library for Kafka Streams.
org.apache.kafka | kafka-clients | 1.1.0 | (Required) Kafka client library. Contains built-in serializers/deserializers.

Tip

See the section Data Types and Serialization for more information about Serializers/Deserializers.

Example pom.xml snippet when using Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

Using Kafka Streams within your application code

You can call Kafka Streams from anywhere in your application code, but usually these calls are made within the main() method of your application, or some variant thereof. The basic elements of defining a processing topology within your application are described below.

First, you must create an instance of KafkaStreams.

  • The first argument of the KafkaStreams constructor is the topology that defines the application's processing logic: either the Topology returned by StreamsBuilder#build() when using the DSL, or a Topology constructed directly when using the Processor API.
  • The second argument is an instance of StreamsConfig, which defines the configuration for this specific topology.

Code example:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = ...;  // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API

// Use the configuration to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(topology, config);

At this point, internal structures are initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the KafkaStreams#start() method:

// Start the Kafka Streams threads
streams.start();

If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started. For more information, see Stream Partitions and Tasks and Threading Model.

To catch any unexpected exceptions, you can set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:

// Java 8+, using lambda expressions
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
  // here you should examine the throwable/exception and perform an appropriate action!
});


// Java 7
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  public void uncaughtException(Thread thread, Throwable throwable) {
    // here you should examine the throwable/exception and perform an appropriate action!
  }
});

To stop the application instance, call the KafkaStreams#close() method:

// Stop the Kafka Streams threads
streams.close();

To allow your application to shut down gracefully in response to SIGTERM, it is recommended that you add a shutdown hook and call KafkaStreams#close.

  • Here is a shutdown hook example in Java 8+:
// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  • Here is a shutdown hook example in Java 7:
// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  @Override
  public void run() {
      streams.close();
  }
}));

After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining instances.

2 - Configuring a Streams Application

Kafka and Kafka Streams configuration options must be configured before using Streams. You can configure Kafka Streams by specifying parameters in a StreamsConfig instance.

  1. Create a java.util.Properties instance.

  2. Set the parameters.

  3. Construct a StreamsConfig instance from the Properties instance. For example:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings
settings.put(... , ...);

// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);

Configuration parameter reference

This section contains the most common Streams configuration parameters. For a full reference, see the Streams Javadocs.

  • Required configuration parameters
    • application.id
    • bootstrap.servers
  • Optional configuration parameters
    • default.deserialization.exception.handler
    • default.production.exception.handler
    • default.key.serde
    • default.value.serde
    • num.standby.replicas
    • num.stream.threads
    • partition.grouper
    • replication.factor
    • state.dir
    • timestamp.extractor
  • Kafka consumers and producer configuration parameters
    • Naming
    • Default Values
    • enable.auto.commit
    • rocksdb.config.setter
  • Recommended configuration parameters for resiliency
    • acks
    • replication.factor

Required configuration parameters

Here are the required Streams configuration parameters.

Parameter Name | Importance | Description | Default Value
application.id | Required | An identifier for the stream processing application. Must be unique within the Kafka cluster. | None
bootstrap.servers | Required | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | None

application.id

(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to all instances of the application. It is recommended to use only alphanumeric characters, . (dot), - (hyphen), and _ (underscore). Examples: "hello_world", "hello_world-v1.0.0"

This ID is used in the following places to isolate resources used by the application from others:

  • As the default Kafka consumer and producer client.id prefix
  • As the Kafka consumer group.id for coordination
  • As the name of the subdirectory in the state directory (cf. state.dir)
  • As the prefix of internal Kafka topic names

Tip: When an application is updated, the application.id should be changed unless you want to reuse the existing data in internal topics and state stores. For example, you could embed the version information within application.id, as my-app-v1.0.0 and my-app-v1.0.2.
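
For example, a minimal sketch (the application name and version below are placeholders) that embeds the version in the application ID:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Embedding the version in the application ID gives an upgraded deployment fresh
// internal topics and state stores instead of reusing the previous version's data.
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-v1.0.0");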

bootstrap.servers

(Required) The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092".

Tip: Kafka Streams applications can only communicate with a single Kafka cluster specified by this config value. Future versions of Kafka Streams will support connecting to different Kafka clusters for reading input streams and writing output streams.

Optional configuration parameters

Here are the optional Streams configuration parameters, sorted by level of importance:

  • High: These parameters can have a significant impact on performance. Take care when deciding the values of these parameters.
  • Medium: These parameters can have some impact on performance. Your specific environment will determine how much tuning effort should be focused on these parameters.
  • Low: These parameters have a less general or less significant impact on performance.

Parameter Name | Importance | Description | Default Value
application.server | Low | A host:port pair pointing to an embedded user-defined endpoint that can be used for discovering the locations of state stores within a single Kafka Streams application. The value of this must be different for each instance of the application. | the empty string
buffered.records.per.partition | Low | The maximum number of records to buffer per partition. | 1000
cache.max.bytes.buffering | Medium | Maximum number of memory bytes to be used for record caches across all threads. | 10485760 bytes
client.id | Medium | An ID string to pass to the server when making requests. (This setting is passed to the consumer/producer clients used internally by Kafka Streams.) | the empty string
commit.interval.ms | Low | The frequency with which to save the position (offsets in source topics) of tasks. | 30000 milliseconds
default.deserialization.exception.handler | Medium | Exception handling class that implements the DeserializationExceptionHandler interface. | LogAndContinueExceptionHandler
default.production.exception.handler | Medium | Exception handling class that implements the ProductionExceptionHandler interface. | DefaultProductionExceptionHandler
key.serde | Medium | Default serializer/deserializer class for record keys, implements the Serde interface (see also value.serde). | Serdes.ByteArray().getClass().getName()
metric.reporters | Low | A list of classes to use as metrics reporters. | the empty list
metrics.num.samples | Low | The number of samples maintained to compute metrics. | 2
metrics.recording.level | Low | The highest recording level for metrics. | INFO
metrics.sample.window.ms | Low | The window of time a metrics sample is computed over. | 30000 milliseconds
num.standby.replicas | Medium | The number of standby replicas for each task. | 0
num.stream.threads | Medium | The number of threads to execute stream processing. | 1
partition.grouper | Low | Partition grouper class that implements the PartitionGrouper interface. | See Partition Grouper
poll.ms | Low | The amount of time in milliseconds to block waiting for input. | 100 milliseconds
replication.factor | High | The replication factor for changelog topics and repartition topics created by the application. | 1
state.cleanup.delay.ms | Low | The amount of time in milliseconds to wait before deleting state when a partition has migrated. | 600000 milliseconds
state.dir | High | Directory location for state stores. | /tmp/kafka-streams
timestamp.extractor | Medium | Timestamp extractor class that implements the TimestampExtractor interface. | See Timestamp Extractor
value.serde | Medium | Default serializer/deserializer class for record values, implements the Serde interface (see also key.serde). | Serdes.ByteArray().getClass().getName()
windowstore.changelog.additional.retention.ms | Low | Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. | 86400000 milliseconds = 1 day

default.deserialization.exception.handler

The default deserialization exception handler allows you to manage exceptions that are triggered by records that fail to deserialize. This can be caused by corrupt data, incorrect serialization logic, or unhandled record types. These exception handlers are available:

  • LogAndContinueExceptionHandler: This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to deserialize.
  • LogAndFailExceptionHandler: This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.
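
For example, here is a minimal sketch of switching to the log-and-skip strategy via StreamsConfig (the other application settings are omitted):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties settings = new Properties();
// other settings such as application.id and bootstrap.servers ...

// Log and skip records that cannot be deserialized instead of failing the application.
settings.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
             LogAndContinueExceptionHandler.class);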

default.production.exception.handler

The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker such as attempting to produce a record that is too large. By default, Kafka provides and uses the DefaultProductionExceptionHandler that always fails when these exceptions occur.

Each exception handler can return a FAIL or CONTINUE depending on the record and the exception thrown. Returning FAIL will signal that Streams should shut down and CONTINUE will signal that Streams should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, ?> config) {}

    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

Properties settings = new Properties();

// other various Kafka Streams settings, e.g. bootstrap servers, application id, etc.

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);

default.key.serde

The default Serializer/Deserializer class for record keys. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

  • Whenever data is read from or written to a Kafka topic (e.g., via the StreamsBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

This is discussed in more detail in Data types and serialization.

default.value.serde

The default Serializer/Deserializer class for record values. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

  • Whenever data is read from or written to a Kafka topic (e.g., via the StreamsBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

This is discussed in more detail in Data types and serialization.
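
For example, a minimal sketch that sets both default.key.serde and default.value.serde (the String/Long choice is purely illustrative):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Use String keys and Long values unless an operation overrides the SerDes explicitly.
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());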

num.standby.replicas

The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the specified number of replicas and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is preferred to restart on an instance that has standby replicas so that the local state store restoration process from its changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of resuming tasks on failover can be found in the State section.

num.stream.threads

This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. For more information about the Kafka Streams threading model, see Threading Model.
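
For example, both num.stream.threads and num.standby.replicas can be set via StreamsConfig (a minimal sketch; the values are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Run two processing threads per application instance and keep one standby replica
// of each task's state to speed up failover.
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);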

partition.grouper

A partition grouper creates a list of stream tasks from the partitions of source topics, where each task is assigned a group of source topic partitions. The default implementation provided by Kafka Streams is DefaultPartitionGrouper, which assigns each task one partition from each of the source topics, so the generated number of tasks equals the largest number of partitions among the input topics. Usually an application does not need to customize the partition grouper.

replication.factor

This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.

Recommendation: Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures. Note that you will require more storage space as well (3 times more with the replication factor of 3).

state.dir

The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated with the application are created under this subdirectory.
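
For example, a minimal sketch that moves local state off the default location (the path below is a placeholder):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Keep local state stores on a dedicated disk instead of the default /tmp/kafka-streams.
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");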

timestamp.extractor

A timestamp extractor pulls a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

The default extractor is FailOnInvalidTimestamp. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client since Kafka version 0.10. Depending on the setting of Kafka’s server-side log.message.timestamp.type broker parameter and message.timestamp.type topic parameter, this extractor provides you with:

  • event-time processing semantics if log.message.timestamp.type is set to CreateTime aka “producer time” (which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka’s official producer client, the timestamp represents milliseconds since the epoch.
  • ingestion-time processing semantics if log.message.timestamp.type is set to LogAppendTime aka “broker time”. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.

The FailOnInvalidTimestamp extractor throws an exception if a record contains an invalid (i.e. negative) built-in timestamp, because such a record would not be processed by Kafka Streams but silently dropped. Invalid built-in timestamps can occur for various reasons: for example, if you consume a topic that is written to by pre-0.10 Kafka producer clients or by third-party producer clients that don’t support the new Kafka 0.10 message format yet; another situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 does not include the 0.10 message timestamps.

If you have data with invalid timestamps and want to process it, then there are two alternative extractors available. Both work on built-in timestamps, but handle invalid timestamps differently.

  • LogAndSkipOnInvalidTimestamp: This extractor logs a warn message and returns the invalid timestamp to Kafka Streams, which will not process but silently drop the record. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an invalid built-in timestamp in your input data.
  • UsePreviousTimeOnInvalidTimestamp: This extractor returns the record’s built-in timestamp if it is valid (i.e. not negative). If the record does not have a valid built-in timestamp, the extractor returns the previously extracted valid timestamp from a record of the same topic partition as the current record as a timestamp estimate. If no timestamp can be estimated, it throws an exception.

Another built-in extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock (think: System.currentTimeMillis()), which effectively means Streams will operate on the basis of the so-called processing-time of events.

You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or estimate a timestamp. Returning a negative timestamp will result in data loss - the corresponding record will not be processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via previousTimestamp (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
    long timestamp = -1;
    final Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      timestamp = myPojo.getTimestampInMillis();
    }
    if (timestamp < 0) {
      // Invalid timestamp!  Attempt to estimate a new timestamp,
      // otherwise fall back to wall-clock time (processing-time).
      if (previousTimestamp >= 0) {
        return previousTimestamp;
      } else {
        return System.currentTimeMillis();
      }
    }
    return timestamp;
  }

}

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

Kafka consumers and producer configuration parameters

You can specify parameters for the Kafka consumers and producers that are used internally. The consumer and producer settings are defined by specifying parameters in a StreamsConfig instance.

In this example, the Kafka consumer session timeout is configured to be 60000 milliseconds in the Streams settings:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
StreamsConfig config = new StreamsConfig(streamsSettings);

Naming

Some consumer and producer configuration parameters use the same parameter name. For example, send.buffer.bytes and receive.buffer.bytes are used to configure TCP buffers; request.timeout.ms and retry.backoff.ms control retries for client requests. You can avoid duplicate names by prefixing parameter names with consumer. or producer. (e.g., consumer.send.buffer.bytes and producer.send.buffer.bytes).

Properties streamsSettings = new Properties();
// same value for consumer and producer
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");

Default Values

Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions of these configs, see Producer Configs and Consumer Configs.

Parameter Name | Corresponding Client | Streams Default
auto.offset.reset | Consumer | earliest
enable.auto.commit | Consumer | false
linger.ms | Producer | 100
max.poll.interval.ms | Consumer | Integer.MAX_VALUE
max.poll.records | Consumer | 1000
retries | Producer | 10
rocksdb.config.setter | Consumer |

enable.auto.commit

The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config value to false. Consumers will only commit explicitly via commitSync calls when the Kafka Streams library or a user decides to commit the current processing state.

rocksdb.config.setter

The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, implement RocksDBConfigSetter and provide your custom class via rocksdb.config.setter.

Here is an example that adjusts the memory size consumed by RocksDB.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public static class CustomRocksDBConfig implements RocksDBConfigSetter {

   @Override
   public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
     // See #1 below.
     BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
     tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
     // See #2 below.
     tableConfig.setBlockSize(16 * 1024L);
     // See #3 below.
     tableConfig.setCacheIndexAndFilterBlocks(true);
     options.setTableFormatConfig(tableConfig);
     // See #4 below.
     options.setMaxWriteBufferNumber(2);
   }
}

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);

Notes for example:

  1. BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig(); Reduce block cache size from the default, shown here, as the total number of store RocksDB databases is partitions (40) * segments (3) = 120.
  2. tableConfig.setBlockSize(16 * 1024L); Modify the default block size per these instructions from the RocksDB GitHub.
  3. tableConfig.setCacheIndexAndFilterBlocks(true); Do not let the index and filter blocks grow unbounded. For more information, see the RocksDB GitHub.
  4. options.setMaxWriteBufferNumber(2); See the advanced options in the RocksDB GitHub.

Recommended configuration parameters for resiliency

There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:

Parameter Name | Corresponding Client | Default value | Consider setting to
acks | Producer | acks=1 | acks=all
replication.factor | Streams | 1 | 3
min.insync.replicas | Broker | 1 | 2

Increasing the replication factor to 3 ensures that the internal Kafka Streams topics can tolerate up to 2 broker failures. Changing the acks setting to “all” guarantees that a record will not be lost as long as one replica is alive. The tradeoff of moving from the default values to the recommended ones is that some performance and more storage space (3x with a replication factor of 3) are sacrificed for more resiliency.

acks

The number of acknowledgments that the leader must have received before considering a request complete. This controls the durability of records that are sent. The possible values are:

  • acks=0 The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset returned for each record will always be set to -1.
  • acks=1 The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.
  • acks=all The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.

For more information, see the Kafka Producer documentation.

replication.factor

See the description here.

You define these settings via StreamsConfig:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

Note

A future version of Kafka Streams will allow developers to set their own app-specific configuration settings through StreamsConfig as well, which can then be accessed through ProcessorContext.

3 - Streams DSL

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.

Table of Contents

  • Overview
  • Creating source streams from Kafka
  • Transform a stream
    • Stateless transformations
    • Stateful transformations
      • Aggregating
      • Joining
        • Join co-partitioning requirements
        • KStream-KStream Join
        • KTable-KTable Join
        • KStream-KTable Join
        • KStream-GlobalKTable Join
      • Windowing
        • Tumbling time windows
        • Hopping time windows
        • Sliding time windows
        • Session Windows
    • Applying processors and transformers (Processor API integration)
  • Writing streams back to Kafka

Overview

In comparison to the Processor API, only the DSL supports:

  • Built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable. Having first-class support for streams and tables is crucial because, in practice, most use cases require not just either streams or databases/tables, but a combination of both. For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your application will be doing is transforming many input streams of customer-related events into an output table that contains a continuously updated 360-degree view of your customers.
  • Declarative, functional programming style with stateless transformations (e.g. map and filter) as well as stateful transformations such as aggregations (e.g. count and reduce), joins (e.g. leftJoin), and windowing (e.g. session windows).

With the DSL, you can define processor topologies (i.e., the logical processing plan) in your application. The steps to accomplish this are:

  1. Specify one or more input streams that are read from Kafka topics.
  2. Compose transformations on these streams.
  3. Write the resulting output streams back to Kafka topics, or expose the processing results of your application directly to other applications through interactive queries (e.g., via a REST API).

After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into action). A step-by-step guide for writing a stream processing application using the DSL is provided below.

For a complete list of available API functionality, see also the Streams API docs.
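
As an illustration of these three steps, here is a minimal sketch of a topology that reads a stream, transforms it, and writes the result back to Kafka (the topic names are placeholders, and the configured default SerDes are assumed to match the String keys and values):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// 1. Read an input stream from a Kafka topic (uses the configured default SerDes).
KStream<String, String> textLines = builder.stream("input-topic");

// 2. Compose transformations on the stream.
KStream<String, String> uppercased = textLines.mapValues(value -> value.toUpperCase());

// 3. Write the resulting stream back to another Kafka topic.
uppercased.to("output-topic");

// The topology can now be passed to the KafkaStreams constructor.
Topology topology = builder.build();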

Creating source streams from Kafka

You can easily read data from Kafka topics into your application. The following operations are supported.

Reading from Kafka | Description
Stream
  • input topics -> KStream

| Creates a KStream from the specified Kafka input topics and interprets the data as a record stream. A KStream represents a partitioned record stream. (details) In the case of a KStream, the local KStream instance of every application instance will be populated with data from only a subset of the partitions of the input topic. Collectively, across all application instances, all input topic partitions are read and processed.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()   /* value serde */
    ));

If you do not specify SerDes explicitly, the default SerDes from the configuration are used. You must specify SerDes explicitly if the key or value types of the records in the Kafka input topics do not match the configured default SerDes. For information about configuring default SerDes, available SerDes, and implementing your own custom SerDes see Data Types and Serialization. Several variants of stream exist. For example, you can specify a regex pattern for input topics to read from (note that all matching topics will be part of the same input topic group, and the work will not be parallelized for different topics if subscribed to in this way).
Table

  • input topic -> KTable

| Reads the specified Kafka input topic into a KTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not null) or as DELETE (when the value is null) for that key. (details) In the case of a KTable, the local KTable instance of every application instance will be populated with data from only a subset of the partitions of the input topic. Collectively, across all application instances, all input topic partitions are read and processed. You must provide a name for the table (more precisely, for the internal state store that backs the table). This is required for supporting interactive queries against the table. When a name is not provided the table will not be queryable and an internal name will be provided for the state store. If you do not specify SerDes explicitly, the default SerDes from the configuration are used. You must specify SerDes explicitly if the key or value types of the records in the Kafka input topics do not match the configured default SerDes. For information about configuring default SerDes, available SerDes, and implementing your own custom SerDes see Data Types and Serialization. Several variants of table exist, for example to specify the auto.offset.reset policy to be used when reading from the input topic.
Global Table

  • input topic -> GlobalKTable

| Reads the specified Kafka input topic into a GlobalKTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not null) or as DELETE (when the value is null) for that key. (details) In the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be populated with data from all the partitions of the input topic. You must provide a name for the table (more precisely, for the internal state store that backs the table). This is required for supporting interactive queries against the table. When a name is not provided the table will not be queryable and an internal name will be provided for the state store.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-global-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );

You must specify SerDes explicitly if the key or value types of the records in the Kafka input topics do not match the configured default SerDes. For information about configuring default SerDes, available SerDes, and implementing your own custom SerDes see Data Types and Serialization. Several variants of globalTable exist to e.g. specify explicit SerDes.

Transform a stream

The KStream and KTable interfaces support a variety of transformation operations. Each of these operations can be translated into one or more connected processors into the underlying processor topology. Since KStream and KTable are strongly typed, all of these transformation operations are defined as generic functions where users could specify the input and output data types.

Some KStream transformations may generate one or more KStream objects, for example:

  • filter and map on a KStream will generate another KStream
  • branch on a KStream can generate multiple KStreams

Some others may generate a KTable object, for example an aggregation of a KStream also yields a KTable. This allows Kafka Streams to continuously update the computed value upon arrivals of late records after it has already been produced to the downstream transformation operators.

All KTable transformation operations can only generate another KTable. However, the Kafka Streams DSL does provide a special function that converts a KTable representation into a KStream. All of these transformation methods can be chained together to compose a complex processor topology.

These transformation operations are described in the following subsections:

  • Stateless transformations
  • Stateful transformations

Stateless transformations

Stateless transformations do not require state for processing and they do not require a state store associated with the stream processor. Kafka 0.11.0 and later allows you to materialize the result from a stateless KTable transformation. This allows the result to be queried through interactive queries. To materialize a KTable, each of the below stateless operations can be augmented with an optional queryableStoreName argument.
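
For example, here is a minimal sketch (assuming a KTable<String, Long> named table; the store name is a placeholder) that materializes the result of a stateless filter so it can be accessed via interactive queries:

KTable<String, Long> table = ...;

// Materialize the filtered result into a named state store to make it queryable.
KTable<String, Long> onlyPositives = table.filter(
    (key, value) -> value > 0,
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("positives-store"));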

Transformation | Description
Branch
  • KStream -> KStream[]

| Branch (or split) a KStream based on the supplied predicates into one or more KStream instances. (details) Predicates are evaluated in order. A record is placed to one and only one output stream on the first match: if the n-th predicate evaluates to true, the record is placed to the n-th stream. If no predicate matches, the record is dropped. Branching is useful, for example, to route records to different downstream topics.

KStream<String, Long> stream = ...;
KStream<String, Long>[] branches = stream.branch(
    (key, value) -> key.startsWith("A"), /* first predicate  */
    (key, value) -> key.startsWith("B"), /* second predicate */
    (key, value) -> true                 /* third predicate  */
  );

// KStream branches[0] contains all records whose keys start with "A"
// KStream branches[1] contains all records whose keys start with "B"
// KStream branches[2] contains all other records

// Java 7 example: cf. `filter` for how to create `Predicate` instances

Filter

  • KStream -> KStream
  • KTable -> KTable

| Evaluates a boolean function for each element and retains those for which the function returns true. (KStream details, KTable details)

KStream<String, Long> stream = ...;

// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);

// Java 7 example
KStream<String, Long> onlyPositives = stream.filter(
    new Predicate<String, Long>() {
      @Override
      public boolean test(String key, Long value) {
        return value > 0;
      }
    });

Inverse Filter

  • KStream -> KStream
  • KTable -> KTable

| Evaluates a boolean function for each element and drops those for which the function returns true. (KStream details, KTable details)

KStream<String, Long> stream = ...;

// An inverse filter that discards any negative numbers or zero
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filterNot((key, value) -> value <= 0);

// Java 7 example
KStream<String, Long> onlyPositives = stream.filterNot(
    new Predicate<String, Long>() {
      @Override
      public boolean test(String key, Long value) {
        return value <= 0;
      }
    });

FlatMap

  • KStream -> KStream

| Takes one record and produces zero, one, or more records. You can modify the record keys and values, including their types. (details) Marks the stream for data re-partitioning: Applying a grouping or a join after flatMap will result in re-partitioning of the records. If possible use flatMapValues instead, which will not cause data re-partitioning.

KStream<Long, String> stream = ...;
KStream<String, Integer> transformed = stream.flatMap(
     // Here, we generate two output records for each input record.
     // We also change the key and value types.
     // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
    (key, value) -> {
      List<KeyValue<String, Integer>> result = new LinkedList<>();
      result.add(KeyValue.pair(value.toUpperCase(), 1000));
      result.add(KeyValue.pair(value.toLowerCase(), 9000));
      return result;
    }
  );

// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances

FlatMap (values only)

  • KStream -> KStream

| Takes one record and produces zero, one, or more records, while retaining the key of the original record. You can modify the record values and the value type. (details) flatMapValues is preferable to flatMap because it will not cause data re-partitioning. However, you cannot modify the key or key type like flatMap does.

// Split a sentence into words.
KStream<byte[], String> sentences = ...;
KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));

// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances

Foreach

  • KStream -> void
  • KTable -> void

| Terminal operation. Performs a stateless action on each record. (details) You would use foreach to cause side effects based on the input data (similar to peek) and then stop further processing of the input data (unlike peek, which is not a terminal operation). Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.

KStream<String, Long> stream = ...;

// Print the contents of the KStream to the local console.
// Java 8+ example, using lambda expressions
stream.foreach((key, value) -> System.out.println(key + " => " + value));

// Java 7 example
stream.foreach(
    new ForeachAction<String, Long>() {
      @Override
      public void apply(String key, Long value) {
        System.out.println(key + " => " + value);
      }
    });

GroupByKey

  • KStream -> KGroupedStream

| Groups the records by the existing key. (details) Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations. When to set explicit SerDes: Variants of groupByKey exist to override the configured default SerDes of your application, which you must do if the key and/or value types of the resulting KGroupedStream do not match the configured default SerDes. Note Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the grouped records of the same key into so-called windows for stateful operations such as windowed aggregations or windowed joins. Causes data re-partitioning if and only if the stream was marked for re-partitioning. groupByKey is preferable to groupBy because it re-partitions data only if the stream was already marked for re-partitioning. However, groupByKey does not allow you to modify the key or key type like groupBy does.

KStream<byte[], String> stream = ...;

// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();

// When the key and/or value types do not match the configured
// default serdes, we must explicitly specify serdes.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    Serialized.with(
      Serdes.ByteArray(), /* key */
      Serdes.String())     /* value */
  );

GroupBy

  • KStream -> KGroupedStream
  • KTable -> KGroupedTable

| Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. groupBy is a shorthand for selectKey(...).groupByKey(). (KStream details, KTable details) Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations. When to set explicit SerDes: Variants of groupBy exist to override the configured default SerDes of your application, which you must do if the key and/or value types of the resulting KGroupedStream or KGroupedTable do not match the configured default SerDes. Note Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the grouped records of the same key into so-called windows for stateful operations such as windowed aggregations or windowed joins. Always causes data re-partitioning: groupBy always causes data re-partitioning. If possible use groupByKey instead, which will re-partition data only if required.

KStream<byte[], String> stream = ...;
KTable<byte[], String> table = ...;

// Java 8+ examples, using lambda expressions

// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Serialized.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    (key, value) -> KeyValue.pair(value, value.length()),
    Serialized.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.Integer()) /* value (note: type was modified) */
  );


// Java 7 examples

// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
    new KeyValueMapper<byte[], String, String>() {
      @Override
      public String apply(byte[] key, String value) {
        return value;
      }
    },
    Serialized.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    new KeyValueMapper<byte[], String, KeyValue<String, Integer>>() {
      @Override
      public KeyValue<String, Integer> apply(byte[] key, String value) {
        return KeyValue.pair(value, value.length());
      }
    },
    Serialized.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.Integer()) /* value (note: type was modified) */
  );

Map

  • KStream -> KStream

| Takes one record and produces one record. You can modify the record key and value, including their types. (details) Marks the stream for data re-partitioning: Applying a grouping or a join after map will result in re-partitioning of the records. If possible use mapValues instead, which will not cause data re-partitioning.

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

// Java 7 example
KStream<String, Integer> transformed = stream.map(
    new KeyValueMapper<byte[], String, KeyValue<String, Integer>>() {
      @Override
      public KeyValue<String, Integer> apply(byte[] key, String value) {
        return new KeyValue<>(value.toLowerCase(), value.length());
      }
    });

Map (values only)

  • KStream -> KStream
  • KTable -> KTable

| Takes one record and produces one record, while retaining the key of the original record. You can modify the record value and the value type. (KStream details, KTable details) mapValues is preferable to map because it will not cause data re-partitioning. However, it does not allow you to modify the key or key type like map does.

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());

// Java 7 example
KStream<byte[], String> uppercased = stream.mapValues(
    new ValueMapper<String, String>() {
      @Override
      public String apply(String s) {
        return s.toUpperCase();
      }
    });

Peek

  • KStream -> KStream

| Performs a stateless action on each record, and returns an unchanged stream. (details) You would use peek to cause side effects based on the input data (similar to foreach) and continue processing the input data (unlike foreach, which is a terminal operation). peek returns the input stream as-is; if you need to modify the input stream, use map or mapValues instead. peek is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting. Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));

// Java 7 example
KStream<byte[], String> unmodifiedStream = stream.peek(
    new ForeachAction<byte[], String>() {
      @Override
      public void apply(byte[] key, String value) {
        System.out.println("key=" + key + ", value=" + value);
      }
    });

Print

  • KStream -> void

| Terminal operation. Prints the records to System.out. See Javadocs for serde and toString() caveats. (details) Calling print() is the same as calling foreach((key, value) -> System.out.println(key + ", " + value))

KStream<byte[], String> stream = ...;
// print to sysout
stream.print();

// print to file with a custom label
stream.print(Printed.toFile("streams.out").withLabel("streams"));

SelectKey

  • KStream -> KStream

| Assigns a new key - possibly of a new key type - to each record. (details) Calling selectKey(mapper) is the same as calling map((key, value) -> mapper(key, value), value). Marks the stream for data re-partitioning: Applying a grouping or a join after selectKey will result in re-partitioning of the records.

KStream<byte[], String> stream = ...;

// Derive a new record key from the record's value.  Note how the key type changes, too.
// Java 8+ example, using lambda expressions
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);

// Java 7 example
KStream<String, String> rekeyed = stream.selectKey(
    new KeyValueMapper<byte[], String, String>() {
      @Override
      public String apply(byte[] key, String value) {
        return value.split(" ")[0];
      }
    });

Table to Stream

  • KTable -> KStream

| Get the changelog stream of this table. (details)

KTable<byte[], String> table = ...;

// Also, a variant of `toStream` exists that allows you
// to select a new key for the resulting stream.
KStream<byte[], String> stream = table.toStream();

Stateful transformations

Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary.

Note that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore all state stores prior to resuming the processing. See Fault Tolerance for further information.

Available stateful transformations in the DSL include:

  • Aggregating
  • Joining
  • Windowing (as part of aggregations and joins)
  • Applying custom processors and transformers, which may be stateful, for Processor API integration

The following diagram shows their relationships:

Stateful transformations in the DSL.

Here is an example of a stateful application: the WordCount algorithm.

WordCount example in Java 8+, using lambda expressions:

// Assume the record values represent lines of text.  For the sake of this example, you can ignore
// whatever may be stored in the record keys.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.  The text lines are the record
    // values, i.e. you can ignore whatever data is in the record keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the stream by word to ensure the key of the record is the word.
    .groupBy((key, word) -> word)
    // Count the occurrences of each word (record key).
    //
    // This will change the stream type from `KGroupedStream<String, String>` to
    // `KTable<String, Long>` (word -> count).
    .count()
    // Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
    .toStream();

WordCount example in Java 7:

// Code below is equivalent to the previous Java 8+ example above.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            return Arrays.asList(value.toLowerCase().split("\\W+"));
        }
    })
    .groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String word) {
            return word;
        }
    })
    .count()
    .toStream();

Aggregating

After records are grouped by key via groupByKey or groupBy - and thus represented as either a KGroupedStream or a KGroupedTable, they can be aggregated via an operation such as reduce. Aggregations are key-based operations, which means that they always operate over records (notably record values) of the same key. You can perform aggregations on windowed or non-windowed data.

Transformation | Description
Aggregate
  • KGroupedStream -> KTable
  • KGroupedTable -> KTable

| Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (KGroupedStream details, KGroupedTable details) When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table, you must provide a “subtractor” aggregator (think: aggValue - oldValue). Several variants of aggregate exist, see Javadocs for details.

KGroupedStream<byte[], String> groupedStream = ...;
KGroupedTable<byte[], String> groupedTable = ...;

// Java 8+ examples, using lambda expressions

// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
    Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */


// Java 7 examples

// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    new Initializer<Long>() { /* initializer */
      @Override
      public Long apply() {
        return 0L;
      }
    },
    new Aggregator<byte[], String, Long>() { /* adder */
      @Override
      public Long apply(byte[] aggKey, String newValue, Long aggValue) {
        return aggValue + newValue.length();
      }
    },
    Materialized.as("aggregated-stream-store")
        .withValueSerde(Serdes.Long());

// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    new Initializer<Long>() { /* initializer */
      @Override
      public Long apply() {
        return 0L;
      }
    },
    new Aggregator<byte[], String, Long>() { /* adder */
      @Override
      public Long apply(byte[] aggKey, String newValue, Long aggValue) {
        return aggValue + newValue.length();
      }
    },
    new Aggregator<byte[], String, Long>() { /* subtractor */
      @Override
      public Long apply(byte[] aggKey, String oldValue, Long aggValue) {
        return aggValue - oldValue.length();
      }
    },
    Materialized.as("aggregated-stream-store")
        .withValueSerde(Serdes.Long());

Detailed behavior of KGroupedStream:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior of KGroupedTable:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder and subtractor). Note that, in contrast to KGroupedStream, over time the initializer may be called more than once for a key as a result of having received input tombstone records for that key (see below).
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record - i.e. a record with a null value - is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will trigger the initializer again.

See the example at the bottom of this section for a visualization of the aggregation semantics.
Aggregate (windowed)

  • KGroupedStream -> KTable

Windowed aggregation. Aggregates the values of records, per window, by the grouped key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (TimeWindowedKStream details, SessionWindowedKStream details) You must provide an initializer (e.g., aggValue = 0), an “adder” aggregator (e.g., aggValue + curValue), and a window. When windowing based on sessions, you must additionally provide a “session merger” aggregator (e.g., mergedAggValue = leftAggValue + rightAggValue). The windowed aggregate turns a TimeWindowedKStream<K, V> or SessionWindowedKStream<K, V> into a windowed KTable<Windowed<K>, V>. Several variants of aggregate exist, see Javadocs for details.

import java.util.concurrent.TimeUnit;
KGroupedStream<String, Long> groupedStream = ...;

// Java 8+ examples, using lambda expressions

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
    .aggregate(
      () -> 0L, /* initializer */
      (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
      Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream
    .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
    .aggregate(
      () -> 0L, /* initializer */
      (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
      (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
      Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Java 7 examples

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .aggregate(
        new Initializer<Long>() { /* initializer */
            @Override
            public Long apply() {
                return 0L;
            }
        },
        new Aggregator<String, Long, Long>() { /* adder */
            @Override
            public Long apply(String aggKey, Long newValue, Long aggValue) {
                return aggValue + newValue;
            }
        },
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
          .withValueSerde(Serdes.Long()));

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream
    .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
    .aggregate(
        new Initializer<Long>() { /* initializer */
            @Override
            public Long apply() {
                return 0L;
            }
        },
        new Aggregator<String, Long, Long>() { /* adder */
            @Override
            public Long apply(String aggKey, Long newValue, Long aggValue) {
                return aggValue + newValue;
            }
        },
        new Merger<String, Long>() { /* session merger */
            @Override
            public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
                return rightAggValue + leftAggValue;
            }
        },
        Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store")
          .withValueSerde(Serdes.Long()));

Detailed behavior:

  • The windowed aggregate behaves similarly to the rolling aggregate described above. The additional twist is that the behavior applies per window.
  • Input records with null keys are ignored in general.
  • When a record key is received for the first time for a given window, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received for a given window, the adder is called.
  • When using session windows: the session merger is called whenever two sessions are being merged.

See the example at the bottom of this section for a visualization of the aggregation semantics.
Count

  • KGroupedStream -> KTable
  • KGroupedTable -> KTable

Rolling aggregation. Counts the number of records by the grouped key. (KGroupedStream details, KGroupedTable details) Several variants of count exist, see Javadocs for details.

KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Counting a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.count();

// Counting a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.count();

Detailed behavior for KGroupedStream:

  • Input records with null keys or values are ignored.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored. Records with null values are not ignored but interpreted as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.

Count (windowed)

  • KGroupedStream -> KTable

Windowed aggregation. Counts the number of records, per window, by the grouped key. (TimeWindowedKStream details, SessionWindowedKStream details) The windowed count turns a TimeWindowedKStream<K, V> or SessionWindowedKStream<K, V> into a windowed KTable<Windowed<K>, V>. Several variants of count exist, see Javadocs for details.

import java.util.concurrent.TimeUnit;
KGroupedStream<String, Long> groupedStream = ...;

// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    TimeWindows.of(TimeUnit.MINUTES.toMillis(5))) /* time-based window */
    .count();

// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(
    SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
    .count();

Detailed behavior:

  • Input records with null keys or values are ignored.

Reduce

  • KGroupedStream -> KTable
  • KGroupedTable -> KTable

Rolling aggregation. Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike aggregate. (KGroupedStream details, KGroupedTable details) When reducing a grouped stream, you must provide an “adder” reducer (e.g., aggValue + curValue). When reducing a grouped table, you must additionally provide a “subtractor” reducer (e.g., aggValue - oldValue). Several variants of reduce exist, see Javadocs for details.

KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Java 8+ examples, using lambda expressions

// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */);

// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
    (aggValue, newValue) -> aggValue + newValue, /* adder */
    (aggValue, oldValue) -> aggValue - oldValue /* subtractor */);


// Java 7 examples

// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    new Reducer<Long>() { /* adder */
      @Override
      public Long apply(Long aggValue, Long newValue) {
        return aggValue + newValue;
      }
    });

// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
    new Reducer<Long>() { /* adder */
      @Override
      public Long apply(Long aggValue, Long newValue) {
        return aggValue + newValue;
      }
    },
    new Reducer<Long>() { /* subtractor */
      @Override
      public Long apply(Long aggValue, Long oldValue) {
        return aggValue - oldValue;
      }
    });

Detailed behavior for KGroupedStream:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value. Note that, in contrast to KGroupedStream, over time this initialization step may happen more than once for a key as a result of having received input tombstone records for that key (see below).
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
  • When a tombstone record - i.e. a record with a null value - is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will re-initialize its aggregate value.

See the example at the bottom of this section for a visualization of the aggregation semantics.
Reduce (windowed)

  • KGroupedStream -> KTable

Windowed aggregation. Combines the values of records, per window, by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. Records with null key or value are ignored. The result value type cannot be changed, unlike aggregate. (TimeWindowedKStream details, SessionWindowedKStream details) The windowed reduce turns a TimeWindowedKStream<K, V> or a SessionWindowedKStream<K, V> into a windowed KTable<Windowed<K>, V>. Several variants of reduce exist, see Javadocs for details.

import java.util.concurrent.TimeUnit;
KGroupedStream<String, Long> groupedStream = ...;

// Java 8+ examples, using lambda expressions

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  TimeWindows.of(TimeUnit.MINUTES.toMillis(5)) /* time-based window */)
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(
  SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );


// Java 7 examples

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  TimeWindows.of(TimeUnit.MINUTES.toMillis(5)) /* time-based window */)
  .reduce(
    new Reducer<Long>() { /* adder */
      @Override
      public Long apply(Long aggValue, Long newValue) {
        return aggValue + newValue;
      }
    });

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(
  SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) /* session window */
  .reduce(
    new Reducer<Long>() { /* adder */
      @Override
      public Long apply(Long aggValue, Long newValue) {
        return aggValue + newValue;
      }
    });

Detailed behavior:

  • The windowed reduce behaves similarly to the rolling reduce described above. The additional twist is that the behavior applies per window.
  • Input records with null keys are ignored in general.
  • When a record key is received for the first time for a given window, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received for a given window, the adder is called.

See the example at the bottom of this section for a visualization of the aggregation semantics.

Example of semantics for stream aggregations: A KGroupedStream -> KTable example is shown below. The streams and the table are initially empty. Bold font is used in the column for “KTable aggregated” to highlight changed state. An entry such as (hello, 1) denotes a record with key hello and value 1. To improve the readability of the semantics table you can assume that all records are processed in timestamp order.

// Key: word, value: count
KStream<String, Integer> wordCounts = ...;

KGroupedStream<String, Integer> groupedStream = wordCounts
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));

KTable<String, Integer> aggregated = groupedStream.aggregate(
    () -> 0, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Integer())); /* serde for aggregate value */

Note

Impact of record caches: For illustration purposes, the column “KTable aggregated” below shows the table’s state changes over time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key kafka in the KTable (here: from (kafka, 1) directly to (kafka, 3)). Typically, you should only disable record caches for testing or debugging purposes - under normal circumstances it is better to leave record caches enabled.

KStream wordCounts | KGroupedStream groupedStream | KTable aggregated
Timestamp | Input record | Grouping | Initializer
1 | (hello, 1) | (hello, 1) | 0 (for hello)
2 | (kafka, 1) | (kafka, 1) | 0 (for kafka)
3 | (streams, 1) | (streams, 1) | 0 (for streams)
4 | (kafka, 1) | (kafka, 1) |
5 | (kafka, 1) | (kafka, 1) |
6 | (streams, 1) | (streams, 1) |
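
The note above mentions that record caches should only be disabled for testing or debugging. As a minimal sketch, assuming the usual Properties-based configuration passed to StreamsConfig, caching can be switched off by setting the cache size to zero:

Properties props = new Properties();
// Setting the record cache size to zero disables record caches (testing/debugging only).
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// Optionally commit (and thus forward results) more frequently as well.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);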

Example of semantics for table aggregations: A KGroupedTable -> KTable example is shown below. The tables are initially empty. Bold font is used in the column for “KTable aggregated” to highlight changed state. An entry such as (hello, 1) denotes a record with key hello and value 1. To improve the readability of the semantics table you can assume that all records are processed in timestamp order.

// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")
KTable<String, String> userProfiles = ...;

// Re-group `userProfiles`.  Don't read too much into what the grouping does:
// its prime purpose in this example is to show the *effects* of the grouping
// in the subsequent aggregation.
KGroupedTable<String, Integer> groupedTable = userProfiles
    .groupBy((user, region) -> KeyValue.pair(region, user.length()),
        Serialized.with(Serdes.String(), Serdes.Integer()));

KTable<String, Integer> aggregated = groupedTable.aggregate(
    () -> 0, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store") /* state store name */
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Integer())); /* serde for aggregate value */

Note

Impact of record caches: For illustration purposes, the column “KTable aggregated” below shows the table’s state changes over time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key kafka in the KTable (here: from (kafka, 1) directly to (kafka, 3)). Typically, you should only disable record caches for testing or debugging purposes - under normal circumstances it is better to leave record caches enabled.

KTable userProfiles | KGroupedTable groupedTable | KTable aggregated
Timestamp | Input record | Interpreted as | Grouping
1 | (alice, E) | INSERT alice | (E, 5)
2 | (bob, A) | INSERT bob | (A, 3)
3 | (charlie, A) | INSERT charlie | (A, 7)
4 | (alice, A) | UPDATE alice | (A, 5)
5 | (charlie, null) | DELETE charlie | (null, 7)
6 | (null, E) | ignored |
7 | (bob, E) | UPDATE bob | (E, 3)

Joining

Streams and tables can also be joined. Many stream processing applications in practice are coded as streaming joins. For example, applications backing an online shop might need to access multiple, continuously updated database tables (e.g. sales prices, inventory, customer information) in order to enrich a new data record (e.g. customer transaction) with context information - that is, scenarios where you need to perform table lookups at very large scale and with low processing latency. A popular pattern here is to make the information in the databases available in Kafka through so-called change data capture in combination with Kafka’s Connect API, and then to implement applications that leverage the Streams API to perform very fast and efficient local joins of such tables and streams, rather than requiring the application to query a remote database over the network for each record. In this example, the KTable concept in Kafka Streams would enable you to track the latest state (e.g., snapshot) of each table in a local state store, thus greatly reducing the processing latency as well as the load on the remote databases when doing such streaming joins.

The following join operations are supported, see also the diagram in the overview section of Stateful Transformations. Depending on the operands, joins are either windowed joins or non-windowed joins.

Join operands | Type | (INNER) JOIN | LEFT JOIN | OUTER JOIN
KStream-to-KStream | Windowed | Supported | Supported | Supported
KTable-to-KTable | Non-windowed | Supported | Supported | Supported
KStream-to-KTable | Non-windowed | Supported | Supported | Not Supported
KStream-to-GlobalKTable | Non-windowed | Supported | Supported | Not Supported
KTable-to-GlobalKTable | N/A | Not Supported | Not Supported | Not Supported

Each case is explained in more detail in the subsequent sections.

Join co-partitioning requirements

Input data must be co-partitioned when joining. This ensures that input records with the same key, from both sides of the join, are delivered to the same stream task during processing. It is the responsibility of the user to ensure data co-partitioning when joining.

Tip

If possible, consider using global tables (GlobalKTable) for joining because they do not require data co-partitioning.

The requirements for data co-partitioning are:

  • The input topics of the join (left side and right side) must have the same number of partitions.
  • All applications that write to the input topics must have the same partitioning strategy so that records with the same key are delivered to the same partition. In other words, the keyspace of the input data must be distributed across partitions in the same manner. This means that, for example, applications that use Kafka’s Java Producer API must use the same partitioner (cf. the producer setting "partitioner.class" aka ProducerConfig.PARTITIONER_CLASS_CONFIG), and applications that use Kafka’s Streams API must use the same StreamPartitioner for operations such as KStream#to(). The good news is that, if you happen to use the default partitioner-related settings across all applications, you do not need to worry about the partitioning strategy (see the sketch after this list).
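
As a sketch of the second requirement, all producer applications that write to the join's input topics would pin the same partitioner; MyCustomPartitioner below is a hypothetical placeholder, and if you rely on the default partitioner everywhere you can omit the setting entirely:

Properties producerProps = new Properties();
// Every application writing to the join's input topics must use the same partitioner.
// MyCustomPartitioner is a hypothetical example class, not part of Kafka.
producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyCustomPartitioner.class.getName());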

Why is data co-partitioning required? Because KStream-KStream, KTable-KTable, and KStream-KTable joins are performed based on the keys of records (e.g., leftRecord.key == rightRecord.key), it is required that the input streams/tables of a join are co-partitioned by key.

The only exception is KStream-GlobalKTable joins. Here, co-partitioning is not required because all partitions of the GlobalKTable’s underlying changelog stream are made available to each KafkaStreams instance, i.e. each instance has a full copy of the changelog stream. Further, a KeyValueMapper allows for non-key based joins from the KStream to the GlobalKTable.

Note

Kafka Streams partly verifies the co-partitioning requirement: During the partition assignment step, i.e. at runtime, Kafka Streams verifies whether the number of partitions for both sides of a join is the same. If it is not, a TopologyBuilderException (runtime exception) is thrown. Note that Kafka Streams cannot verify whether the partitioning strategy matches between the input streams/tables of a join - it is up to the user to ensure that this is the case.

Ensuring data co-partitioning: If the inputs of a join are not co-partitioned yet, you must ensure this manually. You may follow a procedure such as outlined below.

  1. Identify the input KStream/KTable in the join whose underlying Kafka topic has the smaller number of partitions. Let’s call this stream/table “SMALLER”, and the other side of the join “LARGER”. To learn about the number of partitions of a Kafka topic you can use, for example, the CLI tool bin/kafka-topics with the --describe option.

  2. Pre-create a new Kafka topic for “SMALLER” that has the same number of partitions as “LARGER”. Let’s call this new topic “repartitioned-topic-for-smaller”. Typically, you’d use the CLI tool bin/kafka-topics with the --create option for this.

  3. Within your application, re-write the data of “SMALLER” into the new Kafka topic. You must ensure that, when writing the data with to or through, the same partitioner is used as for “LARGER”.
     * If "SMALLER" is a KStream: `KStream#to("repartitioned-topic-for-smaller")`.
     * If "SMALLER" is a KTable: `KTable#to("repartitioned-topic-for-smaller")`.

  4. Within your application, re-read the data in “repartitioned-topic-for-smaller” into a new KStream/KTable.
     * If "SMALLER" is a KStream: `StreamsBuilder#stream("repartitioned-topic-for-smaller")`.
     * If "SMALLER" is a KTable: `StreamsBuilder#table("repartitioned-topic-for-smaller")`.

  5. Within your application, perform the join between “LARGER” and the new stream/table (see the sketch below).
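
As a minimal sketch of steps 3 to 5 for the case where "SMALLER" is a KStream, assuming smallerStream, largerStream, and builder already exist, the default partitioner is used on both sides, and the topic name and join logic are purely illustrative:

// Step 3: re-write SMALLER's data into the pre-created topic that has as many
// partitions as LARGER (default partitioner assumed on both sides).
smallerStream.to("repartitioned-topic-for-smaller");

// Step 4: re-read the repartitioned data into a new KStream.
KStream<String, String> repartitionedSmaller =
    builder.stream("repartitioned-topic-for-smaller");

// Step 5: the two sides are now co-partitioned, so the join can be performed.
KStream<String, String> joined = largerStream.join(repartitionedSmaller,
    (largerValue, smallerValue) -> "larger=" + largerValue + ", smaller=" + smallerValue, /* ValueJoiner */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));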

KStream-KStream Join

KStream-KStream joins are always windowed joins, because otherwise the size of the internal state store used to perform the join - e.g., a sliding window or “buffer” - would grow indefinitely. For stream-stream joins it’s important to highlight that a new input record on one side will produce a join output for each matching record on the other side, and there can be multiple such matching records in a given join window (cf. the row with timestamp 15 in the join semantics table below, for example).

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Inner Join (windowed)
  • (KStream, KStream) -> KStream

Performs an INNER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type KStream<K, ...> rather than KStream<Windowed<K>, ...>. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned). Several variants of join exist, see the Javadocs for details.

import java.util.concurrent.TimeUnit;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

// Java 7 example
KStream<String, String> joined = left.join(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key or a `null` value are ignored and do not trigger the join.

See the semantics overview at the bottom of this section for a detailed description.
Left Join (windowed)

  • (KStream, KStream) -> KStream

Performs a LEFT JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type KStream<K, ...> rather than KStream<Windowed<K>, ...>. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned). Several variants of leftJoin exist, see the Javadocs for details.

import java.util.concurrent.TimeUnit;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

// Java 7 example
KStream<String, String> joined = left.leftJoin(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key or a `null` value are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.

See the semantics overview at the bottom of this section for a detailed description.
Outer Join (windowed)

  • (KStream, KStream) -> KStream

Performs an OUTER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type KStream<K, ...> rather than KStream<Windowed<K>, ...>. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned). Several variants of outerJoin exist, see the Javadocs for details.

import java.util.concurrent.TimeUnit;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.outerJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

// Java 7 example
KStream<String, String> joined = left.outerJoin(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based, i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key or a `null` value are ignored and do not trigger the join.
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively; this explains the row with timestamp=3 in the table below, which lists [A, null] in the OUTER JOIN column (unlike LEFT JOIN, [null, x] is possible, too, but no such example is shown in the table).

See the semantics overview at the bottom of this section for a detailed description.

Semantics of stream-stream joins: The semantics of the various stream-stream join variants are explained below. To improve the readability of the table, assume that (1) all records have the same key (and thus the key in the table is omitted), (2) all records belong to a single join window, and (3) all records are processed in timestamp order. The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join, leftJoin, and outerJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all.

TimestampLeft (KStream)Right (KStream)(INNER) JOINLEFT JOINOUTER JOIN
1null
2null
3A[A, null][A, null]
4a[A, a][A, a][A, a]
5B[B, a][B, a][B, a]
6b[A, b], [B, b][A, b], [B, b][A, b], [B, b]
7null
8null
9C[C, a], [C, b][C, a], [C, b][C, a], [C, b]
10c[A, c], [B, c], [C, c][A, c], [B, c], [C, c][A, c], [B, c], [C, c]
11null
12null
13null
14d[A, d], [B, d], [C, d][A, d], [B, d], [C, d][A, d], [B, d], [C, d]
15D[D, a], [D, b], [D, c], [D, d][D, a], [D, b], [D, c], [D, d][D, a], [D, b], [D, c], [D, d]

KTable-KTable Join

KTable-KTable joins are always non-windowed joins. They are designed to be consistent with their counterparts in relational databases. The changelog streams of both KTables are materialized into local state stores to represent the latest snapshot of their table duals. The join result is a new KTable that represents the changelog stream of the join operation.

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Inner Join
  • (KTable, KTable) -> KTable

Performs an INNER JOIN of this table with another table. The result is an ever-updating KTable that represents the “current” result of the join. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned.

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

// Java 7 example
KTable<String, String> joined = left.join(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    });

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key are ignored and do not trigger the join.
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).

See the semantics overview at the bottom of this section for a detailed description.
Left Join

  • (KTable, KTable) -> KTable

Performs a LEFT JOIN of this table with another table. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned.

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

// Java 7 example
KTable<String, String> joined = left.leftJoin(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    });

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key are ignored and do not trigger the join.
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.

See the semantics overview at the bottom of this section for a detailed description.
Outer Join

  • (KTable, KTable) -> KTable

Performs an OUTER JOIN of this table with another table. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned.

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.outerJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

// Java 7 example
KTable<String, String> joined = left.outerJoin(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    });

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key are ignored and do not trigger the join.
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively; this explains the rows with timestamp=3 and timestamp=7 in the table below, which list [A, null] and [null, b], respectively, in the OUTER JOIN column.

See the semantics overview at the bottom of this section for a detailed description.

Semantics of table-table joins: The semantics of the various table-table join variants are explained below. To improve the readability of the table, you can assume that (1) all records have the same key (and thus the key in the table is omitted) and that (2) all records are processed in timestamp order. The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join, leftJoin, and outerJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all.

TimestampLeft (KTable)Right (KTable)(INNER) JOINLEFT JOINOUTER JOIN
1null
2null
3A[A, null][A, null]
4a[A, a][A, a][A, a]
5B[B, a][B, a][B, a]
6b[B, b][B, b][B, b]
7nullnullnull[null, b]
8nullnull
9C[C, null][C, null]
10c[C, c][C, c][C, c]
11nullnull[C, null][C, null]
12nullnullnull
13null
14d[null, d]
15D[D, d][D, d][D, d]

KStream-KTable Join

KStream-KTable joins are always non-windowed joins. They allow you to perform table lookups against a KTable (changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be to enrich a stream of user activities (KStream) with the latest user profile information (KTable).

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Inner Join
  • (KStream, KTable) -> KStream

Performs an INNER JOIN of this stream with the table, effectively doing a table lookup. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning. Several variants of join exist, see the Javadocs for details.

KStream<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
  );

// Java 7 example
KStream<String, String> joined = left.join(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
  );

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` key or a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join.

See the semantics overview at the bottom of this section for a detailed description.
Left Join

  • (KStream, KTable) -> KStream

Performs a LEFT JOIN of this stream with the table, effectively doing a table lookup. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning. Several variants of leftJoin exist, see the Javadocs for details.

KStream<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
  );

// Java 7 example
KStream<String, String> joined = left.leftJoin(right,
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    },
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
  );

Detailed behavior:

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` key or a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.

See the semantics overview at the bottom of this section for a detailed description.

Semantics of stream-table joins: The semantics of the various stream-table join variants are explained below. To improve the readability of the table we assume that (1) all records have the same key (and thus we omit the key in the table) and that (2) all records are processed in timestamp order. The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join and leftJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all.

TimestampLeft (KStream)Right (KTable)(INNER) JOINLEFT JOIN
1null
2null
3A[A, null]
4a
5B[B, a][B, a]
6b
7null
8null
9C[C, null]
10c
11null
12null
13null
14d
15D[D, d][D, d]

KStream-GlobalKTable Join

KStream-GlobalKTable joins are always non-windowed joins. They allow you to perform table lookups against a GlobalKTable (entire changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be “star queries” or “star joins”, where you would enrich a stream of user activities (KStream) with the latest user profile information (GlobalKTable) and further context information (further GlobalKTables).

At a high level, KStream-GlobalKTable joins are very similar to KStream-KTable joins. However, global tables provide you with much more flexibility at some expense when compared to partitioned tables:

  • They do not require data co-partitioning.
  • They allow for efficient “star joins”; i.e., joining a large-scale “facts” stream against “dimension” tables (see the sketch after this list).
  • They allow for joining against foreign keys; i.e., you can look up data in the table not just by the keys of records in the stream, but also by data in the record values.
  • They make many use cases feasible where you must work on heavily skewed data and thus suffer from hot partitions.
  • They are often more efficient than their partitioned KTable counterpart when you need to perform multiple joins in succession.
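
For example, here is a minimal sketch of such a chained “star join” against two global tables. The stream and table contents are purely illustrative, and the second lookup key is derived from the record value only to mirror the key-mapping trick used in the examples below:

KStream<String, String> activities = ...;            // key: user name, value: activity description
GlobalKTable<String, String> userProfiles = ...;     // keyed by user name
GlobalKTable<Integer, String> regions = ...;         // keyed by an (illustrative) integer code

// First hop of the star join: enrich each activity with the user's profile,
// using the activity's record key (the user name) as the lookup key.
KStream<String, String> withProfiles = activities.join(userProfiles,
    (userName, activity) -> userName, /* lookup key */
    (activity, profile) -> "activity=" + activity + ", profile=" + profile);

// Second hop: enrich the result against a second global table, this time deriving
// the lookup key from the (already enriched) record value.
KStream<String, String> fullyEnriched = withProfiles.join(regions,
    (userName, enrichedValue) -> enrichedValue.length(), /* illustrative value-derived key */
    (enrichedValue, region) -> enrichedValue + ", region=" + region);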

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Inner Join
  • (KStream, GlobalKTable) -> KStream

Performs an INNER JOIN of this stream with the global table, effectively doing a table lookup. (details) The GlobalKTable is fully bootstrapped upon (re)start of a KafkaStreams instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.

KStream<String, Long> left = ...;
GlobalKTable<Integer, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.join(right,
    (leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

// Java 7 example
KStream<String, String> joined = left.join(right,
    new KeyValueMapper<String, Long, Integer>() { /* derive a (potentially) new key by which to lookup against the table */
      @Override
      public Integer apply(String key, Long value) {
        return key.length();
      }
    },
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    });

Detailed behavior:

  • The join is indirectly key-based, i.e. with the join predicate KeyValueMapper#apply(leftRecord.key, leftRecord.value) == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` key or a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_, which indicate the deletion of a record key from the table. Tombstones do not trigger the join.

Left Join

  • (KStream, GlobalKTable) -> KStream

Performs a LEFT JOIN of this stream with the global table, effectively doing a table lookup. (details) The GlobalKTable is fully bootstrapped upon (re)start of a KafkaStreams instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.

KStream<String, Long> left = ...;
GlobalKTable<Integer, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
    (leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

// Java 7 example
KStream<String, String> joined = left.leftJoin(right,
    new KeyValueMapper<String, Long, Integer>() { /* derive a (potentially) new key by which to lookup against the table */
      @Override
      public Integer apply(String key, Long value) {
        return key.length();
      }
    },
    new ValueJoiner<Long, Double, String>() {
      @Override
      public String apply(Long leftValue, Double rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
      }
    });

Detailed behavior:

  • The join is indirectly key-based, i.e. with the join predicate KeyValueMapper#apply(leftRecord.key, leftRecord.value) == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` key or a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_, which indicate the deletion of a record key from the table. Tombstones do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null).

Semantics of stream-table joins: The join semantics are identical to KStream-KTable joins. The only difference is that, for KStream-GlobalKTable joins, the left input record is first “mapped” with a user-supplied KeyValueMapper into the table’s keyspace prior to the table lookup.

Windowing

Windowing lets you control how records that have the same key are grouped into so-called windows for stateful operations such as aggregations or joins. Windows are tracked per record key.

Note

A related operation is grouping, which groups all records that have the same key to ensure that data is properly partitioned (“keyed”) for subsequent operations. Once grouped, windowing allows you to further sub-group the records of a key.

For example, in join operations, a windowing state store is used to store all the records received so far within the defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation results per window. Old records in the state store are purged after the specified window retention period. Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be changed via Windows#until() and SessionWindows#until().
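
For example, a minimal sketch of increasing the retention of a time-based window via until() (the two-day value is just an illustration):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A 5-minute tumbling window whose state is retained for at least two days
// instead of the default of one day.
TimeWindows.of(TimeUnit.MINUTES.toMillis(5))
    .until(TimeUnit.DAYS.toMillis(2));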

The DSL supports the following types of windows:

Window nameBehaviorShort description
Tumbling time windowTime-basedFixed-size, non-overlapping, gap-less windows
Hopping time windowTime-basedFixed-size, overlapping windows
Sliding time windowTime-basedFixed-size, overlapping windows that work on differences between record timestamps
Session windowSession-basedDynamically-sized, non-overlapping, data-driven windows

Tumbling time windows

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

This diagram shows windowing a stream of data records with tumbling windows. Windows do not overlap because, by definition, the advance interval is identical to the window size. In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied by 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would become t=300,000).

Tumbling time windows are aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, tumbling windows with a size of 5000ms have predictable window boundaries [0;5000),[5000;10000),... – and not [1000;6000),[6000;11000),... or even something “random” like [1452;6452),[6452;11452),....

The following code defines a tumbling window with a size of 5 minutes:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A tumbling time window with a size of 5 minutes (and, by definition, an implicit
// advance interval of 5 minutes).
long windowSizeMs = TimeUnit.MINUTES.toMillis(5); // 5 * 60 * 1000L
TimeWindows.of(windowSizeMs);

// The above is equivalent to the following code:
TimeWindows.of(windowSizeMs).advanceBy(windowSizeMs);

Hopping time windows

Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap - and in general they do - a data record may belong to more than one such window.

Note

Hopping windows vs. sliding windows: Hopping windows are sometimes called “sliding windows” in other stream processing tools. Kafka Streams follows the terminology in academic literature, where the semantics of sliding windows are different to those of hopping windows.

The following code defines a hopping window with a size of 5 minutes and an advance interval of 1 minute:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.TimeWindows;

// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
long windowSizeMs = TimeUnit.MINUTES.toMillis(5); // 5 * 60 * 1000L
long advanceMs =    TimeUnit.MINUTES.toMillis(1); // 1 * 60 * 1000L
TimeWindows.of(windowSizeMs).advanceBy(advanceMs);

This diagram shows windowing a stream of data records with hopping windows. In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied by 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would become t=300,000).

Hopping time windows are aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries [0;5000),[3000;8000),... – and not [1000;6000),[4000;9000),... or even something “random” like [1452;6452),[4452;9452),....

Unlike the non-windowed aggregates that we have seen previously, windowed aggregates return a windowed KTable whose key type is Windowed<K>. This is to differentiate aggregate values with the same key from different windows. The corresponding window instance and the embedded key can be retrieved via Windowed#window() and Windowed#key(), respectively.
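
As a sketch (assuming a KGroupedStream<String, Long> named grouped, which is not part of the surrounding example), a windowed count and the unpacking of its Windowed<K> keys could look like this:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KGroupedStream<String, Long> grouped = ...;

// Count per key and per 5-minute hopping window (advancing by 1 minute);
// the key type of the result is Windowed<String>.
KTable<Windowed<String>, Long> windowedCounts = grouped
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
    .count();

windowedCounts.toStream().foreach((windowedKey, count) -> {
    String key = windowedKey.key();                    // the embedded record key
    long windowStartMs = windowedKey.window().start(); // inclusive lower bound of the window
    long windowEndMs = windowedKey.window().end();     // exclusive upper bound of the window
    // ...
});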

Sliding time windows

Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the JoinWindows class.

A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are both inclusive.
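
A minimal sketch of specifying such a sliding window for a stream-stream join (the actual join call is omitted):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.JoinWindows;

// Two records are joinable if their timestamps differ by at most 5 minutes (in either direction).
JoinWindows joinWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(5));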

Session Windows

Session windows are used to aggregate key-based events into so-called sessions, the process of which is referred to as sessionization. Sessions represent a period of activity separated by a defined gap of inactivity (or “idleness”). Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If an event falls outside of the session gap, then a new session will be created.

Session windows are different from the other window types in that:

  • all windows are tracked independently across keys - e.g. windows of different keys typically have different start and end times
  • their window sizes vary - even windows for the same key typically have different sizes

The prime area of application for session windows is user behavior analysis. Session-based analyses can range from simple metrics (e.g. count of user visits on a news website or social platform) to more complex metrics (e.g. customer conversion funnel and event flows).

The following code defines a session window with an inactivity gap of 5 minutes:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.SessionWindows;

// A session window with an inactivity gap of 5 minutes.
SessionWindows.with(TimeUnit.MINUTES.toMillis(5));

Given the previous session window example, here’s what would happen on an input stream of six records. When the first three records arrive (upper part of the diagram below), we’d have three sessions (see lower part) after having processed those records: two for the green record key, with one session starting and ending at the 0-minute mark (it only looks as if the session goes from 0 to 1 due to the illustration), and another starting and ending at the 6-minute mark; and one session for the blue record key, starting and ending at the 2-minute mark.

Detected sessions after having received three input records: two records for the green record key at t=0 and t=6, and one record for the blue record key at t=2. In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied by 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would become t=300,000).

If we then receive three additional records (including two late-arriving records), what would happen is that the two existing sessions for the green record key will be merged into a single session starting at time 0 and ending at time 6, consisting of a total of three records. The existing session for the blue record key will be extended to end at time 5, consisting of a total of two records. And, finally, there will be a new session for the blue key starting and ending at time 11.

Detected sessions after having received six input records. Note the two late-arriving data records at t=4 (green) and t=5 (blue), which lead to a merge of sessions and an extension of a session, respectively.
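
As a sketch (assuming a KGroupedStream<String, Long> named grouped, which is not part of the surrounding example), a sessionized count with a 5-minute inactivity gap could look like this:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

KGroupedStream<String, Long> grouped = ...;

// Count events per key and per session, where a session is closed by 5 minutes of inactivity.
KTable<Windowed<String>, Long> sessionizedCounts = grouped
    .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
    .count();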

Applying processors and transformers (Processor API integration)

Beyond the aforementioned stateless and stateful transformations, you may also leverage the Processor API from the DSL. There are a number of scenarios where this may be helpful:

  • Customization: You need to implement special, customized logic that is not or not yet available in the DSL.
  • Combining ease-of-use with full flexibility where it’s needed: Even though you generally prefer to use the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a record’s metadata such as its topic, partition, and offset information. However, you don’t want to switch completely to the Processor API just because of that.
  • Migrating from other tools: You are migrating from other stream processing technologies that provide an imperative API, and migrating some of your legacy code to the Processor API is faster and/or easier than migrating completely to the DSL right away.
TransformationDescription
Process
  • KStream -> void

| Terminal operation. Applies a Processor to each record. process() allows you to leverage the Processor API from the DSL. (details) This is essentially equivalent to adding the Processor via Topology#addProcessor() to your processor topology. An example is available in the javadocs.
Transform

  • KStream -> KStream

| Applies a Transformer to each record. transform() allows you to leverage the Processor API from the DSL. (details) Each input record is transformed into zero, one, or more output records (similar to the stateless flatMap). The Transformer must return null for zero output. You can modify the record’s key and value, including their types. Marks the stream for data re-partitioning: Applying a grouping or a join after transform will result in re-partitioning of the records. If possible use transformValues instead, which will not cause data re-partitioning. transform is essentially equivalent to adding the Transformer via Topology#addProcessor() to your processor topology. An example is available in the javadocs.
Transform (values only)

  • KStream -> KStream

| Applies a ValueTransformer to each record, while retaining the key of the original record. transformValues() allows you to leverage the Processor API from the DSL. (details) Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). The ValueTransformer may return null as the new value for a record. transformValues is preferable to transform because it will not cause data re-partitioning. transformValues is essentially equivalent to adding the ValueTransformer via Topology#addProcessor() to your processor topology. An example is available in the javadocs.

The following example shows how to leverage, via the KStream#process() method, a custom Processor that sends an email notification whenever a page view count reaches a predefined threshold.

First, we need to implement a custom stream processor, PopularPageEmailAlert, that implements the Processor interface:

// A processor that sends an alert message about a popular page to a configurable email address
public class PopularPageEmailAlert implements Processor<PageId, Long> {

  private final String emailAddress;
  private ProcessorContext context;

  public PopularPageEmailAlert(String emailAddress) {
    this.emailAddress = emailAddress;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;

    // Here you would perform any additional initializations such as setting up an email client.
  }

  @Override
  public void process(PageId pageId, Long count) {
    // Here you would format and send the alert email.
    //
    // In this specific example, you would be able to include information about the page's ID and its view count
    // (because the class implements `Processor<PageId, Long>`).
  }

  @Override
  public void close() {
    // Any code for clean up would go here.  This processor instance will not be used again after this call.
  }

}

Tip

Even though we do not demonstrate it in this example, a stream processor can access any available state stores by calling ProcessorContext#getStateStore(). Only those state stores are available that (1) have been named in the corresponding KStream#process() method call (note that this is a different method than Processor#process()), plus (2) all global stores. Global stores do not need to be attached explicitly; however, they only allow for read-only access.
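
For illustration, attaching a state store by name when wiring a processor via the DSL could look like the following sketch; the pageViewCounts stream and the store name "PageViewCounts" are hypothetical, and the store must have been registered beforehand, e.g. via StreamsBuilder#addStateStore().

// `pageViewCounts` (a KStream<PageId, Long>) and the store name "PageViewCounts" are hypothetical.
pageViewCounts.process(() -> new PopularPageEmailAlert("alerts@yourcompany.com"), "PageViewCounts");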

Then we can leverage the PopularPageEmailAlert processor in the DSL via KStream#process.

In Java 8+, using lambda expressions:

KStream<PageId, GenericRecord> pageViews = ...;

// Send an email notification when the view count of a page reaches one thousand.
pageViews.groupByKey()
         .count()
         .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
         .toStream()
         // PopularPageEmailAlert is your custom processor that implements the
         // `Processor` interface; see its definition above.
         .process(() -> new PopularPageEmailAlert("alerts@yourcompany.com"));

In Java 7:

// Send an email notification when the view count of a page reaches one thousand.
pageViews.groupByKey()
         .count()
         .filter(
            new Predicate<PageId, Long>() {
              @Override
              public boolean test(PageId pageId, Long viewCount) {
                return viewCount == 1000;
              }
            })
         .toStream()
         .process(
           new ProcessorSupplier<PageId, Long>() {
             @Override
             public Processor<PageId, Long> get() {
               // PopularPageEmailAlert is your custom processor that implements
               // the `Processor` interface; see its definition above.
               return new PopularPageEmailAlert("alerts@yourcompany.com");
             }
           });

Writing streams back to Kafka

Any streams and tables may be (continuously) written back to a Kafka topic. As we will describe in more detail below, the output data might be re-partitioned on its way to Kafka, depending on the situation.

Writing to KafkaDescription
To
  • KStream -> void

| Terminal operation. Write the records to a Kafka topic. (KStream details) When to provide serdes explicitly:

  • If you do not specify SerDes explicitly, the default SerDes from the configuration are used.
  • You must specify SerDes explicitly via the Produced class if the key and/or value types of the KStream do not match the configured default SerDes.
  • See Data Types and Serialization for information about configuring default SerDes, available SerDes, and implementing your own custom SerDes.

A variant of to exists that enables you to specify how the data is produced by using a Produced instance to specify, for example, a StreamPartitioner that gives you control over how output records are distributed across the partitions of the output topic.

KStream<String, Long> stream = ...;
KTable<String, Long> table = ...;


// Write the stream to the output topic, using the configured default key
// and value serdes of your `StreamsConfig`.
stream.to("my-stream-output-topic");

// Same for table
table.to("my-table-output-topic");

// Write the stream to the output topic, using explicit key and value serdes,
// (thus overriding the defaults of your `StreamsConfig`).
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long());

Causes data re-partitioning if any of the following conditions is true:

  1. If the output topic has a different number of partitions than the stream/table.
  2. If the KStream was marked for re-partitioning.
  3. If you provide a custom StreamPartitioner to explicitly control how to distribute the output records across the partitions of the output topic.
  4. If the key of an output record is null.

Through

  • KStream -> KStream
  • KTable -> KTable

| Write the records to a Kafka topic and create a new stream/table from that topic. Essentially a shorthand for KStream#to() followed by StreamsBuilder#stream(), same for tables. (KStream details) When to provide SerDes explicitly:

  • If you do not specify SerDes explicitly, the default SerDes from the configuration are used.
  • You must specify SerDes explicitly if the key and/or value types of the KStream or KTable do not match the configured default SerDes.
  • See Data Types and Serialization for information about configuring default SerDes, available SerDes, and implementing your own custom SerDes.

A variant of through exists that enables you to specify how the data is produced by using a Produced instance to specify, for example, a StreamPartitioner that gives you control over how output records are distributed across the partitions of the output topic.

StreamsBuilder builder = ...;
KStream<String, Long> stream = ...;
KTable<String, Long> table = ...;

// Variant 1: Imagine that your application needs to continue reading and processing
// the records after they have been written to a topic via ``to()``.  Here, one option
// is to write to an output topic, then read from the same topic by constructing a
// new stream from it, and then begin processing it (here: via `map`, for example).
stream.to("my-stream-output-topic");
KStream<String, Long> newStream = builder.stream("my-stream-output-topic").map(...);

// Variant 2 (better): Since the above is a common pattern, the DSL provides the
// convenience method ``through`` that is equivalent to the code above.
// Note that you may need to specify key and value serdes explicitly, which is
// not shown in this simple example.
KStream<String, Long> newStream = stream.through("my-stream-output-topic").map(...);

// ``through`` is also available for tables
KTable<String, Long> newTable = table.through("my-table-output-topic").mapValues(...);

Causes data re-partitioning if any of the following conditions is true:

  1. If the output topic has a different number of partitions than the stream/table.
  2. If the KStream was marked for re-partitioning.
  3. If you provide a custom StreamPartitioner to explicitly control how to distribute the output records across the partitions of the output topic.
  4. If the key of an output record is null.

Note

When you want to write to systems other than Kafka: Besides writing the data back to Kafka, you can also apply a custom processor as a stream sink at the end of the processing to, for example, write to external databases. Doing so is not a recommended pattern - we strongly suggest using the Kafka Connect API instead. However, if you do use such a sink processor, please be aware that it is now your responsibility to guarantee message delivery semantics when talking to such external systems (e.g., to retry on delivery failure or to prevent message duplication).


4 - Processor API

Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic.

Table of Contents

  • Overview
  • Defining a Stream Processor
  • State Stores
    • Defining and creating a State Store
    • Fault-tolerant State Stores
    • Enable or Disable Fault Tolerance of State Stores (Store Changelogs)
    • Implementing Custom State Stores
  • Connecting Processors and State Stores

Overview

The Processor API can be used to implement both stateless as well as stateful operations, where the latter is achieved through the use of state stores.

Tip

Combining the DSL and the Processor API: You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the section Applying processors and transformers (Processor API integration).

For a complete list of available API functionality, see the Streams API docs.

Defining a Stream Processor

A stream processor is a node in the processor topology that represents a single processing step. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology.

You can define a customized stream processor by implementing the Processor interface, which provides the process() API method. The process() method is called on each of the received records.

The Processor interface also has an init() method, which is called by the Kafka Streams library during task construction phase. Processor instances should perform any required initialization in this method. The init() method passes in a ProcessorContext instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition, its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation function (via ProcessorContext#schedule()), to forward a new record as a key-value pair to the downstream processors (via ProcessorContext#forward()), and to commit the current processing progress (via ProcessorContext#commit()).

Specifically, ProcessorContext#schedule() accepts a user Punctuator callback interface, which triggers its punctuate() API method periodically based on the PunctuationType. The PunctuationType determines what notion of time is used for the punctuation scheduling: either stream-time or wall-clock-time (by default, stream-time is configured to represent event-time via TimestampExtractor). When stream-time is used, punctuate() is triggered purely by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there is no new input data arriving, stream-time is not advanced and thus punctuate() is not called.

For example, if you schedule a Punctuator function every 10 seconds based on PunctuationType.STREAM_TIME and if you process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record), then punctuate() would be called 6 times. This happens regardless of the time required to actually process those records. punctuate() would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.

When wall-clock-time (i.e. PunctuationType.WALL_CLOCK_TIME) is used, punctuate() is triggered purely by the wall-clock time. Reusing the example above, if the Punctuator function is scheduled based on PunctuationType.WALL_CLOCK_TIME, and if these 60 records were processed within 20 seconds, punctuate() is called 2 times (one time every 10 seconds). If these 60 records were processed within 5 seconds, then no punctuate() is called at all. Note that you can schedule multiple Punctuator callbacks with different PunctuationType types within the same processor by calling ProcessorContext#schedule() multiple times inside init() method.

Attention

Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().

The following example Processor implements a simple word-count algorithm, in which these actions are performed:

  • In the init() method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.

  • In the process() method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).

  • In the punctuate() method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // retrieve the key-value store named "Counts"
      kvStore = (KeyValueStore) context.getStateStore("Counts");

      // schedule a punctuation every 1000 milliseconds based on stream-time
      this.context.schedule(1000, PunctuationType.STREAM_TIME, (timestamp) -> {
          KeyValueIterator<String, Long> iter = this.kvStore.all();
          while (iter.hasNext()) {
              KeyValue<String, Long> entry = iter.next();
              context.forward(entry.key, entry.value.toString());
          }
          iter.close();

          // commit the current processing progress
          context.commit();
      });
  }

  @Override
  public void process(String key, String value) {
      // split the value string into words and update the running count for each word in the state store
      String[] words = value.toLowerCase().split(" ");
      for (String word : words) {
          Long oldValue = kvStore.get(word);
          if (oldValue == null) {
              kvStore.put(word, 1L);
          } else {
              kvStore.put(word, oldValue + 1L);
          }
      }
  }

  @Override
  public void punctuate(long timestamp) {
      // this method is deprecated and should not be used anymore
  }

  @Override
  public void close() {
      // close any resources managed by this processor
      // Note: Do not close any StateStores as these are managed by the library
  }
}

Note

Stateful processing with state stores: The WordCountProcessor defined above can access the currently received record in its process() method, and it can leverage state stores to maintain processing states to, for example, remember recently arrived records for stateful processing needs like aggregations and joins. For more information, see the state stores documentation.

State Stores

To implement a stateful Processor or Transformer, you must provide one or more state stores to the processor or transformer (stateless processors or transformers do not need state stores). State stores can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, and more. Another feature of state stores is that they can be interactively queried from other applications, such as a NodeJS-based dashboard or a microservice implemented in Scala or Go.

The available state store types in Kafka Streams have fault tolerance enabled by default.

Defining and creating a State Store

You can either use one of the available store types or implement your own custom store type. It’s common practice to leverage an existing store type via the Stores factory.

Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code. Rather, you define state stores indirectly by creating a so-called StoreBuilder. This builder is used by Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where needed.

The following store types are available out of the box.

Store TypeStorage EngineFault-tolerant?Description
Persistent KeyValueStore<K, V>RocksDBYes (enabled by default)
  • The recommended store type for most use cases.

  • Stores its data on local disk.

  • Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space.

  • RocksDB settings can be fine-tuned, see RocksDB configuration.

  • Available store variants: time window key-value store, session window key-value store.

// Creating a persistent key-value store:
// here, we create a KeyValueStore<String, Long> named "persistent-counts".
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a KeyValueStoreBuilder to build a KeyValueStore.
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore<String, Long> countStore = countStoreSupplier.build();

In-memory KeyValueStore<K, V> | - | Yes (enabled by default) |

  • Stores its data in memory.

  • Storage capacity: managed local state must fit into memory (heap space) of an application instance.

  • Useful when application instances run in an environment where local disk space is either not available or local disk space is wiped in-between app instance restarts.

// Creating an in-memory key-value store:
// here, we create a KeyValueStore<String, Long> named "inmemory-counts".
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a KeyValueStoreBuilder to build a KeyValueStore.
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("inmemory-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore<String, Long> countStore = countStoreSupplier.build();

Fault-tolerant State Stores

To make state stores fault-tolerant and to allow for state store migration without data loss (for example, when migrating a stateful stream task from one machine to another while elastically adding or removing capacity from your application), a state store can be continuously backed up to a Kafka topic behind the scenes. This topic is sometimes referred to as the state store’s associated changelog topic, or its changelog. For example, if you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can enable or disable this backup feature for a state store.

By default, persistent key-value stores are fault-tolerant. They are backed by a compacted changelog topic. The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.

Similarly, persistent window stores are fault-tolerant. They are backed by a topic that uses both compaction and deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The default retention setting is Windows#maintainMs() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.
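
For example, the additional retention could be raised to 2 days in the application’s configuration (the 2-day value shown is illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Keep window-store changelog data for an extra 2 days beyond the window retention (value in milliseconds).
settings.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 2 * 24 * 60 * 60 * 1000L);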

When you open an Iterator from a state store you must call close() on the iterator when you are done working with it to reclaim resources; or you can use the iterator from within a try-with-resources statement. If you do not close an iterator, you may encounter an OOM error.
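
A minimal sketch, assuming a KeyValueStore<String, Long> named store; KeyValueIterator implements java.io.Closeable, so try-with-resources closes it automatically:

try (KeyValueIterator<String, Long> iter = store.all()) {
    while (iter.hasNext()) {
        KeyValue<String, Long> entry = iter.next();
        // ... work with entry.key and entry.value ...
    }
}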

Enable or Disable Fault Tolerance of State Stores (Store Changelogs)

You can enable or disable fault tolerance for a state store by enabling or disabling the change logging of the store through enableLogging() and disableLogging(). You can also fine-tune the associated topic’s configuration if needed.

Example for disabling fault-tolerance:

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingDisabled(); // disable backing up the store to a changelog topic

Attention

If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any standby replicas.

Here is an example for enabling fault tolerance, with additional changelog-topic configuration: You can add any log config from kafka.log.LogConfig. Unrecognized configs will be ignored.

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map<String, String> changelogConfig = new HashMap<>();
// override min.insync.replicas
changelogConfig.put("min.insync.replicas", "1");

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable change logging, with custom changelog settings

Implementing Custom State Stores

You can use the built-in state store types or implement your own. The primary interface to implement for the store is org.apache.kafka.streams.processor.StateStore. Kafka Streams also has a few extended interfaces such as KeyValueStore.

You also need to provide a “builder” for the store by implementing the org.apache.kafka.streams.state.StoreBuilder interface, which Kafka Streams uses to create instances of your store.

Connecting Processors and State Stores

Now that a processor (WordCountProcessor) and the state stores have been defined, you can construct the processor topology by connecting these processors and state stores together by using the Topology instance. In addition, you can add source processors with the specified Kafka topics to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate output data streams out of the topology.

Here is an example implementation:

Topology builder = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreBuilder, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

Here is a quick explanation of this example:

  • A source processor node named "Source" is added to the topology using the addSource method, with one Kafka topic "source-topic" fed to it.
  • A processor node named "Process" with the pre-defined WordCountProcessor logic is then added as the downstream processor of the "Source" node using the addProcessor method.
  • A predefined persistent key-value state store is created and associated with the "Process" node, using countStoreBuilder.
  • A sink processor node is then added to complete the topology using the addSink method, taking the "Process" node as its upstream processor and writing to a separate "sink-topic" Kafka topic.

In this topology, the "Process" stream processor node is considered a downstream processor of the "Source" node, and an upstream processor of the "Sink" node. As a result, whenever the "Source" node forwards a newly fetched record from Kafka to its downstream "Process" node, the WordCountProcessor#process() method is triggered to process the record and update the associated state store. Whenever context#forward() is called in the WordCountProcessor#punctuate() method, the aggregate key-value pair will be sent via the "Sink" processor node to the Kafka topic "sink-topic". Note that in the WordCountProcessor implementation, you must refer to the same store name "Counts" when accessing the key-value store, otherwise an exception will be thrown at runtime, indicating that the state store cannot be found. If the state store is not associated with the processor in the Topology code, accessing it in the processor’s init() method will also throw an exception at runtime, indicating the state store is not accessible from this processor.

Now that you have fully defined your processor topology in your application, you can proceed to running the Kafka Streams application.


5 - Data Types and Serialization

Data Types and Serialization

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().

You can provide SerDes by using either of these methods:

  • By setting default SerDes via a StreamsConfig instance.
  • By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.

Table of Contents

  • Configuring SerDes
  • Overriding default SerDes
  • Available SerDes
    • Primitive and basic types
    • JSON
    • Implementing custom serdes

Configuring SerDes

SerDes specified in the Streams configuration via StreamsConfig are used as the default in your Kafka Streams application.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

StreamsConfig config = new StreamsConfig(settings);

Overriding default SerDes

You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));

If you want to override serdes selectively, i.e., keep the defaults for some fields, then don’t specify the serde whenever you want to leverage the default settings:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));

Available SerDes

Primitive and basic types

Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as byte[] in its kafka-clients Maven artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.

Data typeSerde
byte[]Serdes.ByteArray(), Serdes.Bytes() (see tip below)
ByteBufferSerdes.ByteBuffer()
DoubleSerdes.Double()
IntegerSerdes.Integer()
LongSerdes.Long()
StringSerdes.String()

Tip

Bytes is a wrapper for Java’s byte[] (byte array) that supports proper equality and ordering semantics. You may want to consider using Bytes instead of byte[] in your applications.

JSON

The code examples of Kafka Streams also include a basic serde implementation for JSON:

* [JsonPOJOSerializer](https://github.com/apache/kafka/blob/1.1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java)
* [JsonPOJODeserializer](https://github.com/apache/kafka/blob/1.1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java)

You can construct a unified JSON serde from the JsonPOJOSerializer and JsonPOJODeserializer via Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>). The PageViewTypedDemo example demonstrates how to use this JSON serde.

Implementing custom SerDes

If you need to implement custom SerDes, your best starting point is to take a look at the source code references of existing SerDes (see previous section). Typically, your workflow will be similar to:

1. Write a _serializer_ for your data type `T` by implementing [org.apache.kafka.common.serialization.Serializer](https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java).
2. Write a _deserializer_ for `T` by implementing [org.apache.kafka.common.serialization.Deserializer](https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java).
3. Write a _serde_ for `T` by implementing [org.apache.kafka.common.serialization.Serde](https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java), which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in [Serdes](https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java) such as `Serdes.serdeFrom(Serializer<T>, Deserializer<T>)`.
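
As a sketch of step 3, a serde can be assembled from a serializer/deserializer pair; MyEvent, MyEventSerializer, and MyEventDeserializer are hypothetical classes that you would implement yourself in steps 1 and 2:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// `MyEvent`, `MyEventSerializer`, and `MyEventDeserializer` are hypothetical user-defined classes.
Serde<MyEvent> myEventSerde = Serdes.serdeFrom(new MyEventSerializer(), new MyEventDeserializer());

// The resulting serde can then be passed wherever explicit serdes are accepted, e.g.:
// stream.to("my-topic", Produced.valueSerde(myEventSerde));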


6 - Testing a Streams Application

Testing a Streams Application

To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular dependency to your test code base. Example pom.xml snippet when using Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>1.1.0</version>
    <scope>test</scope>
</dependency>

The test-utils package provides a TopologyTestDriver that can be used to pipe data through a Topology that is either assembled manually using the Processor API or via the DSL using StreamsBuilder. The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped-in data records. The test driver captures the result records and allows you to query its embedded state stores.

// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// or
// using DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();

// setup test driver
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

The test driver accepts ConsumerRecords with key and value type byte[]. Because byte[] types can be problematic, you can use the ConsumerRecordFactory to generate those records by providing regular Java types for key and values and the corresponding serializers.

ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipeInput(factory.create("key", 42));

To verify the output, the test driver produces ProducerRecords with key and value type byte[]. For result verification, you can specify corresponding deserializers when reading the output record from the driver.

ProducerRecord<String, Long> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

For result verification, you can use OutputVerifier. It offers helper methods to compare only certain parts of the result record: for example, you might only care about the key and value, but not the timestamp of the result record.

OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws AssertionError if key or value does not match

TopologyTestDriver supports punctuations, too. Event-time punctuations are triggered automatically based on the processed records’ timestamps. Wall-clock-time punctuations can also be triggered by advancing the test driver’s wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).

testDriver.advanceWallClockTime(20L);

Additionally, you can access state stores via the test driver before or after a test. Accessing stores before a test is useful to pre-populate a store with some initial values. After data was processed, expected updates to the store can be verified.

KeyValueStore store = testDriver.getKeyValueStore("store-name");

Note that you should always close the test driver at the end to make sure all resources are released properly.

testDriver.close();

Example

The following example demonstrates how to use the test driver and helper classes. The example creates a topology that computes the maximum value per key using a key-value-store. While processing, no output is generated, but only the store is updated. Output is only sent downstream based on event-time and wall-clock punctuations.

private TopologyTestDriver testDriver;
private KeyValueStore<String, Long> store;

private StringDeserializer stringDeserializer = new StringDeserializer();
private LongDeserializer longDeserializer = new LongDeserializer();
private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new LongSerializer());

@Before
public void setup() {
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("aggStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
        "aggregator");
    topology.addSink("sinkProcessor", "result-topic", "aggregator");

    // setup test driver
    Properties config = new Properties();
    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    testDriver = new TopologyTestDriver(topology, config);

    // pre-populate store
    store = testDriver.getKeyValueStore("aggStore");
    store.put("a", 21L);
}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void shouldFlushStoreForFirstInput() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldNotUpdateStoreForSmallerValue() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    Assert.assertThat(store.get("a"), equalTo(21L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldUpdateStoreForLargerValue() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
    Assert.assertThat(store.get("a"), equalTo(42L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldUpdateStoreForNewKey() {
    testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
    Assert.assertThat(store.get("b"), equalTo(21L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldPunctuateIfEventTimeAdvances() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);

    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));

    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
    testDriver.advanceWallClockTime(60000);
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
    @Override
    public Processor<String, Long> get() {
        return new CustomMaxAggregator();
    }
}

public class CustomMaxAggregator implements Processor<String, Long> {
    ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                flushStore();
            }
        });
        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                flushStore();
            }
        });
        store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
    }

    @Override
    public void process(String key, Long value) {
        Long oldValue = store.get(key);
        if (oldValue == null || value > oldValue) {
            store.put(key, value);
        }
    }

    private void flushStore() {
        KeyValueIterator<String, Long> it = store.all();
        while (it.hasNext()) {
            KeyValue<String, Long> next = it.next();
            context.forward(next.key, next.value);
        }
    }

    @Override
    public void punctuate(long timestamp) {} // deprecated; not used

    @Override
    public void close() {}
}


7 - Interactive Queries

Interactive Queries

Interactive queries allow you to leverage the state of your application from outside your application. The Kafka Streams API enables your applications to be queryable.

Table of Contents

  • Querying local state stores for an app instance
    • Querying local key-value stores
    • Querying local window stores
    • Querying local custom state stores
  • Querying remote state stores for the entire app
    • Adding an RPC layer to your application
    • Exposing the RPC endpoints of your application
    • Discovering and accessing application instances and their local state stores
  • Demo applications

The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.

There are local and remote components to interactively querying the state of your application.

Local state An application instance can query the locally managed portion of the state and directly query its own local state stores. You can use the corresponding local data in other parts of your application code, as long as it doesn’t require calling the Kafka Streams API. Querying state stores is always read-only to guarantee that the underlying state stores will never be mutated out-of-band (e.g., you cannot add new entries). State stores should only be mutated by the corresponding processor topology and the input data it operates on. For more information, see Querying local state stores for an app instance. Remote state

To query the full state of your application, you must connect the various fragments of the state, including:

  • query local state stores
  • discover all running instances of your application in the network and their state stores
  • communicate with these instances over the network (e.g., an RPC layer)

Connecting these fragments enables communication between instances of the same app and communication from other applications for interactive queries. For more information, see Querying remote state stores for the entire app.

Kafka Streams natively provides all of the required functionality for interactively querying the state of your application, except if you want to expose the full state of your application via interactive queries. To allow application instances to communicate over the network, you must add a Remote Procedure Call (RPC) layer to your application (e.g., REST API).

This table shows the Kafka Streams native communication support for various procedures.

ProcedureApplication instanceEntire application
Query local state stores of an app instanceSupportedSupported
Make an app instance discoverable to othersSupportedSupported
Discover all running app instances and their state storesSupportedSupported
Communicate with app instances over the network (RPC)SupportedNot supported (you must configure)

Querying local state stores for an app instance

A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the application’s entire state. Querying the local stores on an instance will only return data locally available on that particular instance.

The method KafkaStreams#store(...) finds an application instance’s local state stores by name and type.

Every application instance can directly query any of its local state stores.

The name of a state store is defined when you create the store. You can create the store explicitly by using the Processor API or implicitly by using stateful operations in the DSL.

The type of a state store is defined by QueryableStoreType. You can access the built-in types via the class QueryableStoreTypes. Kafka Streams currently has two built-in types:

  • A key-value store QueryableStoreTypes#keyValueStore(), see Querying local key-value stores.
  • A window store QueryableStoreTypes#windowStore(), see Querying local window stores.

You can also implement your own QueryableStoreType as described in section Querying local custom state stores.

Note

Kafka Streams materializes one state store per stream partition. This means your application will potentially manage many underlying state stores. The API enables you to query all of the underlying stores without having to know which partition the data is in.

Querying local key-value stores

To query a local key-value store, you must first create a topology with a key-value store. This example creates a key-value store named “CountsKeyValueStore”. This store will hold the latest count for any word that is found on the topic “word-count-input”.

StreamsConfig config = ...;
StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));

// Create a key-value store named "CountsKeyValueStore" for the all-time word counts
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

After the application has started, you can get access to “CountsKeyValueStore” and then query it via the ReadOnlyKeyValueStore API:

// Get the key-value store CountsKeyValueStore
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());

// Get value by key
System.out.println("count for hello:" + keyValueStore.get("hello"));

// Get the values for a range of keys available in this application instance
KeyValueIterator<String, Long> range = keyValueStore.range("all", "streams");
while (range.hasNext()) {
  KeyValue<String, Long> next = range.next();
  System.out.println("count for " + next.key + ": " + next.value);
}
range.close(); // close iterators to release resources

// Get the values for all of the keys available in this application instance
KeyValueIterator<String, Long> all = keyValueStore.all();
while (all.hasNext()) {
  KeyValue<String, Long> next = all.next();
  System.out.println("count for " + next.key + ": " + next.value);
}
all.close(); // close iterators to release resources

You can also materialize the results of stateless operators by using the overloaded methods that take a queryableStoreName as shown in the example below:

StreamsConfig config = ...;
StreamsBuilder builder = ...;
KTable<String, Integer> regionCounts = ...;

// materialize the result of filtering corresponding to odd numbers
// the "queryableStoreName" can be subsequently queried.
KTable<String, Integer> oddCounts = regionCounts.filter((region, count) -> (count % 2 != 0),
  Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("queryableStoreName"));

// do not materialize the result of filtering corresponding to even numbers
// this means that these results will not be materialized and cannot be queried.
KTable<String, Integer> evenCounts = regionCounts.filter((region, count) -> (count % 2 == 0));

Querying local window stores

A window store will potentially have many results for any given key because the key can be present in multiple windows. However, there is only one result per window for a given key.

To query a local window store, you must first create a topology with a window store. This example creates a window store named “CountsWindowStore” that contains the counts for words in 1-minute windows.

StreamsConfig config = ...;
StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
groupedByWord.windowedBy(TimeWindows.of(60000))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));

After the application has started, you can get access to “CountsWindowStore” and then query it via the ReadOnlyWindowStore API:

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Fetch values for the key "world" for all of the windows available in this application instance.
// To get *all* available windows we fetch windows from the beginning of time until now.
long timeFrom = 0; // beginning of time = oldest available
long timeTo = System.currentTimeMillis(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
  KeyValue<Long, Long> next = iterator.next();
  long windowTimestamp = next.key;
  System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
}

Querying local custom state stores

Note

Only the Processor API supports custom state stores.

Before querying the custom state stores you must implement these interfaces:

  • Your custom state store must implement StateStore.
  • You must have an interface to represent the operations available on the store.
  • You must provide an implementation of StoreBuilder for creating instances of your store.
  • It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.

The class/interface hierarchy for your custom store might look something like:

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
  // implementation of the actual store
}

// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
  void write(K key, V value);
}

// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
  V read(K key);
}

public class MyCustomStoreBuilder implements StoreBuilder {
  // implementation of the supplier for MyCustomStore
}

To make this store queryable you must:

  • Provide an implementation of QueryableStoreType.
  • Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.

Here is how to implement QueryableStoreType:

public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

  // Only accept StateStores that are of type MyCustomStore
  public boolean accepts(final StateStore stateStore) {
    return stateStore instanceof MyCustomStore;
  }

  public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
      return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
  }

}

A wrapper class is required because each instance of a Kafka Streams application may run multiple stream tasks and manage multiple local instances of a particular state store. The wrapper class hides this complexity and lets you query a “logical” state store by name without having to know about all of the underlying local instances of that state store.

When implementing your wrapper class you must use the StateStoreProvider interface to get access to the underlying instances of your store. StateStoreProvider#stores(String storeName, QueryableStoreType<T> queryableStoreType) returns a List of state stores with the given storeName and of the type as defined by queryableStoreType.

Here is an example implementation of the wrapper (Java 8+):

// We strongly recommended implementing a read-only interface
// to restrict usage of the store to safe read operations!
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {

  private final QueryableStoreType<MyReadableCustomStore<K, V>> customStoreType;
  private final String storeName;
  private final StateStoreProvider provider;

  public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
                                  final String storeName,
                                  final QueryableStoreType<MyReadableCustomStore<K, V>> customStoreType) {

    // ... assign fields ...
  }

  // Implement a safe read method
  @Override
  public V read(final K key) {
    // Get all the local stores with the given storeName and of type customStoreType
    final List<MyReadableCustomStore<K, V>> stores = provider.stores(storeName, customStoreType);
    // Return the first non-null value found for the given key, if any
    final Optional<V> value = stores.stream()
        .map(store -> store.read(key))
        .filter(v -> v != null)
        .findFirst();
    return value.orElse(null);
  }

}

You can now find and query your custom store:

StreamsConfig config = ...;
Topology topology = ...;
ProcessorSupplier processorSupplier = ...;

// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store"); // ...
// Add the source topic
topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
topology.addStateStore(customStoreBuilder, "the-processor");

KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

// Get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custom-store", new MyCustomStoreType<String,String>());
// Query the store
String value = store.read("key");

Querying remote state stores for the entire app

To query remote states for the entire app, you must expose the application’s full state to other applications, including applications that are running on different machines.

For example, you have a Kafka Streams application that processes user events in a multi-player video game, and you want to retrieve the latest status of each user directly and display it in a mobile app. Here are the required steps to make the full state of your application queryable:

  1. Add an RPC layer to your application so that the instances of your application can be interacted with via the network (e.g., a REST API, Thrift, a custom protocol, and so on). The instances must respond to interactive queries. You can follow the reference examples provided to get started.
  2. Expose the RPC endpoints of your application’s instances via the application.server configuration setting of Kafka Streams. Because RPC endpoints must be unique within a network, each instance has its own value for this configuration setting. This makes an application instance discoverable by other instances.
  3. In the RPC layer, discover remote application instances and their state stores, and query locally available state stores, in order to make the full state of your application queryable. Locally available state stores can answer queries directly; if a particular instance lacks the local data needed to respond to a query, it can forward the query to the remote application instance that does.

(Figure: Discover any running instances of the same application, as well as the respective RPC endpoints they expose for interactive queries.)

Adding an RPC layer to your application

There are many ways to add an RPC layer. The only requirements are that the RPC layer is embedded within the Kafka Streams application and that it exposes an endpoint that other application instances and applications can connect to.
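
For illustration, here is a minimal sketch of one possible RPC layer, using the JDK's built-in com.sun.net.httpserver package. The class name WordCountRpcServer and the /word-count/ path are illustrative choices, and the sketch assumes a queryable state store named "word-count" (defined in the example of the next section); none of this is part of the Kafka Streams API.

import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class WordCountRpcServer {

  // Serve GET /word-count/<word> from this instance's local "word-count" store.
  public static void start(final KafkaStreams streams, final int port) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
    server.createContext("/word-count/", exchange -> {
      String path = exchange.getRequestURI().getPath();
      String word = path.substring(path.lastIndexOf('/') + 1);
      // Look up the locally available portion of the "word-count" store.
      ReadOnlyKeyValueStore<String, Long> store =
          streams.store("word-count", QueryableStoreTypes.keyValueStore());
      Long count = store.get(word);
      byte[] body = String.valueOf(count).getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(count == null ? 404 : 200, body.length);
      try (OutputStream os = exchange.getResponseBody()) {
        os.write(body);
      }
    });
    server.start();
  }
}

A real deployment would typically use an existing REST framework instead; the only requirement is that the endpoint is reachable at the host:port pair advertised via application.server.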

Exposing the RPC endpoints of your application

To enable remote state store discovery in a distributed Kafka Streams application, you must set the application.server configuration property in StreamsConfig. This property defines a unique host:port pair that points to the RPC endpoint of the respective instance of a Kafka Streams application; its value will therefore vary across the instances of your application. When this property is set, Kafka Streams keeps track of the RPC endpoint information for every instance of an application, its state stores, and its assigned stream partitions through instances of StreamsMetadata.

Tip

Consider leveraging the exposed RPC endpoints of your application for further functionality, such as piggybacking additional inter-application communication that goes beyond interactive queries.

This example shows how to configure and run a Kafka Streams application that supports the discovery of its state stores.

Properties props = new Properties();
// Set the unique RPC endpoint of this application instance through which it
// can be interactively queried.  In a real application, the value would most
// probably not be hardcoded but derived dynamically.
String rpcEndpoint = "host1:4460";
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
// ... further settings may follow here ...

StreamsConfig config = new StreamsConfig(props);
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde));

final KGroupedStream<String, String> groupedByWord = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));

// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

// Then, create and start the actual RPC service for remote access to this
// application instance's local state stores.
//
// This service should be started on the same host and port as defined above by
// the property `StreamsConfig.APPLICATION_SERVER_CONFIG`.  The example below is
// fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)
// that showcase how to implement such a service to get you started.
MyRPCService rpcService = ...;
rpcService.listenAt(rpcEndpoint);

Discovering and accessing application instances and their local state stores

The following methods return StreamsMetadata objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores.

  • KafkaStreams#allMetadata(): find all instances of this application
  • KafkaStreams#allMetadataForStore(String storeName): find those application instances that manage local instances of the state store “storeName”
  • KafkaStreams#metadataForKey(String storeName, K key, Serializer<K> keySerializer): using the default stream partitioning strategy, find the one application instance that holds the data for the given key in the given state store
  • KafkaStreams#metadataForKey(String storeName, K key, StreamPartitioner<K, ?> partitioner): using partitioner, find the one application instance that holds the data for the given key in the given state store

Attention

If application.server is not configured for an application instance, then the above methods will not find any StreamsMetadata for it.

For example, we can now find the StreamsMetadata for the state store named “word-count” that we defined in the code example shown in the previous section:

KafkaStreams streams = ...;
// Find all the locations of local instances of the state store named "word-count"
Collection<StreamsMetadata> wordCountHosts = streams.allMetadataForStore("word-count");

// For illustrative purposes, we assume using an HTTP client to talk to remote app instances.
HttpClient http = ...;

// Get the word count for word (aka key) 'alice': Approach 1
//
// We first find the one app instance that manages the count for 'alice' in its local state stores.
StreamsMetadata metadata = streams.metadataForKey("word-count", "alice", Serdes.String().serializer());
// Then, we query only that single app instance for the latest count of 'alice'.
// Note: The RPC URL shown below is fictitious and only serves to illustrate the idea.  Ultimately,
// the URL (or, in general, the method of communication) will depend on the RPC layer you opted to
// implement.  Again, we provide end-to-end demo applications (such as KafkaMusicExample) that showcase
// how to implement such an RPC layer.
Long result = http.getLong("http://" + metadata.host() + ":" + metadata.port() + "/word-count/alice");

// Get the word count for word (aka key) 'alice': Approach 2
//
// Alternatively, we could also choose (say) a brute-force approach where we query every app instance
// until we find the one that happens to know about 'alice'.
Optional<Long> result2 = streams.allMetadataForStore("word-count")
    .stream()
    .map(streamsMetadata -> {
        // Construct the (fictitious) full endpoint URL to query the current remote application instance
        String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
        // Read and return the count for 'alice', if any.
        return http.getLong(url);
    })
    .filter(s -> s != null)
    .findFirst();

At this point the full state of the application is interactively queryable:

  • You can discover the running instances of the application and the state stores they manage locally.
  • Through the RPC layer that was added to the application, you can communicate with these application instances over the network and query them for locally available state.
  • The application instances are able to serve such queries because they can directly query their own local state stores and respond via the RPC layer.
  • Collectively, this allows us to query the full state of the entire application.

To see an end-to-end application with interactive queries, review the demo applications.


8 - Memory Management

Memory Management

You can specify the total memory (RAM) size used for internal caching and compacting of records. This caching happens before the records are written to state stores or forwarded downstream to other nodes.

The record caches are implemented slightly differently in the DSL and Processor API.

Table of Contents

  • Record caches in the DSL
  • Record caches in the Processor API
  • Other memory usage

Record caches in the DSL

You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is leveraged by the following KTable instances:

  • Source KTable: KTable instances that are created via StreamsBuilder#table() or StreamsBuilder#globalTable().
  • Aggregation KTable: instances of KTable that are created as a result of aggregations.

For such KTable instances, the record cache is used for:

  • Internal caching and compacting of output records before they are written by the underlying stateful processor node to its internal state stores.
  • Internal caching and compacting of output records before they are forwarded from the underlying stateful processor node to any of its downstream processor nodes.

Use the following example to understand the behaviors with and without record caching. In this example, the input is a KStream<String, Integer> with the records <K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>. The focus in this example is on the records with key == A.

  • An aggregation computes the sum of record values, grouped by key, for the input and returns a KTable<String, Integer>.
    • **Without caching**: a sequence of output records is emitted for key `A` that represent changes in the resulting aggregation table. The parentheses (`()`) denote changes, the left number is the new aggregate value and the right number is the old aggregate value: `<A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>`.
    • **With caching**: a single output record is emitted for key `A` that would likely be compacted in the cache, leading to a single output record of `<A, (321, null)>`. This record is written to the aggregation's internal state store and forwarded to any downstream operations.

The cache size is specified through the cache.max.bytes.buffering parameter, which is a global setting per processing topology:

// Enable record cache of size 10 MB.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with T threads and C bytes allocated for caching, each thread gets an even split of C/T bytes to construct its own cache and use as it sees fit among its tasks. This means that there are as many caches as there are threads, but no sharing of caches across threads happens.
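
As a sketch of this arithmetic (the values are illustrative only): with a 10 MB cache and two stream threads, each thread receives 5 MB to divide among its tasks.

Properties props = new Properties();
// 10 MB of cache shared across the threads of this instance (C = 10 MB).
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// With T = 2 threads, each thread gets C/T = 5 MB for its own cache.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);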

The basic API for the cache is made of put() and get() calls. Records are evicted using a simple LRU scheme after the cache size is reached. The first time a keyed record R1 = <K1, V1> finishes processing at a node, it is marked as dirty in the cache. Any other keyed record R2 = <K1, V2> with the same key K1 that is processed on that node during that time will overwrite <K1, V1>, this is referred to as “being compacted”. This has the same effect as Kafka’s log compaction, but happens earlier, while the records are still in memory, and within your client-side application, rather than on the server-side (i.e. the Kafka broker). After flushing, R2 is forwarded to the next processing node and then written to the local state store.

The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earlier of commit.interval.ms or cache.max.bytes.buffering (cache pressure) is reached. Both commit.interval.ms and cache.max.bytes.buffering are global parameters, so it is not possible to specify different values for individual nodes.

Here are example settings for both parameters based on desired scenarios.

  • To turn off caching, set the cache size to zero:
// Disable record cache
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Turning off caching might result in high write traffic for the underlying RocksDB store. With default settings caching is enabled within Kafka Streams but RocksDB caching is disabled. Thus, to avoid high write traffic it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.

For example, the RocksDB Block Cache could be set to 100MB and Write Buffer size to 32 MB. For more information, see the RocksDB config.
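
As a sketch of how such RocksDB settings could be applied (the class name CustomRocksDBConfig and the sizes shown are illustrative, not recommendations), you can implement RocksDBConfigSetter and register it via the rocksdb.config.setter configuration:

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

import java.util.Map;
import java.util.Properties;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    // Illustrative sizes: 100 MB block cache and a 32 MB write buffer per store.
    final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
    tableConfig.setBlockCacheSize(100 * 1024 * 1024L);
    options.setTableFormatConfig(tableConfig);
    options.setWriteBufferSize(32 * 1024 * 1024L);
  }
}

// Register the setter in your Streams configuration:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);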

  • To enable caching but still have an upper bound on how long records will be cached, you can set the commit interval. In this example, it is set to 1000 milliseconds:
Properties streamsConfiguration = new Properties();
// Enable record cache of size 10 MB.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Set commit interval to 1 second.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

The effect of these two configurations is summarized below. The records are shown using 4 keys: blue, red, yellow, and green. Assume the cache has space for only 3 keys.

  • When the cache is disabled (a), all of the input records will be output.

  • When the cache is enabled (b):

    • Most records are output at the end of commit intervals (e.g., at `t1` a single blue record is output, which is the final over-write of the blue key up to that time).
    • Some records are output because of cache pressure (i.e., before the end of a commit interval). For example, see the red record before `t2`. With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With large cache sizes, the commit interval will be the primary factor.
    • The total number of records output has been reduced from 15 to 8.

Record caches in the Processor API

You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is used for internal caching and compacting of output records before they are written from a stateful processor node to its state stores.

The record cache in the Processor API does not cache or compact any output records that are being forwarded downstream. This means that all downstream processor nodes can see all records, whereas the state stores see a reduced number of records. This does not impact correctness of the system, but is a performance optimization for the state stores. For example, with the Processor API you can store a record in a state store while forwarding a different value downstream.

Following from the example first shown in section State Stores, to enable caching, you can add the withCachingEnabled call (note that caches are disabled by default and there is no explicit withDisableCaching call).

StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withCachingEnabled();

Other memory usage

There are other modules inside Apache Kafka that allocate memory during runtime. They include the following:

  • Producer buffering, managed by the producer config buffer.memory.
  • Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e., fetch.max.bytes and fetch.max.wait.ms.
  • Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory. These are controlled by the send.buffer.bytes / receive.buffer.bytes configs.
  • Deserialized objects buffering: after consumer.poll() returns records, they will be deserialized to extract timestamp and buffered in the streams space. Currently this is only indirectly controlled by buffered.records.per.partition.
  • RocksDB’s own memory usage, both on-heap and off-heap; critical configs (for RocksDB version 4.1.0) include block_cache_size, write_buffer_size and max_write_buffer_number. These can be specified through the rocksdb.config.setter configuration.
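
Several of these settings can be adjusted through your Streams configuration; prefixed configs are forwarded to the embedded producer and consumer clients. The values below are illustrative only:

Properties props = new Properties();
// Producer record buffering (buffer.memory).
props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 32 * 1024 * 1024L);
// Consumer fetch buffering (fetch.max.bytes).
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 50 * 1024 * 1024);
// Maximum number of deserialized records buffered per partition inside Streams.
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);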

Tip

Iterators should be closed explicitly to release resources: store iterators (e.g., KeyValueIterator and WindowStoreIterator) must be closed explicitly once you are done with them, to release resources such as open file handles and in-memory read buffers. Alternatively, use a try-with-resources statement (available since JDK 7), because these iterator classes implement Closeable.

Otherwise, your stream application’s memory usage will keep increasing while it runs, until it eventually hits an OutOfMemoryError.
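
For example, a local store query could close its iterator with try-with-resources; the store name "CountsKeyValueStore" refers to the earlier key-value store example:

// Iterators implement Closeable, so try-with-resources closes them automatically.
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());

try (KeyValueIterator<String, Long> iterator = keyValueStore.all()) {
  while (iterator.hasNext()) {
    KeyValue<String, Long> entry = iterator.next();
    System.out.println(entry.key + " -> " + entry.value);
  }
}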


9 - Running Streams Applications

Running Streams Applications

You can run Java applications that use the Kafka Streams library without any additional configuration or requirements. Kafka Streams also provides the ability to receive notification of the various states of the application. The ability to monitor the runtime status is discussed in the monitoring guide.

Table of Contents

  • Starting a Kafka Streams application
  • Elastic scaling of your application
    • Adding capacity to your application
    • Removing capacity from your application
    • State restoration during workload rebalance
    • Determining how many application instances to run


Starting a Kafka Streams application

You can package your Java application as a fat JAR file and then start the application like this:

# Start the application in class `com.example.MyStreamsApp`
# from the fat JAR named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

When you start your application you are launching a Kafka Streams instance of your application. You can run multiple instances of your application. A common scenario is that there are multiple instances of your application running in parallel. For more information, see Parallelism Model.

When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks. If the processor topology defines any state stores, these are also constructed during the initialization period. For more information, see the State restoration during workload rebalance section.

Elastic scaling of your application

Kafka Streams makes your stream processing applications elastic and scalable. You can add and remove processing capacity dynamically during application runtime without any downtime or data loss. This makes your applications resilient in the face of failures and allows you to perform maintenance as needed (e.g., rolling upgrades).

For more information about this elasticity, see the Parallelism Model section. Kafka Streams leverages the Kafka group management functionality, which is built right into the Kafka wire protocol. It is the foundation that enables the elasticity of Kafka Streams applications: members of a group coordinate and collaborate jointly on the consumption and processing of data in Kafka. Additionally, Kafka Streams provides stateful processing and allows for fault-tolerant state in environments where application instances may come and go at any time.

Adding capacity to your application

If you need more processing capacity for your stream processing application, you can simply start another instance of your stream processing application, e.g. on another machine, in order to scale out. The instances of your application will become aware of each other and automatically begin to share the processing work. More specifically, what will be handed over from the existing instances to the new instances is (some of) the stream tasks that have been run by the existing instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks (the state of a stream task will be re-created in the target instance by restoring the state from its corresponding changelog topic).

The various instances of your application each run in their own JVM process, which means that each instance can leverage all the processing capacity that is available to its respective JVM process (minus the capacity that any non-Kafka-Streams part of your application may be using). This explains why running additional instances will grant your application additional processing capacity. The exact capacity you will be adding by running a new instance depends of course on the environment in which the new instance runs: available CPU cores, available main memory and Java heap space, local storage, network bandwidth, and so on. Similarly, if you stop any of the running instances of your application, then you are removing and freeing up the respective processing capacity.

Before adding capacity: only a single instance of your Kafka Streams application is running. At this point the corresponding Kafka consumer group of your application contains only a single member (this instance). All data is being read and processed by this single instance.

After adding capacity: now two additional instances of your Kafka Streams application are running, and they have automatically joined the application’s Kafka consumer group for a total of three current members. These three instances are automatically splitting the processing work between each other. The splitting is based on the Kafka topic partitions from which data is being read.

Removing capacity from your application

To remove processing capacity, stop running stream processing application instances (e.g., shut down two of four running instances). The stopped instances automatically leave the application’s consumer group, and the remaining instances of your application automatically take over the processing work. The remaining instances take over the stream tasks that were run by the stopped instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks. The state of a stream task is recreated in the target instance from its changelog topic.

State restoration during workload rebalance

When a task is migrated, the task processing state is fully restored before the application instance resumes processing. This guarantees correct processing results. In Kafka Streams, state restoration is usually done by replaying the corresponding changelog topic to reconstruct the state store. To minimize the latency of changelog-based restoration, you can keep replicated copies of local state stores (standby replicas) by setting num.standby.replicas. When a stream task is initialized or re-initialized on the application instance, its state store is restored like this:

  • If no local state store exists, the changelog is replayed from the earliest to the current offset. This reconstructs the local state store to the most recent snapshot.
  • If a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog.

For more information, see Standby Replicas.
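
As a minimal sketch, standby replicas are enabled through the num.standby.replicas setting (the value shown is illustrative):

Properties props = new Properties();
// Maintain one standby copy of each local state store on another instance,
// so that a migrated task can restore from a near-up-to-date replica
// instead of replaying the full changelog.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);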

Determining how many application instances to run

The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances of your application. You can run further instances, but these will be idle.

The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and for the number of running instances of your application.

To achieve balanced workload processing across application instances and to prevent processing hotspots, you should distribute data and processing workloads:

  • Data should be equally distributed across topic partitions. For example, if two topic partitions each have 1 million messages, this is better than a single partition with 2 million messages and none in the other.
  • Processing workload should be equally distributed across topic partitions. For example, if the time to process messages varies widely, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition.


10 - Managing Streams Application Topics

Managing Streams Application Topics

A Kafka Streams application continuously reads from Kafka topics, processes the read data, and then writes the processing results back into Kafka topics. The application may also auto-create other Kafka topics in the Kafka brokers, for example state store changelog topics. This section describes the differences between these topic types and how to manage the topics and your applications.

Kafka Streams distinguishes between user topics and internal topics.

User topics

User topics exist externally to an application and are read from or written to by the application, including:

Input topics Topics that are specified via source processors in the application’s topology; e.g. via StreamsBuilder#stream(), StreamsBuilder#table() and Topology#addSource(). Output topics Topics that are specified via sink processors in the application’s topology; e.g. via KStream#to(), KTable.to() and Topology#addSink(). Intermediate topics Topics that are both input and output topics of the application’s topology; e.g. via KStream#through().

User topics must be created and manually managed ahead of time (e.g., via the topic tools). If user topics are shared among multiple applications for reading and writing, the application users must coordinate topic management. If user topics are centrally managed, application users do not need to manage topics themselves but simply obtain access to them.
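
For example, user topics can be created programmatically with the Kafka AdminClient from the kafka-clients library; the topic name, partition count, and replication factor shown below are illustrative:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
  // Create the input topic with 10 partitions and a replication factor of 3.
  NewTopic inputTopic = new NewTopic("word-count-input", 10, (short) 3);
  admin.createTopics(Collections.singleton(inputTopic)).all().get();
}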

Note

You should not use the auto-create topic feature on the brokers to create user topics, because:

  • Auto-creation of topics may be disabled in your Kafka cluster.
  • Auto-creation (i.e., auto.create.topics.enable=true in the Kafka broker configuration) automatically applies the default topic settings, such as the replication factor, and these default settings might not be what you want for certain output topics.

Internal topics

Internal topics are used internally by the Kafka Streams application while executing, for example the changelog topics for state stores. These topics are created by the application and are only used by that stream application.

If security is enabled on the Kafka brokers, you must grant the underlying clients admin permissions so that they can create internal topics. For more information, see Streams Security.

Kafka Streams automatically creates internal repartitioning and changelog topics, and does not use broker auto-topic creation. Therefore, internal topics take on the default broker configurations for new topics. You can override the default configs used when creating these topics by adding any configs from TopicConfig to your StreamsConfig with the prefix StreamsConfig.topicPrefix().

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
config.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), 100);
KafkaStreams streams = new KafkaStreams(topology, config);

Note

The internal topics follow the naming convention <application.id>-<operatorName>-<suffix>, but this convention is not guaranteed for future releases.


11 - Streams Security

Streams Security

Table of Contents

  • Required ACL setting for secure Kafka clusters
  • Security example

Kafka Streams natively integrates with Kafka’s security features and supports all of the client-side security features in Kafka. Streams leverages the Java Producer and Consumer API.

To secure your Stream processing applications, configure the security settings in the corresponding Kafka producer and consumer clients, and then specify the corresponding configuration settings in your Kafka Streams application.

Kafka supports cluster encryption and authentication, including a mix of authenticated and unauthenticated, and encrypted and non-encrypted clients. Using security is optional.

Here are a few relevant client-side security features:

Encrypt data-in-transit between your applications and Kafka brokers You can enable the encryption of the client-server communication between your applications and the Kafka brokers. For example, you can configure your applications to always use encryption when reading and writing data to and from Kafka. This is critical when reading and writing data across security domains such as internal network, public internet, and partner networks. Client authentication You can enable client authentication for connections from your application to Kafka brokers. For example, you can define that only specific applications are allowed to connect to your Kafka cluster. Client authorization You can enable client authorization of read and write operations by your applications. For example, you can define that only specific applications are allowed to read from a Kafka topic. You can also restrict write access to Kafka topics to prevent data pollution or fraudulent activities.

For more information about the security features in Apache Kafka, see Kafka Security.

Required ACL setting for secure Kafka clusters

When applications are run against a secured Kafka cluster, the principal running the application must have the ACL --cluster --operation Create set so that the application has the permissions to create internal topics.

To avoid providing this permission to your application, you can create the required internal topics manually. If the internal topics exist, Kafka Streams will not try to recreate them. Note that the internal repartition and changelog topics must be created with the correct number of partitions; otherwise, Kafka Streams will fail on startup. The topics must be created with the same number of partitions as your input topic, or, if there are multiple input topics, with the maximum number of partitions across all input topics. Additionally, changelog topics must be created with log compaction enabled; otherwise, your application might lose data. You can find out the names of the required internal topics via Topology#describe(). All internal topics follow the naming pattern <application.id>-<operatorName>-<suffix>, where the suffix is either repartition or changelog. Note that there is no guarantee about this naming pattern in future releases; it is not part of the public API.

Security example

This example shows how to configure a Kafka Streams application to enable client authentication and encrypt data-in-transit when communicating with its Kafka cluster.

This example assumes that the Kafka brokers in the cluster already have their security set up and that the necessary SSL certificates are available to the application in the local filesystem locations. For example, if you are using Docker, you must also include these SSL certificates in the correct locations within the Docker image.

The snippet below shows the settings to enable client authentication and SSL encryption for data-in-transit between your Kafka Streams application and the Kafka cluster it is reading and writing from:

# Essential security settings to enable client authentication and SSL encryption
bootstrap.servers=kafka.example.com:9093
security.protocol=SSL
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

Configure these settings in the application for your StreamsConfig instance. These settings will encrypt any data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the Kafka brokers that it is communicating with. Note that this example does not cover client authorization.

// Code of your Java application that uses the Kafka Streams library
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "secure-kafka-streams-app");
// Where to find secure Kafka brokers.  Here, it's on port 9093.
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
//
// ...further non-security related settings may follow here...
//
// Security settings.
// 1. These settings must match the security settings of the secure Kafka cluster.
// 2. The SSL trust store and key store files must be locally accessible to the application.
settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
settings.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
settings.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
settings.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
StreamsConfig streamsConfiguration = new StreamsConfig(settings);

If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you start it. For example, if you enter an incorrect password for the ssl.keystore.password setting, an error message similar to this would be logged and then the application would terminate:

# Misconfigured ssl.keystore.password
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
[...snip...]
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:
   java.io.IOException: Keystore was tampered with, or password was incorrect
[...snip...]
Caused by: java.security.UnrecoverableKeyException: Password verification failed

Monitor your Kafka Streams application log files for such error messages to spot any misconfigured applications quickly.


12 - Application Reset Tool

Application Reset Tool

You can reset an application and force it to reprocess its data from scratch by using the application reset tool. This can be useful for development and testing, or when fixing bugs.

The application reset tool handles the Kafka Streams user topics (input, output, and intermediate topics) and internal topics differently when resetting the application.

Here’s what the application reset tool does for each topic type:

  • Input topics: Reset offsets to specified position (by default to the beginning of the topic).
  • Intermediate topics: Skip to the end of the topic, i.e., set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).
  • Internal topics: Delete the internal topic (this automatically deletes any committed offsets).

The application reset tool does not:

  • Reset output topics of an application. If any output (or intermediate) topics are consumed by downstream applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the upstream application.
  • Reset the local environment of your application instances. It is your responsibility to delete the local state on any machine on which an application instance was run. See the instructions in section Step 2: Reset the local environments of your application instances on how to do this.

Prerequisites

  • All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups.

  • Use this tool with care and double-check its parameters: If you provide wrong parameter values (e.g., typos in application.id) or specify parameters inconsistently (e.g., specify the wrong input topics for the application), this tool might invalidate the application’s state or even impact other applications, consumer groups, or your Kafka topics.

  • You should manually delete and re-create any intermediate topics before running the application reset tool; this will free up disk space in the Kafka brokers. You can skip this step unless either of the following applies:

    • You have external downstream consumers for the application's intermediate topics.
    • You are in a development environment where manually deleting and re-creating intermediate topics is unnecessary.

Step 1: Run the application reset tool

Invoke the application reset tool from the command line

<path-to-kafka>/bin/kafka-streams-application-reset

The tool accepts the following parameters:

Option (* = required)                 Description
---------------------                 -----------
* --application-id <String: id>       The Kafka Streams application ID
                                        (application.id).
--bootstrap-servers <String: urls>    Comma-separated list of broker urls with
                                        format: HOST1:PORT1,HOST2:PORT2
                                        (default: localhost:9092)
--by-duration <String: urls>      Reset offsets to offset by duration from
                                        current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>     Property file containing configs to be
                                        passed to admin clients and embedded
                                        consumer.
--dry-run                             Display the actions that would be
                                        performed without executing the reset
                                        commands.
--from-file <String: urls>        Reset offsets to values defined in CSV
                                        file.
--input-topics <String: list>         Comma-separated list of user input
                                        topics. For these topics, the tool will
                                        reset the offset to the earliest
                                        available offset.
--intermediate-topics <String: list>  Comma-separated list of intermediate user
                                        topics (topics used in the through()
                                        method). For these topics, the tool
                                        will skip to the end.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset by
                                        'n', where 'n' can be positive or
                                        negative
--to-datetime <String>                Reset offsets to offset from datetime.
                                        Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset <Long>                    Reset offsets to a specific offset.
--zookeeper                           Zookeeper option is deprecated by
                                        bootstrap.servers, as the reset tool
                                        would no longer access Zookeeper
                                        directly.

The following options control how offsets are reset for the input topics:

  • by-duration
  • from-file
  • shift-by
  • to-datetime
  • to-earliest
  • to-latest
  • to-offset

Only one of these options can be specified at a time. If none is specified, to-earliest is used by default.

All the other parameters can be combined as needed. For example, if you want to restart an application from an empty internal state, but not reprocess previous data, simply omit the parameters --input-topics and --intermediate-topics.

Step 2: Reset the local environments of your application instances

For a complete application reset, you must delete the application’s local state directory on any machines where the application instance was run. You must do this before restarting an application instance on the same machine. You can use either of these methods:

  • The API method KafkaStreams#cleanUp() in your application code.
  • Manually delete the corresponding local state directory (default location: /tmp/kafka-streams/<application.id>). For more information, see Streams javadocs.
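
For example, KafkaStreams#cleanUp() can be called before starting (or after stopping) an application instance to remove its local state directory:

KafkaStreams streams = new KafkaStreams(topology, config);

// Delete this instance's local state directory (state.dir/<application.id>).
// cleanUp() may only be called while the instance is not running, i.e.,
// before start() or after close().
streams.cleanUp();
streams.start();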
