1 - Writing a Streams Application


Table of Contents

  • Libraries and Maven artifacts
  • Using Kafka Streams within your application code
  • Testing a Streams application

Any Java or Scala application that makes use of the Kafka Streams library is considered a Kafka Streams application. The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges).

You can define the processor topology with the Kafka Streams APIs:

  • Kafka Streams DSL: A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs. If you’re writing a Scala application, you can use the Kafka Streams DSL for Scala library, which removes much of the Java/Scala interoperability boilerplate compared to working directly with the Java DSL.
  • Processor API: A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).
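For example, a minimal DSL topology that filters out empty values might look as follows (a sketch; the topic names are placeholders and default serdes are assumed to be configured):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")            // read from an assumed input topic
       .filter((key, value) -> value != null && !value.isEmpty())
       .to("output-topic");                              // write to an assumed output topic
Topology topology = builder.build();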

Libraries and Maven artifacts

This section lists the Kafka Streams related libraries that are available for writing your Kafka Streams applications.

You can define dependencies on the following libraries for your Kafka Streams applications.

Group ID | Artifact ID | Version | Description
org.apache.kafka | kafka-streams | 4.0.0 | (Required) Base library for Kafka Streams.
org.apache.kafka | kafka-clients | 4.0.0 | (Required) Kafka client library. Contains built-in serializers/deserializers.
org.apache.kafka | kafka-streams-scala | 4.0.0 | (Optional) Kafka Streams DSL for Scala library to write Scala Kafka Streams applications. When not using SBT you will need to suffix the artifact ID with the correct version of Scala your application is using (_2.12, _2.13).

Tip

See the section Data Types and Serialization for more information about Serializers/Deserializers.

Example pom.xml snippet when using Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-scala_2.13</artifactId>
    <version>4.0.0</version>
</dependency>

Using Kafka Streams within your application code

You can call Kafka Streams from anywhere in your application code, but usually these calls are made within the main() method of your application, or some variant thereof. The basic elements of defining a processing topology within your application are described below.

First, you must create an instance of KafkaStreams.

  • The first argument of the KafkaStreams constructor takes a topology (either the result of StreamsBuilder#build() for the DSL or a Topology constructed directly with the Processor API) that defines the application’s processing logic.
  • The second argument is an instance of java.util.Properties, which defines the configuration for this specific topology.

Code example:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = ...;  // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API

// Use the configuration to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;

KafkaStreams streams = new KafkaStreams(topology, props);

At this point, internal structures are initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams threads by calling the KafkaStreams#start() method:

// Start the Kafka Streams threads
streams.start();

If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started. For more information, see Stream Partitions and Tasks and Threading Model.

To catch any unexpected exceptions, you can set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:

streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
  // here you should examine the throwable/exception and perform an appropriate action!
});

To stop the application instance, call the KafkaStreams#close() method:

// Stop the Kafka Streams threads
streams.close();

To allow your application to gracefully shut down in response to SIGTERM, it is recommended that you add a shutdown hook and call KafkaStreams#close.

Here is a shutdown hook example in Java:

// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
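If you want to bound how long close() may block during shutdown, a variant with an explicit timeout could look like this (a sketch; the 10-second value is arbitrary):

import java.time.Duration;

// Close with a bounded timeout so shutdown does not block indefinitely
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close(Duration.ofSeconds(10))));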

After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining instances.

Testing a Streams application

Kafka Streams comes with a test-utils module to help you write tests for your application.
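For example, a minimal sketch using TopologyTestDriver from the test-utils module (the topic names and String serdes are assumptions about your topology):

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

Topology topology = ...; // the topology under test
Properties props = ...;  // the Streams configuration

try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
    TestInputTopic<String, String> input =
        driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, String> output =
        driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

    input.pipeInput("key", "value");
    // assert on output.readKeyValue() with your test framework of choice
}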


2 - Configuring a Streams Application


Kafka and Kafka Streams configuration options must be configured before using Streams. You can configure Kafka Streams by specifying parameters in a java.util.Properties instance.

  1. Create a java.util.Properties instance.

  2. Set the parameters. For example:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings
settings.put(... , ...);

Configuration parameter reference

This section contains the most common Streams configuration parameters. For a full reference, see the Streams Javadocs.

  • Required configuration parameters
    • application.id
    • bootstrap.servers
  • Recommended configuration parameters for resiliency
    • acks
    • replication.factor
    • min.insync.replicas
    • num.standby.replicas
  • Optional configuration parameters
    • acceptable.recovery.lag
    • default.deserialization.exception.handler (deprecated since 4.0)
    • default.key.serde
    • default.production.exception.handler (deprecated since 4.0)
    • default.timestamp.extractor
    • default.value.serde
    • deserialization.exception.handler
    • enable.metrics.push
    • log.summary.interval.ms
    • max.task.idle.ms
    • max.warmup.replicas
    • num.standby.replicas
    • num.stream.threads
    • probing.rebalance.interval.ms
    • processing.exception.handler
    • processing.guarantee
    • processor.wrapper.class
    • production.exception.handler
    • rack.aware.assignment.non_overlap_cost
    • rack.aware.assignment.strategy
    • rack.aware.assignment.tags
    • rack.aware.assignment.traffic_cost
    • replication.factor
    • rocksdb.config.setter
    • state.dir
    • task.assignor.class
    • topology.optimization
    • windowed.inner.class.serde
  • Kafka consumers and producer configuration parameters
    • Naming
    • Default Values
    • Parameters controlled by Kafka Streams
    • enable.auto.commit

Required configuration parameters

Here are the required Streams configuration parameters.

Parameter Name | Importance | Description | Default Value
application.id | Required | An identifier for the stream processing application. Must be unique within the Kafka cluster. | None
bootstrap.servers | Required | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | None

application.id

(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to all instances of the application. It is recommended to use only alphanumeric characters, . (dot), - (hyphen), and _ (underscore). Examples: "hello_world", "hello_world-v1.0.0"

This ID is used in the following places to isolate resources used by the application from others:

  • As the default Kafka consumer and producer client.id prefix
  • As the Kafka consumer group.id for coordination
  • As the name of the subdirectory in the state directory (cf. state.dir)
  • As the prefix of internal Kafka topic names

Tip: When an application is updated, the application.id should be changed unless you want to reuse the existing data in internal topics and state stores. For example, you could embed the version information within application.id, as my-app-v1.0.0 and my-app-v1.0.2.

bootstrap.servers

(Required) The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092".

Recommended configuration parameters for resiliency

There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:

Parameter Name | Corresponding Client | Default value | Consider setting to
acks | Producer (for version <=2.8) | acks="1" | acks="all"
replication.factor (for broker version 2.3 or older) | Streams | -1 | 3 (broker 2.4+: ensure broker config default.replication.factor=3)
min.insync.replicas | Broker | 1 | 2
num.standby.replicas | Streams | 0 | 1

Increasing the replication factor to 3 ensures that the internal Kafka Streams topics can tolerate up to 2 broker failures. The tradeoff of moving from the default values to the recommended ones is that some performance and more storage space (3x with a replication factor of 3) are sacrificed for more resiliency.

acks

The number of acknowledgments that the leader must have received before considering a request complete. This controls the durability of records that are sent. The possible values are:

  • acks="0" The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the producer won’t generally know of any failures. The offset returned for each record will always be set to -1.
  • acks="1" The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.
  • acks="all" (default since 3.0 release) The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.

For more information, see the Kafka Producer documentation.

replication.factor

See the description here.

min.insync.replicas

The minimum number of in-sync replicas available for replication if the producer is configured with acks="all" (see topic configs).

num.standby.replicas

See the description here.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// for broker version 2.3 or older
//streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// for version 2.8 or older
//streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

Optional configuration parameters

Here are the optional Streams configuration parameters, sorted by level of importance:

  • High: These are parameters with a default value which is most likely not a good fit for production use. It’s highly recommended to revisit these parameters for production usage.
  • Medium: The default values of these parameters should work for production for many cases, but it’s not uncommon that they are changed, for example to tune performance.
  • Low: It should rarely be necessary to change the value for these parameters. It’s only recommended to change them if there is a very specific issue you want to address.
Parameter Name | Importance | Description | Default Value
acceptable.recovery.lag | Medium | The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task. | 10000
application.server | Low | A host:port pair pointing to an embedded user-defined endpoint that can be used for discovering the locations of state stores within a single Kafka Streams application. The value of this must be different for each instance of the application. | the empty string
buffered.records.per.partition | Low | The maximum number of records to buffer per partition. | 1000
statestore.cache.max.bytes | Medium | Maximum number of memory bytes to be used for record caches across all threads. | 10485760
cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.) | Medium | Maximum number of memory bytes to be used for record caches across all threads. | 10485760
client.id | Medium | An ID string to pass to the server when making requests. (This setting is passed to the consumer/producer clients used internally by Kafka Streams.) | the empty string
commit.interval.ms | Low | The frequency in milliseconds with which to save the position (offsets in source topics) of tasks. | 30000 (30 seconds)
default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.) | Medium | Exception handling class that implements the DeserializationExceptionHandler interface. | LogAndContinueExceptionHandler
default.key.serde | Medium | Default serializer/deserializer class for record keys, implements the Serde interface. Must be set by the user or all serdes must be passed in explicitly (see also default.value.serde). | null
default.production.exception.handler (Deprecated. Use production.exception.handler instead.) | Medium | Exception handling class that implements the ProductionExceptionHandler interface. | DefaultProductionExceptionHandler
default.timestamp.extractor | Medium | Timestamp extractor class that implements the TimestampExtractor interface. See Timestamp Extractor. | FailOnInvalidTimestamp
default.value.serde | Medium | Default serializer/deserializer class for record values, implements the Serde interface. Must be set by the user or all serdes must be passed in explicitly (see also default.key.serde). | null
default.dsl.store | Low | [DEPRECATED] The default state store type used by DSL operators. Deprecated in favor of dsl.store.suppliers.class. | "ROCKS_DB"
deserialization.exception.handler | Medium | Exception handling class that implements the DeserializationExceptionHandler interface. | LogAndContinueExceptionHandler
dsl.store.suppliers.class | Low | Defines a default state store implementation to be used by any stateful DSL operator that has not explicitly configured the store implementation type. Must implement the org.apache.kafka.streams.state.DslStoreSuppliers interface. | BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers
log.summary.interval.ms | Low | The output interval in milliseconds for logging summary information (disabled if negative). | 120000 (2 minutes)
enable.metrics.push | Low | Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client. | true
max.task.idle.ms | Medium | This config controls whether joins and merges may produce out-of-order results. The config value is the maximum amount of time in milliseconds a stream task will stay idle when it is fully caught up on some (but not all) input partitions to wait for producers to send additional records and avoid potential out-of-order record processing across multiple input streams. The default (zero) does not wait for producers to send more records, but it does wait to fetch data that is already present on the brokers. This default means that for records that are already present on the brokers, Streams will process them in timestamp order. Set to -1 to disable idling entirely and process any locally available data, even though doing so may produce out-of-order processing. | 0
max.warmup.replicas | Medium | The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once. | 2
metric.reporters | Low | A list of classes to use as metrics reporters. | the empty list
metrics.num.samples | Low | The number of samples maintained to compute metrics. | 2
metrics.recording.level | Low | The highest recording level for metrics. | INFO
metrics.sample.window.ms | Low | The window of time in milliseconds a metrics sample is computed over. | 30000 (30 seconds)
num.standby.replicas | High | The number of standby replicas for each task. | 0
num.stream.threads | Medium | The number of threads to execute stream processing. | 1
probing.rebalance.interval.ms | Low | The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up. | 600000 (10 minutes)
processing.exception.handler | Medium | Exception handling class that implements the ProcessingExceptionHandler interface. | LogAndFailProcessingExceptionHandler
processing.guarantee | Medium | The processing mode. Can be either "at_least_once" or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). See Processing Guarantee. | "at_least_once"
processor.wrapper.class | Medium | A class or class name implementing the ProcessorWrapper interface. Must be passed in when creating the topology, and will not be applied unless passed in to the appropriate constructor as a TopologyConfig. You should use the StreamsBuilder#new(TopologyConfig) constructor for DSL applications, and the Topology#new(TopologyConfig) constructor for PAPI applications. |
production.exception.handler | Medium | Exception handling class that implements the ProductionExceptionHandler interface. | DefaultProductionExceptionHandler
poll.ms | Low | The amount of time in milliseconds to block waiting for input. | 100
rack.aware.assignment.strategy | Low | The strategy used for rack aware assignment. Acceptable values are "none" (default), "min_traffic", and "balance_subtopology". See Rack Aware Assignment Strategy. | "none"
rack.aware.assignment.tags | Low | List of tag keys used to distribute standby replicas across Kafka Streams clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over clients with different tag values. See Rack Aware Assignment Tags. | the empty list
rack.aware.assignment.non_overlap_cost | Low | Cost associated with moving tasks from existing assignment. See Rack Aware Assignment Non-Overlap-Cost. | null
rack.aware.assignment.traffic_cost | Low | Cost associated with cross rack traffic. See Rack Aware Assignment Traffic-Cost. | null
replication.factor | Medium | The replication factor for changelog topics and repartition topics created by the application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer. | -1
retry.backoff.ms | Low | The amount of time in milliseconds before a request is retried. | 100
rocksdb.config.setter | Medium | The RocksDB configuration. | null
state.cleanup.delay.ms | Low | The amount of time in milliseconds to wait before deleting state when a partition has migrated. | 600000 (10 minutes)
state.dir | High | Directory location for state stores. | /${java.io.tmpdir}/kafka-streams
task.assignor.class | Medium | A task assignor class or class name implementing the TaskAssignor interface. | The high-availability task assignor.
task.timeout.ms | Medium | The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task would raise an error for the first internal error. For any timeout larger than 0 ms, a task will retry at least once before an error is raised. | 300000 (5 minutes)
topology.optimization | Medium | A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: StreamsConfig.NO_OPTIMIZATION (none), StreamsConfig.OPTIMIZE (all) or a comma separated list of specific optimizations: StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS (reuse.ktable.source.topics), StreamsConfig.MERGE_REPARTITION_TOPICS (merge.repartition.topics), StreamsConfig.SINGLE_STORE_SELF_JOIN (single.store.self.join). | "NO_OPTIMIZATION"
upgrade.from | Medium | The version you are upgrading from during a rolling upgrade. See Upgrade From. | null
windowstore.changelog.additional.retention.ms | Low | Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. | 86400000 (1 day)
window.size.ms | Low | Sets window size for the deserializer in order to calculate window end times. | null

acceptable.recovery.lag

The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.

Note: if you set this to Long.MAX_VALUE it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced assignment and migrate tasks to a new instance without first warming them up.

deserialization.exception.handler (deprecated: default.deserialization.exception.handler)

The deserialization exception handler allows you to manage records that fail to deserialize. This can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception handler needs to return FAIL or CONTINUE depending on the record and the exception thrown. Returning FAIL will signal that Streams should shut down, and CONTINUE will signal that Streams should ignore the issue and continue processing. The following library built-in exception handlers are available:

  • LogAndContinueExceptionHandler: This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to deserialize.
  • LogAndFailExceptionHandler: This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.

You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt records into a quarantine topic (think: a “dead letter queue”) for further processing. To do this, use the Producer API to write a corrupted record directly to the quarantine topic. To be more concrete, you can create a separate KafkaProducer object outside the Streams client, and pass in this object as well as the dead letter queue topic name into the Properties map, which then can be retrieved from the configure function call. The drawback of this approach is that “manual” writes are side effects that are invisible to the Kafka Streams runtime library, so they do not benefit from the end-to-end processing guarantees of the Streams API:

public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {

        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);

        dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers()));

        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}
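You would then register the handler in your Streams configuration, for example (a sketch using the config's string name as documented in this section):

Properties settings = new Properties();
// register the dead-letter-queue handler shown above
settings.put("deserialization.exception.handler", SendToDeadLetterQueueExceptionHandler.class);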

production.exception.handler (deprecated: default.production.exception.handler)

The production exception handler allows you to manage exceptions triggered when trying to interact with a broker, such as when attempting to produce a record that is too large. By default, Kafka provides and uses the DefaultProductionExceptionHandler that always fails when these exceptions occur.

An exception handler can return FAIL, CONTINUE, or RETRY depending on the record and the exception thrown. Returning FAIL will signal that Streams should shut down. CONTINUE will signal that Streams should ignore the issue and continue processing. For a RetriableException, the handler may return RETRY to tell the runtime to retry sending the failed record. (Note: if RETRY is returned for a non-RetriableException, it will be treated as FAIL.) If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public void configure(final Map<String, ?> configs) {}

    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

Properties settings = new Properties();

// other various kafka streams settings, e.g. bootstrap servers, application id, etc

settings.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);

default.timestamp.extractor

A timestamp extractor pulls a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

The default extractor is FailOnInvalidTimestamp. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client since Kafka version 0.10. Depending on the setting of Kafka’s log.message.timestamp.type broker parameter and message.timestamp.type topic parameter, this extractor provides you with:

  • event-time processing semantics if log.message.timestamp.type is set to CreateTime aka “producer time” (which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka’s official producer client, the timestamp represents milliseconds since the epoch.
  • ingestion-time processing semantics if log.message.timestamp.type is set to LogAppendTime aka “broker time”. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.

The FailOnInvalidTimestamp extractor throws an exception if a record contains an invalid (i.e. negative) built-in timestamp, because Kafka Streams would not process this record but silently drop it. Invalid built-in timestamps can occur for various reasons: if for example, you consume a topic that is written to by pre-0.10 Kafka producer clients or by third-party producer clients that don’t support the new Kafka 0.10 message format yet; another situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 does not include the 0.10 message timestamps.

If you have data with invalid timestamps and want to process it, then there are two alternative extractors available. Both work on built-in timestamps, but handle invalid timestamps differently.

  • LogAndSkipOnInvalidTimestamp: This extractor logs a warn message and returns the invalid timestamp to Kafka Streams, which will not process but silently drop the record. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an invalid built-in timestamp in your input data.
  • UsePartitionTimeOnInvalidTimestamp: This extractor returns the record’s built-in timestamp if it is valid (i.e. not negative). If the record does not have a valid built-in timestamp, the extractor returns the previously extracted valid timestamp from a record of the same topic partition as the current record as a timestamp estimation. In case that no timestamp can be estimated, it throws an exception.

Another built-in extractor is WallclockTimestampExtractor. This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock (think: System.currentTimeMillis()), which effectively means Streams will operate on the basis of the so-called processing-time of events.

You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or estimate a timestamp. Returning a negative timestamp will result in data loss - the corresponding record will not be processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via previousTimestamp (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
    long timestamp = -1;
    final Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      timestamp = myPojo.getTimestampInMillis();
    }
    if (timestamp < 0) {
      // Invalid timestamp!  Attempt to estimate a new timestamp,
      // otherwise fall back to wall-clock time (processing-time).
      if (previousTimestamp >= 0) {
        return previousTimestamp;
      } else {
        return System.currentTimeMillis();
      }
    }
    return timestamp;
  }

}

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

default.key.serde

The default Serializer/Deserializer class for record keys, null unless set by user. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

  • Whenever data is read from or written to a Kafka topic (e.g., via the StreamsBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

This is discussed in more detail in Data types and serialization.

default.value.serde

The default Serializer/Deserializer class for record values, null unless set by user. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

  • Whenever data is read from or written to a Kafka topic (e.g., via the StreamsBuilder#stream() and KStream#to() methods).
  • Whenever data is read from or written to a state store.

This is discussed in more detail in Data types and serialization.
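For example, assuming String keys and values, you could set both defaults with the built-in Serdes factory (a sketch):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// use the built-in String serde for keys and values unless overridden per operation
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());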

rack.aware.assignment.non_overlap_cost

This configuration sets the cost of moving a task from the original assignment computed either by StickyTaskAssignor or HighAvailabilityTaskAssignor. Together with rack.aware.assignment.traffic_cost, they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.traffic_cost, the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer considers the ratio of these two configs when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting rack.aware.assignment.non_overlap_cost to 10 and rack.aware.assignment.traffic_cost to 1 is more likely to maintain existing assignment than setting rack.aware.assignment.non_overlap_cost to 100 and rack.aware.assignment.traffic_cost to 50.

The default value is null which means default non_overlap_cost in different assignors will be used. In StickyTaskAssignor, it has a default value of 10 and rack.aware.assignment.traffic_cost has a default value of 1, which means maintaining stickiness is preferred in StickyTaskAssignor. In HighAvailabilityTaskAssignor, it has a default value of 1 and rack.aware.assignment.traffic_cost has a default value of 10, which means minimizing cross rack traffic is preferred in HighAvailabilityTaskAssignor.

rack.aware.assignment.strategy

This configuration sets the strategy Kafka Streams uses for rack aware task assignment so that cross traffic from broker to client can be reduced. This config will only take effect when broker.rack is set on the brokers and client.rack is set on the Kafka Streams side. There are three settings for this config:

  • none. This is the default value which means rack aware task assignment will be disabled.
  • min_traffic. This setting means that the rack aware task assigner will compute an assignment which tries to minimize cross rack traffic.
  • balance_subtopology. This setting means that the rack aware task assigner will compute an assignment which will try to balance tasks from the same subtopology across different clients and minimize cross rack traffic on top of that.

This config can be used together with rack.aware.assignment.non_overlap_cost and rack.aware.assignment.traffic_cost to balance reducing cross rack traffic and maintaining the existing assignment.

rack.aware.assignment.tags

This configuration sets a list of tag keys used to distribute standby replicas across Kafka Streams clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over clients with different tag values.

Tags for the Kafka Streams clients can be set via client.tag. prefix. Example:

Client-1                                   | Client-2
_______________________________________________________________________
client.tag.zone: eu-central-1a             | client.tag.zone: eu-central-1b
client.tag.cluster: k8s-cluster1           | client.tag.cluster: k8s-cluster1
rack.aware.assignment.tags: zone,cluster   | rack.aware.assignment.tags: zone,cluster


Client-3                                   | Client-4
_______________________________________________________________________
client.tag.zone: eu-central-1a             | client.tag.zone: eu-central-1b
client.tag.cluster: k8s-cluster2           | client.tag.cluster: k8s-cluster2
rack.aware.assignment.tags: zone,cluster   | rack.aware.assignment.tags: zone,cluster

In the above example, we have four Kafka Streams clients across two zones (eu-central-1a, eu-central-1b) and across two clusters (k8s-cluster1, k8s-cluster2). For an active task located on Client-1, Kafka Streams will allocate a standby task on Client-4, since Client-4 has a different zone and a different cluster than Client-1.
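For instance, Client-1 from the example above could be configured like this (a sketch using the string config names; the tag names and values are just the ones from the example):

Properties settings = new Properties();
// tag this instance with its zone and cluster
settings.put("client.tag.zone", "eu-central-1a");
settings.put("client.tag.cluster", "k8s-cluster1");
// distribute standby tasks across clients with different zone and cluster tags
settings.put("rack.aware.assignment.tags", "zone,cluster");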

rack.aware.assignment.traffic_cost

This configuration sets the cost of cross rack traffic. Together with rack.aware.assignment.non_overlap_cost, they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than rack.aware.assignment.non_overlap_cost, the optimizer will try to compute an assignment which minimizes the cross rack traffic. Note that the optimizer considers the ratio of these two configs when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting rack.aware.assignment.traffic_cost to 10 and rack.aware.assignment.non_overlap_cost to 1 is more likely to minimize cross rack traffic than setting rack.aware.assignment.traffic_cost to 100 and rack.aware.assignment.non_overlap_cost to 50.

The default value is null which means default traffic cost in different assignors will be used. In StickyTaskAssignor, it has a default value of 1 and rack.aware.assignment.non_overlap_cost has a default value of 10. In HighAvailabilityTaskAssignor, it has a default value of 10 and rack.aware.assignment.non_overlap_cost has a default value of 1.

log.summary.interval.ms

This configuration controls the output interval for summary information. If greater than or equal to 0, the summary log will be output at the configured time interval; if less than 0, summary output is disabled.

enable.metrics.push

Kafka Streams metrics can be pushed to the brokers similar to client metrics. Additionally, Kafka Streams allows you to enable or disable metric pushing for each embedded client individually. However, pushing Kafka Streams metrics requires that enable.metrics.push is enabled on the main consumer and admin client.

max.task.idle.ms

This configuration controls how long Streams will wait to fetch data in order to provide in-order processing semantics.

When processing a task that has multiple input partitions (as in a join or merge), Streams needs to choose which partition to process the next record from. When all input partitions have locally buffered data, Streams picks the partition whose next record has the lowest timestamp. This has the desirable effect of collating the input partitions in timestamp order, which is generally what you want in a streaming join or merge. However, when Streams does not have any data buffered locally for one of the partitions, it does not know whether the next record for that partition will have a lower or higher timestamp than the remaining partitions’ records.

There are two cases to consider: either there is data in that partition on the broker that Streams has not fetched yet, or Streams is fully caught up with that partition on the broker, and the producers simply haven’t produced any new records since Streams polled the last batch.

The default value of 0 causes Streams to delay processing a task when it detects that it has no locally buffered data for a partition, but there is data available on the brokers. Specifically, when there is an empty partition in the local buffer, but Streams has a non-zero lag for that partition. However, as soon as Streams catches up to the broker, it will continue processing, even if there is no data in one of the partitions. That is, it will not wait for new data to be produced. This default is designed to sacrifice some throughput in exchange for intuitively correct join semantics.

Any config value greater than zero indicates the number of extra milliseconds that Streams will wait if it has a caught-up but empty partition. In other words, this is the amount of time to wait for new data to be produced to the input partitions to ensure in-order processing of data in the event of a slow producer.

The config value of -1 indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp, which achieves maximum throughput at the expense of introducing out-of-order processing.
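For example, to let a fully caught-up task wait up to one second for slow producers before processing (a sketch; the one-second value is illustrative):

Properties settings = new Properties();
// wait up to 1000 ms on caught-up but empty input partitions
settings.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);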

max.warmup.replicas

The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.

Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup task can only be promoted to an active task during a rebalance (normally during a so-called probing rebalance, which occurs at a frequency specified by the probing.rebalance.interval.ms config). This means that the maximum rate at which active tasks can be migrated from one Kafka Streams instance to another instance can be determined by (max.warmup.replicas / probing.rebalance.interval.ms).

num.standby.replicas

The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the specified number of replicas per store and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is preferred to restart on an instance that has standby replicas so that the local state store restoration process from its changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of resuming tasks on failover can be found in the State section.

Recommendation: Increase the number of standbys to 1 to get instant fail-over, i.e., high-availability. Increasing the number of standbys requires more client-side storage space. For example, with 1 standby, 2x space is required.

Note: If you enable n standby tasks, you need to provision n+1 KafkaStreams instances.

num.stream.threads

This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads. For more information about the Kafka Streams threading model, see Threading Model.

probing.rebalance.interval.ms

The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Streams will only assign stateful active tasks to instances that are caught up and within the acceptable.recovery.lag, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.

processing.exception.handler

The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception handler needs to return FAIL or CONTINUE depending on the record and the exception thrown. Returning FAIL will signal that Streams should shut down and CONTINUE will signal that Streams should ignore the issue and continue processing. The following library built-in exception handlers are available:

  • LogAndContinueProcessingExceptionHandler: This handler logs the processing exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to be processed.
  • LogAndFailProcessingExceptionHandler: This handler logs the processing exception and then signals the processing pipeline to stop processing more records.

You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt records into a quarantine topic (think: a “dead letter queue”) for further processing. To do this, use the Producer API to write a corrupted record directly to the quarantine topic. To be more concrete, you can create a separate KafkaProducer object outside the Streams client, and pass in this object as well as the dead letter queue topic name into the Properties map, which then can be retrieved from the configure function call. The drawback of this approach is that “manual” writes are side effects that are invisible to the Kafka Streams runtime library, so they do not benefit from the end-to-end processing guarantees of the Streams API:

public class SendToDeadLetterQueueExceptionHandler implements ProcessingExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record record,
                                            final Exception exception) {

        log.warn("Exception caught during message processing, sending to the dead queue topic; " +
            "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
            context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
            exception);

        dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), (byte[]) record.key(), (byte[]) record.value(), record.headers()));

        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}

processing.guarantee

The processing guarantee that should be used. Possible values are "at_least_once" (default) and "exactly_once_v2" (for EOS version 2). Deprecated config options are "exactly_once" (for EOS alpha), and "exactly_once_beta" (for EOS version 2). Using "exactly_once_v2" (or the deprecated "exactly_once_beta") requires broker version 2.5 or newer, while using the deprecated "exactly_once" requires broker version 0.11.0 or newer. Note that if exactly-once processing is enabled, the default for parameter commit.interval.ms changes to 100ms. Additionally, consumers are configured with isolation.level="read_committed" and producers are configured with enable.idempotence=true by default. Note that by default exactly-once processing requires a cluster of at least three brokers, which is the recommended setting for production. For development, you can change this by adjusting the broker settings transaction.state.log.replication.factor and transaction.state.log.min.isr to the number of brokers you want to use. For more details see Processing Guarantees.

Recommendation: While it is technically possible to use EOS with any replication factor, using a replication factor lower than 3 effectively voids EOS. Thus it is strongly recommended to use a replication factor of 3 (together with min.in.sync.replicas=2). This recommendation applies to all topics (i.e. __transaction_state, __consumer_offsets, Kafka Streams internal topics, and user topics).
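For example, to enable EOS version 2 (a sketch):

Properties settings = new Properties();
// switch from the default "at_least_once" to EOS version 2
settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);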

processor.wrapper.class

A class or class name implementing the ProcessorWrapper interface. This feature allows you to wrap any of the processors in the compiled topology, including both custom processor implementations and those created by Streams for DSL operators. This can be useful for logging or tracing implementations since it allows access to the otherwise-hidden processor context for DSL operators, and also allows for injecting additional debugging information to an entire application topology with just a single config.

IMPORTANT: This MUST be passed in when creating the topology, and will not be applied unless passed in to the appropriate topology-building constructor. You should use the StreamsBuilder#new(TopologyConfig) constructor for DSL applications, and the Topology#new(TopologyConfig) constructor for PAPI applications.
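A minimal sketch of wiring this up for a DSL application; MyProcessorWrapper is a hypothetical ProcessorWrapper implementation, and the config is set via its string name here:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// MyProcessorWrapper is a hypothetical ProcessorWrapper implementation
props.put("processor.wrapper.class", MyProcessorWrapper.class);

// The wrapper only takes effect when the config is part of the TopologyConfig
// passed to the topology-building constructor
TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
StreamsBuilder builder = new StreamsBuilder(topologyConfig);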

replication.factor

This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.

Recommendation: Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures. Note that you will require more storage space as well (3x with the replication factor of 3).

rocksdb.config.setter

The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, you can implement RocksDBConfigSetter and provide your custom class via rocksdb.config.setter.

Here is an example that adjusts the memory size consumed by RocksDB.

public static class CustomRocksDBConfig implements RocksDBConfigSetter {
    // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
    private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // See #1 below.
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        // See #2 below.
        tableConfig.setBlockSize(16 * 1024L);
        // See #3 below.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // See #4 below.
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // See #5 below.
        cache.close();
    }
}

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);

Notes for example:

  1. BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig(); Get a reference to the existing table config rather than create a new one, so you don’t accidentally overwrite defaults such as the BloomFilter, which is an important optimization.
  2. tableConfig.setBlockSize(16 * 1024L); Modify the default block size per these instructions from the RocksDB GitHub.
  3. tableConfig.setCacheIndexAndFilterBlocks(true); Do not let the index and filter blocks grow unbounded. For more information, see the RocksDB GitHub.
  4. options.setMaxWriteBufferNumber(2); See the advanced options in the RocksDB GitHub.
  5. cache.close(); To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See RocksJava docs for more details.

state.dir

The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated with the application are created under this subdirectory. When running multiple instances of the same application on a single machine, this path must be unique for each such instance.

task.assignor.class

A task assignor class or class name implementing the org.apache.kafka.streams.processor.assignment.TaskAssignor interface. Defaults to the high-availability task assignor. One possible alternative implementation provided in Apache Kafka is the org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor, which was the default task assignor before KIP-441 and minimizes task movement at the cost of stateful task availability. Alternative implementations of the task assignment algorithm can be plugged into the application by implementing a custom TaskAssignor and setting this config to the name of the custom task assignor class.

topology.optimization

A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: StreamsConfig.NO_OPTIMIZATION (none), StreamsConfig.OPTIMIZE (all) or a comma separated list of specific optimizations: StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS (reuse.ktable.source.topics), StreamsConfig.MERGE_REPARTITION_TOPICS (merge.repartition.topics), StreamsConfig.SINGLE_STORE_SELF_JOIN (single.store.self.join).

We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.

These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.

Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to StreamsConfig.OPTIMIZE, you’ll need to pass in your configuration properties when building your topology by using the overloaded StreamsBuilder.build(Properties) method. For example KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties).
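For example, a minimal sketch of this pattern (the required application.id and bootstrap.servers settings are omitted for brevity):

Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

StreamsBuilder streamsBuilder = new StreamsBuilder();
// ... define the topology ...

// pass the same properties to build() so the optimizations are applied to the topology
KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(props), props);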

windowed.inner.class.serde

Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.

Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
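For example, a Kafka Streams application that works with time-windowed String keys could construct the windowed serde directly rather than relying on this config (a sketch; the 5-minute window size is an assumption):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// windowed serde with an explicit inner String serde and window size
final Serde<Windowed<String>> windowedSerde =
    WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofMinutes(5).toMillis());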

upgrade.from

The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide. You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.

Kafka consumers, producer and admin client configuration parameters

You can specify parameters for the Kafka consumers, producers, and admin client that are used internally. The consumer, producer and admin client settings are defined by specifying parameters in a StreamsConfig instance.

In this example, the Kafka consumer session timeout is configured to be 60000 milliseconds in the Streams settings:

 Properties streamsSettings = new Properties();
 // Example of a "normal" setting for Kafka Streams
 streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
 // Customize the Kafka consumer settings of your Streams application
 streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

Naming

Some consumer, producer and admin client configuration parameters use the same parameter name, and the Kafka Streams library itself also uses some parameters that share the same name as its embedded clients. For example, send.buffer.bytes and receive.buffer.bytes are used to configure TCP buffers; request.timeout.ms and retry.backoff.ms control retries for client requests. You can avoid duplicate names by prefixing parameter names with consumer., producer., or admin. (e.g., consumer.send.buffer.bytes and producer.send.buffer.bytes).

 Properties streamsSettings = new Properties();
 // same value for consumer, producer, and admin client
 streamsSettings.put("PARAMETER_NAME", "value");
 // different values for consumer and producer
 streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
 streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
 streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
 // alternatively, you can use
 streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
 streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
 streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");

You could further separate consumer configuration by adding different prefixes:

  • main.consumer. for the main consumer, which is the default consumer of a stream source.
  • restore.consumer. for the restore consumer, which is in charge of state store recovery.
  • global.consumer. for the global consumer, which is used in global KTable construction.

For example, if you only want to set restore consumer config without touching other consumers’ settings, you could simply use restore.consumer. to set the config.

 Properties streamsSettings = new Properties();
 // same config value for all consumer types
 streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
 // set a different restore consumer config. This would make restore consumer take restore-consumer-value,
 // while main consumer and global consumer stay with general-consumer-value
 streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
 // alternatively, you can use
 streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");

The same applies to main.consumer. and global.consumer., if you only want to specify one consumer type's config.

Additionally, to configure the internal repartition/changelog topics, you could use the topic. prefix, followed by any of the standard topic configs.

 Properties streamsSettings = new Properties();
 // Override default for both changelog and repartition topics
 streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
 // alternatively, you can use
 streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");

Default Values

Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions of these configs, see Producer Configs and Consumer Configs.

Parameter Name    | Corresponding Client | Streams Default
auto.offset.reset | Consumer             | earliest
linger.ms         | Producer             | 100
max.poll.records  | Consumer             | 1000
client.id         | -                    | <application.id>-<random-UUID>

If EOS is enabled, other parameters have the following default values.

Parameter Name         | Corresponding Client | Streams Default
transaction.timeout.ms | Producer             | 10000
delivery.timeout.ms    | Producer             | Integer.MAX_VALUE

Parameters controlled by Kafka Streams

Some parameters are not configurable by the user. If you supply a value that is different from the default value, your value is ignored. Below is a list of some of these parameters.

Parameter Name                | Corresponding Client | Streams Default
allow.auto.create.topics      | Consumer             | false
group.id                      | Consumer             | application.id
enable.auto.commit            | Consumer             | false
partition.assignment.strategy | Consumer             | StreamsPartitionAssignor

If EOS is enabled, other parameters are set with the following values.

Parameter Name     | Corresponding Client | Streams Default
isolation.level    | Consumer             | READ_COMMITTED
enable.idempotence | Producer             | true

client.id

Kafka Streams uses the client.id parameter to compute derived client IDs for internal clients. If you don’t set client.id, Kafka Streams sets it to <application.id>-<random-UUID>.

This value will be used to derive the client IDs of the following internal clients.

Client           | client.id
Consumer         | <client.id>-StreamThread-<threadIdx>-consumer
Restore consumer | <client.id>-StreamThread-<threadIdx>-restore-consumer
Global consumer  | <client.id>-global-consumer
Producer         | For Non-EOS and EOS v2: <client.id>-StreamThread-<threadIdx>-producer
                 | For EOS v1: <client.id>-StreamThread-<threadIdx>-<taskId>-producer
Admin            | <client.id>-admin

enable.auto.commit

The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config value to false. Consumers will only commit explicitly via commitSync calls when the Kafka Streams library or a user decides to commit the current processing state.


3 - Streams DSL

Streams DSL

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.

Table of Contents

  • Overview
  • Creating source streams from Kafka
  • Transform a stream
    • Stateless transformations
    • Stateful transformations
      • Aggregating
      • Joining
        • Join co-partitioning requirements
        • KStream-KStream Join
        • KTable-KTable Equi-Join
        • KTable-KTable Foreign-Key Join
        • KStream-KTable Join
        • KStream-GlobalKTable Join
      • Windowing
        • Hopping time windows
        • Tumbling time windows
        • Sliding time windows
        • Session Windows
        • Window Final Results
    • Applying processors (Processor API integration)
    • Transformers removal and migration to processors
  • Naming Operators in a Streams DSL application
  • Controlling KTable update rate
  • Using timestamp-based semantics for table processors
  • Writing streams back to Kafka
  • Testing a Streams application
  • Kafka Streams DSL for Scala
    • Sample Usage
    • Implicit Serdes
    • User-Defined Serdes

Overview

In comparison to the Processor API, only the DSL supports:

  • Built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable. Having first-class support for streams and tables is crucial because, in practice, most use cases require not just either streams or databases/tables, but a combination of both. For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your application will be doing is transforming many input streams of customer-related events into an output table that contains a continuously updated 360-degree view of your customers.
  • Declarative, functional programming style with stateless transformations (e.g. map and filter) as well as stateful transformations such as aggregations (e.g. count and reduce), joins (e.g. leftJoin), and windowing (e.g. session windows).

With the DSL, you can define processor topologies (i.e., the logical processing plan) in your application. The steps to accomplish this are:

  1. Specify one or more input streams that are read from Kafka topics.
  2. Compose transformations on these streams.
  3. Write the resulting output streams back to Kafka topics, or expose the processing results of your application directly to other applications through interactive queries (e.g., via a REST API).

After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into action). A step-by-step guide for writing a stream processing application using the DSL is provided below.

For a complete list of available API functionality, see also the Streams API docs.
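To make these three steps concrete, here is a minimal sketch (topic names are made up for illustration) that reads a stream, applies a transformation, and writes the result back to Kafka:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// 1. Specify an input stream that is read from a Kafka topic.
KStream<String, String> input = builder.stream(
    "input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// 2. Compose transformations on this stream.
KStream<String, String> transformed = input.mapValues(value -> value.toUpperCase());

// 3. Write the resulting output stream back to a Kafka topic.
transformed.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

Topology topology = builder.build();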

KStream

Only the Kafka Streams DSL has the notion of a KStream.

A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an “INSERT” -- think: adding more entries to an append-only ledger -- because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry.

To illustrate, let’s imagine the following two data records are being sent to the stream:

(“alice”, 1) -> (“alice”, 3)

If your stream processing application were to sum the values per user, it would return 4 for alice. Why? Because the second data record would not be considered an update of the previous record. Compare this behavior of KStream to KTable below, which would return 3 for alice.
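As a small sketch of such a per-user sum over a record stream (the topic name and serdes are illustrative assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Integer> purchases = builder.stream(
    "user-purchases", Consumed.with(Serdes.String(), Serdes.Integer()));

// Every record is an independent fact, so ("alice", 1) and ("alice", 3)
// are added together, yielding 4 for alice.
KTable<String, Integer> sumPerUser = purchases
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    .reduce(Integer::sum);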

KTable

Only the Kafka Streams DSL has the notion of a KTable.

A KTable is an abstraction of a changelog stream , where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.

To illustrate, let’s imagine the following two data records are being sent to the stream:

(“alice”, 1) -> (“alice”, 3)

If your stream processing application were to sum the values per user, it would return 3 for alice. Why? Because the second data record would be considered an update of the previous record.

Effects of Kafka’s log compaction: Another way of thinking about KStream and KTable is as follows: If you were to store a KTable into a Kafka topic, you’d probably want to enable Kafka’s log compaction feature, e.g. to save storage space.

However, it would not be safe to enable log compaction in the case of a KStream because, as soon as log compaction would begin purging older data records of the same key, it would break the semantics of the data. To pick up the illustration example again, you’d suddenly get a 3 for alice instead of a 4 because log compaction would have removed the ("alice", 1) data record. Hence log compaction is perfectly safe for a KTable (changelog stream) but it is a mistake for a KStream (record stream).

We have already seen an example of a changelog stream in the section streams and tables. Another example are change data capture (CDC) records in the changelog of a relational database, representing which row in a database table was inserted, updated, or deleted.

KTable also provides an ability to look up current values of data records by keys. This table-lookup functionality is available through join operations (see also Joining in the Developer Guide) as well as through Interactive Queries.

GlobalKTable

Only the Kafka Streams DSL has the notion of a GlobalKTable.

Like a KTable , a GlobalKTable is an abstraction of a changelog stream , where each data record represents an update.

A GlobalKTable differs from a KTable in the data that they are being populated with, i.e. which data from the underlying Kafka topic is being read into the respective table. Slightly simplified, imagine you have an input topic with 5 partitions. In your application, you want to read this topic into a table. Also, you want to run your application across 5 application instances for maximum parallelism.

  • If you read the input topic into a KTable , then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions.
  • If you read the input topic into a GlobalKTable , then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.

GlobalKTable provides the ability to look up current values of data records by keys. This table-lookup functionality is available through join operations. Note that a GlobalKTable has no notion of time in contrast to a KTable.

Benefits of global tables:

  • More convenient and/or efficient joins : Notably, global tables allow you to perform star joins, they support “foreign-key” lookups (i.e., you can lookup data in the table not just by record key, but also by data in the record values), and they are more efficient when chaining multiple joins. Also, when joining against a global table, the input data does not need to be co-partitioned.
  • Can be used to “broadcast” information to all the running instances of your application.

Downsides of global tables:

  • Increased local storage consumption compared to the (partitioned) KTable because the entire topic is tracked.
  • Increased network and Kafka broker load compared to the (partitioned) KTable because the entire topic is read.

Creating source streams from Kafka

You can easily read data from Kafka topics into your application. The following operations are supported.

Reading from Kafka | Description
Stream
  • input topics -> KStream

| Creates a KStream from the specified Kafka input topics and interprets the data as a record stream. A KStream represents a partitioned record stream. (details) In the case of a KStream, the local KStream instance of every application instance will be populated with data from only a subset of the partitions of the input topic. Collectively, across all application instances, all input topic partitions are read and processed.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()    /* value serde */
    ));

If you do not specify Serdes explicitly, the default Serdes from the configuration are used. You must specify Serdes explicitly if the key or value types of the records in the Kafka input topics do not match the configured default Serdes. For information about configuring default Serdes, available Serdes, and implementing your own custom Serdes see Data Types and Serialization. Several variants of stream exist. For example, you can specify a regex pattern for input topics to read from (note that all matching topics will be part of the same input topic group, and the work will not be parallelized for different topics if subscribed to in this way).
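For example, a regex-based subscription might look as follows (the pattern is an illustrative assumption):

import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Subscribe to every topic whose name starts with "word-counts-"
KStream<String, Long> allWordCounts = builder.stream(
    Pattern.compile("word-counts-.*"),
    Consumed.with(Serdes.String(), Serdes.Long()));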
Table

  • input topic -> KTable

| Reads the specified Kafka input topic into a KTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not null) or as DELETE (when the value is null) for that key. (details) In the case of a KTable, the local KTable instance of every application instance will be populated with data from only a subset of the partitions of the input topic. Collectively, across all application instances, all input topic partitions are read and processed. You must provide a name for the table (more precisely, for the internal state store that backs the table). This is required for supporting interactive queries against the table. When a name is not provided the table will not be queryable and an internal name will be provided for the state store. If you do not specify Serdes explicitly, the default Serdes from the configuration are used. You must specify Serdes explicitly if the key or value types of the records in the Kafka input topics do not match the configured default Serdes. For information about configuring default Serdes, available Serdes, and implementing your own custom Serdes see Data Types and Serialization. Several variants of table exist, for example to specify the auto.offset.reset policy to be used when reading from the input topic.
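A sketch of reading a topic into a KTable, mirroring the GlobalKTable example below (the store name is an illustrative assumption):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> wordCounts = builder.table(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );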
Global Table

  • input topic -> GlobalKTable

| Reads the specified Kafka input topic into a GlobalKTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not null) or as DELETE (when the value is null) for that key. (details) In the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be populated with data from all the partitions of the input topic. You must provide a name for the table (more precisely, for the internal state store that backs the table). This is required for supporting interactive queries against the table. When a name is not provided the table will not be queryable and an internal name will be provided for the state store.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-global-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );

You must specify Serdes explicitly if the key or value types of the records in the Kafka input topics do not match the configured default Serdes. For information about configuring default Serdes, available Serdes, and implementing your own custom Serdes see Data Types and Serialization. Several variants of globalTable exist to e.g. specify explicit Serdes.

Transform a stream

The KStream and KTable interfaces support a variety of transformation operations. Each of these operations can be translated into one or more connected processors into the underlying processor topology. Since KStream and KTable are strongly typed, all of these transformation operations are defined as generic functions where users could specify the input and output data types.

Some KStream transformations may generate one or more KStream objects, for example:

  • filter and map on a KStream will generate another KStream
  • split on a KStream can generate multiple KStreams

Some others may generate a KTable object, for example an aggregation of a KStream also yields a KTable. This allows Kafka Streams to continuously update the computed value upon arrivals of out-of-order records after it has already been produced to the downstream transformation operators.

All KTable transformation operations can only generate another KTable. However, the Kafka Streams DSL does provide a special function that converts a KTable representation into a KStream. All of these transformation methods can be chained together to compose a complex processor topology.

These transformation operations are described in the following subsections:

  • Stateless transformations
  • Stateful transformations

Stateless transformations

Stateless transformations do not require state for processing and they do not require a state store associated with the stream processor. Kafka 0.11.0 and later allows you to materialize the result from a stateless KTable transformation. This allows the result to be queried through interactive queries. To materialize a KTable, each of the below stateless operations can be augmented with an optional queryableStoreName argument.
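In the current API this is typically expressed by passing a Materialized instance with a store name; here is a sketch (topic and store names are illustrative assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

KTable<String, Long> scores = builder.table(
    "scores-topic", Consumed.with(Serdes.String(), Serdes.Long()));

// Materialize the filtered result under an explicit store name so it can be
// queried via interactive queries.
KTable<String, Long> positiveScores = scores.filter(
    (key, value) -> value > 0,
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("positive-scores-store"));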

Transformation | Description
Branch
  • KStream -> BranchedKStream

| Branch (or split) a KStream based on the supplied predicates into one or more KStream instances. (details) Predicates are evaluated in order. A record is placed to one and only one output stream on the first match: if the n-th predicate evaluates to true, the record is placed to n-th stream. If a record does not match any predicates, it will be routed to the default branch, or dropped if no default branch is created. Branching is useful, for example, to route records to different downstream topics.

KStream<String, Long> stream = ...;
Map<String, KStream<String, Long>> branches =
    stream.split(Named.as("Branch-"))
        .branch((key, value) -> key.startsWith("A"),  /* first predicate  */
             Branched.as("A"))
        .branch((key, value) -> key.startsWith("B"),  /* second predicate */
             Branched.as("B"))
        .defaultBranch(Branched.as("C"));             /* default branch */

// KStream branches.get("Branch-A") contains all records whose keys start with "A"
// KStream branches.get("Branch-B") contains all records whose keys start with "B"
// KStream branches.get("Branch-C") contains all other records

Broadcast/Multicast

  • no operator

| Broadcasting a KStream into multiple downstream operators. A record is sent to more than one operator by applying multiple operators to the same KStream instance.

KStream<String, Long> stream = ...;
KStream<...> stream1 = stream.map(...);
KStream<...> stream2 = stream.mapValues(...);
KStream<...> stream3 = stream.flatMap(...);

Multicasting a KStream into multiple downstream operators. In contrast to branching , which sends each record to at most one downstream branch, a multicast may send a record to any number of downstream KStream instances. A multicast is implemented as a broadcast plus filters.

KStream<String, Long> stream = ...;
KStream<...> stream1 = stream.filter((key, value) -> key.startsWith("A")); // contains all records whose keys start with "A"
KStream<...> stream2 = stream.filter((key, value) -> key.startsWith("AB")); // contains all records whose keys start with "AB" (subset of stream1)
KStream<...> stream3 = stream.filter((key, value) -> key.contains("B")); // contains all records whose keys contains a "B" (superset of stream2)

Filter

  • KStream -> KStream
  • KTable -> KTable

| Evaluates a boolean function for each element and retains those for which the function returns true. (KStream details, KTable details)

KStream<String, Long> stream = ...;

// A filter that selects (keeps) only positive numbers
KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);

Inverse Filter

  • KStream -> KStream
  • KTable -> KTable

| Evaluates a boolean function for each element and drops those for which the function returns true. (KStream details, KTable details)

KStream<String, Long> stream = ...;

// An inverse filter that discards any negative numbers or zero
KStream<String, Long> onlyPositives = stream.filterNot((key, value) -> value <= 0);

FlatMap

  • KStream -> KStream

| Takes one record and produces zero, one, or more records. You can modify the record keys and values, including their types. (details) Marks the stream for data re-partitioning: Applying a grouping or a join after flatMap will result in re-partitioning of the records. If possible use flatMapValues instead, which will not cause data re-partitioning.

KStream<Long, String> stream = ...;
KStream<String, Integer> transformed = stream.flatMap(
     // Here, we generate two output records for each input record.
     // We also change the key and value types.
     // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
    (key, value) -> {
      List<KeyValue<String, Integer>> result = new LinkedList<>();
      result.add(KeyValue.pair(value.toUpperCase(), 1000));
      result.add(KeyValue.pair(value.toLowerCase(), 9000));
      return result;
    }
  );

FlatMapValues

  • KStream -> KStream

| Takes one record and produces zero, one, or more records, while retaining the key of the original record. You can modify the record values and the value type. (details) flatMapValues is preferable to flatMap because it will not cause data re-partitioning. However, you cannot modify the key or key type like flatMap does.

// Split a sentence into words.
KStream<byte[], String> sentences = ...;
KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));

Foreach

  • KStream -> void
  • KTable -> void

| Terminal operation. Performs a stateless action on each record. (details) You would use foreach to cause side effects based on the input data (similar to peek) and then stop further processing of the input data (unlike peek, which is not a terminal operation). Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.

KStream<String, Long> stream = ...;

// Print the contents of the KStream to the local console.
stream.foreach((key, value) -> System.out.println(key + " => " + value));

GroupByKey

  • KStream -> KGroupedStream

| Groups the records by the existing key. (details) Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations. When to set explicit Serdes: Variants of groupByKey exist to override the configured default Serdes of your application, which you must do if the key and/or value types of the resulting KGroupedStream do not match the configured default Serdes. Note Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the grouped records of the same key into so-called windows for stateful operations such as windowed aggregations or windowed joins. Causes data re-partitioning if and only if the stream was marked for re-partitioning. groupByKey is preferable to groupBy because it re-partitions data only if the stream was already marked for re-partitioning. However, groupByKey does not allow you to modify the key or key type like groupBy does.

KStream<byte[], String> stream = ...;

// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();

// When the key and/or value types do not match the configured
// default serdes, we must explicitly specify serdes.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    Grouped.with(
      Serdes.ByteArray(), /* key */
      Serdes.String())     /* value */
  );  

GroupBy

  • KStream -> KGroupedStream
  • KTable -> KGroupedTable

| Groups the records by a new key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. groupBy is a shorthand for selectKey(...).groupByKey(). (KStream details, KTable details) Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations. When to set explicit Serdes: Variants of groupBy exist to override the configured default Serdes of your application, which you must do if the key and/or value types of the resulting KGroupedStream or KGroupedTable do not match the configured default Serdes. Note Grouping vs. Windowing: A related operation is windowing, which lets you control how to “sub-group” the grouped records of the same key into so-called windows for stateful operations such as windowed aggregations or windowed joins. Always causes data re-partitioning: groupBy always causes data re-partitioning. If possible use groupByKey instead, which will re-partition data only if required.

KStream<byte[], String> stream = ...;
KTable<byte[], String> table = ...;

// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    (key, value) -> KeyValue.pair(value, value.length()),
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.Integer()) /* value (note: type was modified) */
  );

Cogroup

  • KGroupedStream -> CogroupedKStream
  • CogroupedKStream -> CogroupedKStream

| Cogrouping allows you to aggregate multiple input streams in a single operation. The different (already grouped) input streams must have the same key type and may have different value types. KGroupedStream#cogroup() creates a new cogrouped stream with a single input stream, while CogroupedKStream#cogroup() adds a grouped stream to an existing cogrouped stream. A CogroupedKStream may be windowed before it is aggregated. Cogroup does not cause a repartition as it has the prerequisite that the input streams are grouped. In the process of creating these groups, they will have already been repartitioned if the stream was already marked for repartitioning.

KStream<byte[], String> stream = ...;
KStream<byte[], String> stream2 = ...;

// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();
KGroupedStream<byte[], String> groupedStream2 = stream2.groupByKey();
CogroupedKStream<byte[], String> cogroupedStream = groupedStream.cogroup(aggregator1).cogroup(groupedStream2, aggregator2);

KTable<byte[], String> table = cogroupedStream.aggregate(initializer);

KTable<byte[], String> table2 = cogroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(500))).aggregate(initializer);  
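To make the elided initializer and aggregators above concrete, here is a sketch with explicit lambdas (topic names and the summing logic are illustrative assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

KGroupedStream<String, Long> clicks = builder
    .stream("clicks-topic", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));
KGroupedStream<String, Long> views = builder
    .stream("views-topic", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));

// Both input streams contribute to one per-key running total.
Aggregator<String, Long, Long> sum = (key, value, aggregate) -> aggregate + value;

KTable<String, Long> totals = clicks
    .cogroup(sum)               /* aggregator for the first input stream */
    .cogroup(views, sum)        /* aggregator for the second input stream */
    .aggregate(() -> 0L,        /* initializer */
        Materialized.with(Serdes.String(), Serdes.Long()));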

Map

  • KStream -> KStream

| Takes one record and produces one record. You can modify the record key and value, including their types. (details) Marks the stream for data re-partitioning: Applying a grouping or a join after map will result in re-partitioning of the records. If possible use mapValues instead, which will not cause data re-partitioning.

KStream<byte[], String> stream = ...;

// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

Map (values only)

  • KStream -> KStream
  • KTable -> KTable

| Takes one record and produces one record, while retaining the key of the original record. You can modify the record value and the value type. (KStream details, KTable details) mapValues is preferable to map because it will not cause data re-partitioning. However, it does not allow you to modify the key or key type like map does.

KStream<byte[], String> stream = ...;

KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());

Merge

  • KStream -> KStream

| Merges records of two streams into one larger stream. (details) There is no ordering guarantee between records from different streams in the merged stream. Relative order is preserved within each input stream, though (i.e., records within the same input stream are processed in order).

KStream<byte[], String> stream1 = ...;

KStream<byte[], String> stream2 = ...;

KStream<byte[], String> merged = stream1.merge(stream2);  

Peek

  • KStream -> KStream

| Performs a stateless action on each record, and returns an unchanged stream. (details) You would use peek to cause side effects based on the input data (similar to foreach) and continue processing the input data (unlike foreach, which is a terminal operation). peek returns the input stream as-is; if you need to modify the input stream, use map or mapValues instead. peek is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting. Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.

KStream<byte[], String> stream = ...;

KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));

Print

  • KStream -> void

| Terminal operation. Prints the records to System.out. See Javadocs for serde and toString() caveats. (details) Calling print() is the same as calling foreach((key, value) -> System.out.println(key + ", " + value)). print is mainly for debugging/testing purposes, and it will try to flush on each record printed. Hence it should not be used in production if performance is a concern.

KStream<byte[], String> stream = ...;
// print to sysout
stream.print();

// print to file with a custom label
stream.print(Printed.toFile("streams.out").withLabel("streams"));  

SelectKey

  • KStream -> KStream

| Assigns a new key - possibly of a new key type - to each record. (details) Calling selectKey(mapper) is the same as calling map((key, value) -> KeyValue.pair(mapper(key, value), value)). Marks the stream for data re-partitioning: Applying a grouping or a join after selectKey will result in re-partitioning of the records.

KStream<byte[], String> stream = ...;

// Derive a new record key from the record's value.  Note how the key type changes, too.
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);

Table to Stream

  • KTable -> KStream

| Get the changelog stream of this table. (details)

KTable<byte[], String> table = ...;

// Also, a variant of `toStream` exists that allows you
// to select a new key for the resulting stream.
KStream<byte[], String> stream = table.toStream();  

Stream to Table

  • KStream -> KTable

| Convert an event stream into a table, that is, into a changelog stream. (details)

KStream<byte[], String> stream = ...;

KTable<byte[], String> table = stream.toTable();  

Repartition

  • KStream -> KStream

| Manually trigger repartitioning of the stream with the desired number of partitions. (details) repartition() is similar to through(), however Kafka Streams will manage the topic for you. The generated topic is treated as an internal topic, so data will be purged automatically, as with any other internal repartition topic. In addition, you can specify the desired number of partitions, which allows you to easily scale in/out downstream sub-topologies. The repartition() operation always triggers repartitioning of the stream, so it can be used with embedded Processor API methods (like transform() et al.) that do not trigger auto repartitioning when a key-changing operation is performed beforehand.

KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));  

Stateful transformations

Stateful transformations depend on state for processing inputs and producing outputs and require a state store associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary.

Note: For some operations, specific store types are used regardless of the store type that may be specified via the parameter materialized.

Note that state stores are fault-tolerant. In case of failure, Kafka Streams guarantees to fully restore all state stores prior to resuming the processing. See Fault Tolerance for further information.

Available stateful transformations in the DSL include:

  • Aggregating
  • Joining
  • Windowing (as part of aggregations and joins)
  • Applying custom processors and transformers, which may be stateful, for Processor API integration

The following diagram shows their relationships:

Stateful transformations in the DSL.

Here is an example of a stateful application: the WordCount algorithm.

WordCount example:

// Assume the record values represent lines of text.  For the sake of this example, you can ignore
// whatever may be stored in the record keys.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.  The text lines are the record
    // values, i.e. you can ignore whatever data is in the record keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the stream by word to ensure the key of the record is the word.
    .groupBy((key, word) -> word)
    // Count the occurrences of each word (record key).
    //
    // This will change the stream type from `KGroupedStream<String, String>` to
    // `KTable<String, Long>` (word -> count).
    .count()
    // Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
    .toStream();

Aggregating

After records are grouped by key via groupByKey or groupBy - and thus represented as either a KGroupedStream or a KGroupedTable, they can be aggregated via an operation such as reduce. Aggregations are key-based operations, which means that they always operate over records (notably record values) of the same key. You can perform aggregations on windowed or non-windowed data.

Transformation | Description
Aggregate
  • KGroupedStream -> KTable
  • KGroupedTable -> KTable

| Rolling aggregation. Aggregates the values of (non-windowed) records by the grouped (or cogrouped) key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (KGroupedStream details, KGroupedTable details) When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an “adder” aggregator (e.g., aggValue + curValue). When aggregating a grouped table, you must additionally provide a “subtractor” aggregator (think: aggValue - oldValue). When aggregating a cogrouped stream, the actual aggregators are provided for each input stream in the prior cogroup() calls, and thus you only need to provide an initializer (e.g., aggValue = 0). Several variants of aggregate exist, see Javadocs for details.

KGroupedStream<byte[], String> groupedStream = ...;
KGroupedTable<byte[], String> groupedTable = ...;

// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
    Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

Detailed behavior of KGroupedStream:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior of KGroupedTable:

  • Input records with null keys are ignored.
  • When a record key is received for the first time, the initializer is called (and called before the adder and subtractor). Note that, in contrast to KGroupedStream, over time the initializer may be called more than once for a key as a result of having received input tombstone records for that key (see below).
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same. The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor and adder is not defined.
  • When a tombstone record - i.e. a record with a null value - is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will trigger the initializer again.

See the example at the bottom of this section for a visualization of the aggregation semantics.
Aggregate (windowed)

  • KGroupedStream -> KTable

| Windowed aggregation. Aggregates the values of records, per window, by the grouped key. Aggregating is a generalization of reduce and allows, for example, the aggregate value to have a different type than the input values. (TimeWindowedKStream details, SessionWindowedKStream details) You must provide an initializer (e.g., aggValue = 0), “adder” aggregator (e.g., aggValue + curValue), and a window. When windowing based on sessions, you must additionally provide a “session merger” aggregator (e.g., mergedAggValue = leftAggValue + rightAggValue). The windowed aggregate turns a TimeWindowedKStream<K, V> or SessionWindowedKStream<K, V> into a windowed KTable<Windowed<K>, V>. Several variants of aggregate exist, see Javadocs for details.

import java.time.Duration;
KGroupedStream<String, Long> groupedStream = ...;

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0L, /* initializer */
        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(30)))
    .aggregate(
        () -> 0L, /* initializer */
        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
    	() -> 0L, /* initializer */
    	(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
        Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

Detailed behavior:

  • The windowed aggregate behaves similar to the rolling aggregate described above. The additional twist is that the behavior applies per window.
  • Input records with null keys are ignored in general.
  • When a record key is received for the first time for a given window, the initializer is called (and called before the adder).
  • Whenever a record with a non-null value is received for a given window, the adder is called.
  • When using session windows: the session merger is called whenever two sessions are being merged.

See the example at the bottom of this section for a visualization of the aggregation semantics.
Count

  • KGroupedStream -> KTable
  • KGroupedTable -> KTable

| Rolling aggregation. Counts the number of records by the grouped key. (KGroupedStream details, KGroupedTable details) Several variants of count exist, see Javadocs for details.

KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Counting a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.count();

// Counting a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.count();

Detailed behavior for KGroupedStream:

  • Input records with null keys or values are ignored.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored. Records with null values are not ignored but interpreted as “tombstones” for the corresponding key, which indicate the deletion of the key from the table.

Count (windowed)

  • KGroupedStream -> KTable

| Windowed aggregation. Counts the number of records, per window, by the grouped key. (TimeWindowedKStream details, SessionWindowedKStream details) The windowed count turns a TimeWindowedKStream<K, V> or SessionWindowedKStream<K, V> into a windowed KTable<Windowed<K>, V>. Several variants of count exist, see Javadocs for details.

import java.time.Duration;
KGroupedStream<String, Long> groupedStream = ...;

// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) /* time-based window */
    .count();

// Counting a KGroupedStream with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(30))) /* time-based window */
    .count();

// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) /* session window */
    .count();

Detailed behavior:

  • Input records with null keys or values are ignored.

Reduce

  • KGroupedStream -> KTable
  • KGroupedTable -> KTable

| Rolling aggregation. Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike aggregate. (KGroupedStream details, KGroupedTable details) When reducing a grouped stream , you must provide an “adder” reducer (e.g., aggValue + curValue). When reducing a grouped table , you must additionally provide a “subtractor” reducer (e.g., aggValue - oldValue). Several variants of reduce exist, see Javadocs for details.

KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */);

// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
    (aggValue, newValue) -> aggValue + newValue, /* adder */
    (aggValue, oldValue) -> aggValue - oldValue /* subtractor */);

Detailed behavior for KGroupedStream:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received, the adder is called.

Detailed behavior for KGroupedTable:

  • Input records with null keys are ignored in general.
  • When a record key is received for the first time, then the value of that record is used as the initial aggregate value. Note that, in contrast to KGroupedStream, over time this initialization step may happen more than once for a key as a result of having received input tombstone records for that key (see below).
  • When the first non-null value is received for a key (e.g., INSERT), then only the adder is called.
  • When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The subtractor is guaranteed to be called before the adder if the extracted grouping key of the old and new value is the same. The detection of this case depends on the correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor and adder is not defined.
  • When a tombstone record - i.e. a record with a null value - is received for a key (e.g., DELETE), then only the subtractor is called. Note that, whenever the subtractor returns a null value itself, then the corresponding key is removed from the resulting KTable. If that happens, any next input record for that key will re-initialize its aggregate value.

See the example at the bottom of this section for a visualization of the aggregation semantics.
Reduce (windowed)

  • KGroupedStream -> KTable

| Windowed aggregation. Combines the values of records, per window, by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. Records with null key or value are ignored. The result value type cannot be changed, unlike aggregate. (TimeWindowedKStream details, SessionWindowedKStream details) The windowed reduce turns a TimeWindowedKStream<K, V> or a SessionWindowedKStream<K, V> into a windowed KTable<Windowed<K>, V>. Several variants of reduce exist, see Javadocs for details.

import java.time.Duration;
KGroupedStream<String, Long> groupedStream = ...;

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) /* time-based window */)
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(30)) /* time-based window */)
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(
  SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) /* session window */
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

Detailed behavior:

  • The windowed reduce behaves similar to the rolling reduce described above. The additional twist is that the behavior applies per window.
  • Input records with null keys are ignored in general.
  • When a record key is received for the first time for a given window, then the value of that record is used as the initial aggregate value.
  • Whenever a record with a non-null value is received for a given window, the adder is called.

See the example at the bottom of this section for a visualization of the aggregation semantics.

Example of semantics for stream aggregations: A KGroupedStream -> KTable example is shown below. The streams and the table are initially empty. Bold font is used in the column for “KTable aggregated” to highlight changed state. An entry such as (hello, 1) denotes a record with key hello and value 1. To improve the readability of the semantics table you can assume that all records are processed in timestamp order.

// Key: word, value: count
KStream<String, Integer> wordCounts = ...;

KGroupedStream<String, Integer> groupedStream = wordCounts
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()));

KTable<String, Integer> aggregated = groupedStream.aggregate(
    () -> 0, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store" /* state store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Integer())); /* serde for aggregate value */

Note

Impact of record caches : For illustration purposes, the column “KTable aggregated” below shows the table's state changes over time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key kafka in the KTable (here: from (kafka, 1) directly to (kafka, 3)). Typically, you should only disable record caches for testing or debugging purposes - under normal circumstances it is better to leave record caches enabled.

KStream wordCounts | KGroupedStream groupedStream | KTable aggregated
Timestamp | Input record | Grouping     | Initializer
1         | (hello, 1)   | (hello, 1)   | 0 (for hello)
2         | (kafka, 1)   | (kafka, 1)   | 0 (for kafka)
3         | (streams, 1) | (streams, 1) | 0 (for streams)
4         | (kafka, 1)   | (kafka, 1)   |
5         | (kafka, 1)   | (kafka, 1)   |
6         | (streams, 1) | (streams, 1) |

Example of semantics for table aggregations: A KGroupedTable -> KTable example is shown below. The tables are initially empty. Bold font is used in the column for “KTable aggregated” to highlight changed state. An entry such as (hello, 1) denotes a record with key hello and value 1. To improve the readability of the semantics table you can assume that all records are processed in timestamp order.

// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")
KTable<String, String> userProfiles = ...;

// Re-group `userProfiles`.  Don't read too much into what the grouping does:
// its prime purpose in this example is to show the *effects* of the grouping
// in the subsequent aggregation.
KGroupedTable<String, Integer> groupedTable = userProfiles
    .groupBy((user, region) -> KeyValue.pair(region, user.length()),
        Grouped.with(Serdes.String(), Serdes.Integer()));

KTable<String, Integer> aggregated = groupedTable.aggregate(
    () -> 0, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
    Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store" /* state store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Integer())); /* serde for aggregate value */

Note

Impact of record caches : For illustration purposes, the column “KTable aggregated” below shows the table's state changes over time in a very granular way. In practice, you would observe state changes in such a granular way only when record caches are disabled (default: enabled). When record caches are enabled, what might happen for example is that the output results of the rows with timestamps 4 and 5 would be compacted, and there would only be a single state update for the key kafka in the KTable (here: from (kafka, 1) directly to (kafka, 3)). Typically, you should only disable record caches for testing or debugging purposes - under normal circumstances it is better to leave record caches enabled.

KTable userProfiles | KGroupedTable groupedTable | KTable aggregated
Timestamp | Input record    | Interpreted as | Grouping
1         | (alice, E)      | INSERT alice   | (E, 5)
2         | (bob, A)        | INSERT bob     | (A, 3)
3         | (charlie, A)    | INSERT charlie | (A, 7)
4         | (alice, A)      | UPDATE alice   | (A, 5)
5         | (charlie, null) | DELETE charlie | (null, 7)
6         | (null, E)       | ignored        |
7         | (bob, E)        | UPDATE bob     | (E, 3)

Joining

Streams and tables can also be joined. Many stream processing applications in practice are coded as streaming joins. For example, applications backing an online shop might need to access multiple, updating database tables (e.g. sales prices, inventory, customer information) in order to enrich a new data record (e.g. customer transaction) with context information. That is, scenarios where you need to perform table lookups at very large scale and with a low processing latency. Here, a popular pattern is to make the information in the databases available in Kafka through so-called change data capture in combination with Kafka’s Connect API, and then implementing applications that leverage the Streams API to perform very fast and efficient local joins of such tables and streams, rather than requiring the application to make a query to a remote database over the network for each record. In this example, the KTable concept in Kafka Streams would enable you to track the latest state (e.g., snapshot) of each table in a local state store, thus greatly reducing the processing latency as well as reducing the load of the remote databases when doing such streaming joins.

The following join operations are supported, see also the diagram in the overview section of Stateful Transformations. Depending on the operands, joins are either windowed joins or non-windowed joins.

Join operands                     | Type         | (INNER) JOIN  | LEFT JOIN     | OUTER JOIN
KStream-to-KStream                | Windowed     | Supported     | Supported     | Supported
KTable-to-KTable                  | Non-windowed | Supported     | Supported     | Supported
KTable-to-KTable Foreign-Key Join | Non-windowed | Supported     | Supported     | Not Supported
KStream-to-KTable                 | Non-windowed | Supported     | Supported     | Not Supported
KStream-to-GlobalKTable           | Non-windowed | Supported     | Supported     | Not Supported
KTable-to-GlobalKTable            | N/A          | Not Supported | Not Supported | Not Supported

Each case is explained in more detail in the subsequent sections.

Join co-partitioning requirements

For equi-joins, input data must be co-partitioned when joining. This ensures that input records with the same key from both sides of the join are delivered to the same stream task during processing. It is your responsibility to ensure data co-partitioning when joining. Co-partitioning is not required when performing KTable-KTable Foreign-Key joins and Global KTable joins.

The requirements for data co-partitioning are:

  • The input topics of the join (left side and right side) must have the same number of partitions.
  • All applications that write to the input topics must have the same partitioning strategy so that records with the same key are delivered to the same partition number. In other words, the keyspace of the input data must be distributed across partitions in the same manner. This means that, for example, applications that use Kafka’s Java Producer API must use the same partitioner (cf. the producer setting "partitioner.class" aka ProducerConfig.PARTITIONER_CLASS_CONFIG), and applications that use Kafka’s Streams API must use the same StreamPartitioner for operations such as KStream#to(). The good news is that, if you happen to use the default partitioner-related settings across all applications, you do not need to worry about the partitioning strategy.

Why is data co-partitioning required? Because KStream-KStream, KTable-KTable, and KStream-KTable joins are performed based on the keys of records (e.g., leftRecord.key == rightRecord.key), it is required that the input streams/tables of a join are co-partitioned by key.

There are two exceptions where co-partitioning is not required. For KStream-GlobalKTable joins, co-partitioning is not required because all partitions of the GlobalKTable’s underlying changelog stream are made available to each KafkaStreams instance. That is, each instance has a full copy of the changelog stream. Further, a KeyValueMapper allows for non-key based joins from the KStream to the GlobalKTable. KTable-KTable Foreign-Key joins also do not require co-partitioning: Kafka Streams internally ensures co-partitioning for Foreign-Key joins.

Note

Kafka Streams partly verifies the co-partitioning requirement: During the partition assignment step, i.e. at runtime, Kafka Streams verifies whether the number of partitions for both sides of a join are the same. If they are not, a TopologyBuilderException (runtime exception) is thrown. Note that Kafka Streams cannot verify whether the partitioning strategy matches between the input streams/tables of a join - it is up to the user to ensure that this is the case.

Ensuring data co-partitioning: If the inputs of a join are not co-partitioned yet, you must ensure this manually. You may follow a procedure such as the one outlined below. It is recommended to repartition the topic with fewer partitions to match the larger partition count, to avoid bottlenecks; technically, it would also be possible to repartition the topic with more partitions down to the smaller partition count. For stream-table joins, it’s recommended to repartition the KStream because repartitioning a KTable might result in a second state store. For table-table joins, you might also consider the size of the KTables and repartition the smaller KTable.

  1. Identify the input KStream/KTable in the join whose underlying Kafka topic has the smaller number of partitions. Let’s call this stream/table “SMALLER”, and the other side of the join “LARGER”. To learn about the number of partitions of a Kafka topic you can use, for example, the CLI tool bin/kafka-topics with the --describe option.

  2. Within your application, re-partition the data of “SMALLER”. You must ensure that, when repartitioning the data with repartition, the same partitioner is used as for “LARGER”.

 * If "SMALLER" is a KStream: `KStream#repartition(Repartitioned.numberOfPartitions(...))`.
 * If "SMALLER" is a KTable: `KTable#toStream#repartition(Repartitioned.numberOfPartitions(...).toTable())`.
  1. Within your application, perform the join between “LARGER” and the new stream/table.
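
As an illustration of this procedure, the following sketch repartitions a hypothetical “SMALLER” stream to match a “LARGER” table before joining. The topic contents, serdes, and partition counts are assumptions made for the example.

KStream<String, Long> smaller = ...;   // e.g. reads from a topic with 6 partitions
KTable<String, Double> larger = ...;   // e.g. backed by a topic with 12 partitions

// Repartition "SMALLER" so that its partition count matches "LARGER".
KStream<String, Long> repartitioned = smaller.repartition(
    Repartitioned.<String, Long>numberOfPartitions(12)
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));

// Both sides are now co-partitioned and can be joined.
KStream<String, String> joined = repartitioned.join(larger,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue);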

KStream-KStream Join

KStream-KStream joins are always windowed joins, because otherwise the size of the internal state store used to perform the join - e.g., a sliding window or “buffer” - would grow indefinitely. For stream-stream joins it’s important to highlight that a new input record on one side will produce a join output for each matching record on the other side, and there can be multiple such matching records in a given join window (cf. the row with timestamp 15 in the join semantics table below, for example).

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Transformation | Description
Inner Join (windowed)
  • (KStream, KStream) -> KStream

Performs an INNER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type KStream<K, ...> rather than KStream<Windowed<K>, ...>. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned). Several variants of join exist, see the Javadocs for details.

import java.time.Duration;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based , i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key or a `null` value are ignored and do not trigger the join.

See the semantics overview at the bottom of this section for a detailed description.
Left Join (windowed)

  • (KStream, KStream) -> KStream

Performs a LEFT JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type KStream<K, ...> rather than KStream<Windowed<K>, ...>. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned). Several variants of leftJoin exist, see the Javadocs for details.

import java.time.Duration;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based , i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` value are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the rows with timestamp=60 and timestamp=80 in the table below, which list [E, null] and [F, null] in the LEFT JOIN column. Note that these left results are emitted after the specified grace period passed. Caution: using the deprecated JoinWindows.of(...).grace(...) API might result in eagerly emitted spurious left results.

See the semantics overview at the bottom of this section for a detailed description.
Outer Join (windowed)

  • (KStream, KStream) -> KStream

Performs an OUTER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type KStream<K, ...> rather than KStream<Windowed<K>, ...>. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of a stream if and only if the stream was marked for re-partitioning (if both are marked, both are re-partitioned). Several variants of outerJoin exist, see the Javadocs for details.

import java.time.Duration;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

KStream<String, String> joined = left.outerJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key, and window-based , i.e. two input records are joined if and only if their timestamps are “close” to each other as defined by the user-supplied JoinWindows, i.e. the window defines an additional join predicate over the record timestamps.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` value are ignored and do not trigger the join.
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively; this explains the rows with timestamp=60, timestamp=80, and timestamp=100 in the table below, which list [E, null], [F, null], and [null, f] in the OUTER JOIN column. Note that these left and right results are emitted after the specified grace period passed. Caution: using the deprecated JoinWindows.of(...).grace(...) API might result in eagerly emitted spurious left/right results.

See the semantics overview at the bottom of this section for a detailed description.

Semantics of stream-stream joins: The semantics of the various stream-stream join variants are explained below. To improve the readability of the table, assume that (1) all records have the same key (and thus the key in the table is omitted), and (2) all records are processed in timestamp order. We assume a join window size of 15 seconds with a grace period of 5 seconds.

Note: If you use the old and now deprecated API to specify the grace period, i.e., JoinWindows.of(...).grace(...), left/outer join results are emitted eagerly, and the observed result might differ from the result shown below.

The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join, leftJoin, and outerJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all.

Timestamp | Left (KStream) | Right (KStream) | (INNER) JOIN | LEFT JOIN | OUTER JOIN
1 | null | | | |
2 | | null | | |
3 | A | | | |
4 | | a | [A, a] | [A, a] | [A, a]
5 | B | | [B, a] | [B, a] | [B, a]
6 | | b | [A, b], [B, b] | [A, b], [B, b] | [A, b], [B, b]
7 | null | | | |
8 | | null | | |
9 | C | | [C, a], [C, b] | [C, a], [C, b] | [C, a], [C, b]
10 | | c | [A, c], [B, c], [C, c] | [A, c], [B, c], [C, c] | [A, c], [B, c], [C, c]
11 | null | | | |
12 | | null | | |
13 | null | | | |
14 | | d | [A, d], [B, d], [C, d] | [A, d], [B, d], [C, d] | [A, d], [B, d], [C, d]
15 | D | | [D, a], [D, b], [D, c], [D, d] | [D, a], [D, b], [D, c], [D, d] | [D, a], [D, b], [D, c], [D, d]
40 | E | | | |
60 | F | | | [E, null] | [E, null]
80 | | f | | [F, null] | [F, null]
100 | G | | | | [null, f]

KTable-KTable Equi-Join

KTable-KTable equi-joins are always non-windowed joins. They are designed to be consistent with their counterparts in relational databases. The changelog streams of both KTables are materialized into local state stores to represent the latest snapshot of their table duals. The join result is a new KTable that represents the changelog stream of the join operation.

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Transformation | Description
Inner Join
  • (KTable, KTable) -> KTable

Performs an INNER JOIN of this table with another table. The result is an ever-updating KTable that represents the “current” result of the join. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned.

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

KTable<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key are ignored and do not trigger the join.
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
* When joining versioned tables, out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, has already been processed, are ignored and do not trigger the join.

See the semantics overview at the bottom of this section for a detailed description.
Left Join

  • (KTable, KTable) -> KTable

Performs a LEFT JOIN of this table with another table. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned.

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

KTable<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key are ignored and do not trigger the join.
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Right-tombstones trigger the join, but left-tombstones don't: when an input tombstone is received, an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
* When joining versioned tables, out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, has already been processed, are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.

See the semantics overview at the bottom of this section for a detailed description.
Outer Join

  • (KTable, KTable) -> KTable

Performs an OUTER JOIN of this table with another table. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned.

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

KTable<String, String> joined = left.outerJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Input records with a `null` key are ignored and do not trigger the join.
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones may trigger joins, depending on the content in the left and right tables. When an input tombstone is received, an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
* When joining versioned tables, out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, has already been processed, are ignored and do not trigger the join.
  • For each input record on one side that does not have any match on the other side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null) or ValueJoiner#apply(null, rightRecord.value), respectively; this explains the rows with timestamp=3 and timestamp=7 in the table below, which list [A, null] and [null, b], respectively, in the OUTER JOIN column.

See the semantics overview at the bottom of this section for a detailed description.

Semantics of table-table equi-joins: The semantics of the various table-table equi-join variants are explained below. To improve the readability of the table, you can assume that (1) all records have the same key (and thus the key in the table is omitted) and that (2) all records are processed in timestamp order. The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join, leftJoin, and outerJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all.

Timestamp | Left (KTable) | Right (KTable) | (INNER) JOIN | LEFT JOIN | OUTER JOIN
1 | null | | | |
2 | | null | | |
3 | A | | | [A, null] | [A, null]
4 | | a | [A, a] | [A, a] | [A, a]
5 | B | | [B, a] | [B, a] | [B, a]
6 | | b | [B, b] | [B, b] | [B, b]
7 | null | | null | null | [null, b]
8 | | null | | | null
9 | C | | | [C, null] | [C, null]
10 | | c | [C, c] | [C, c] | [C, c]
11 | | null | null | [C, null] | [C, null]
12 | null | | | null | null
13 | null | | | |
14 | | d | | | [null, d]
15 | D | | [D, d] | [D, d] | [D, d]

KTable-KTable Foreign-Key Join

KTable-KTable foreign-key joins are always non-windowed joins. Foreign-key joins are analogous to joins in SQL. As a rough example:

SELECT ... FROM {this KTable} JOIN {other KTable} ON {other.key} = {result of foreignKeyExtractor(this.value)} ...

The output of the operation is a new KTable containing the join result.

The changelog streams of both KTables are materialized into local state stores to represent the latest snapshot of their table duals. A foreign-key extractor function is applied to each left record to derive an intermediate record, which is then used to look up and join with the record that has the corresponding primary key in the right-hand-side table. The result is a new KTable that represents the changelog stream of the join operation.

The left KTable can have multiple records which map to the same key on the right KTable. An update to a single left KTable entry may result in a single output event, provided the corresponding key exists in the right KTable. Consequently, a single update to a right KTable entry will result in an update for each record in the left KTable that has the same foreign key.

Transformation | Description
Inner Join
  • (KTable, KTable) -> KTable

Performs a foreign-key INNER JOIN of this table with another table. The result is an ever-updating KTable that represents the “current” result of the join. (details)

KTable<String, Long> left = ...;
KTable<Long, Double> right = ...;

// This foreignKeyExtractor simply uses the left value to map to the right key.
Function<Long, Long> foreignKeyExtractor = (v) -> v;
// Alternative, with access to the left table's key:
// BiFunction<String, Long, Long> foreignKeyExtractor = (k, v) -> v;

KTable<String, String> joined = left.join(right, foreignKeyExtractor,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate:

    foreignKeyExtractor.apply(leftRecord.value) == rightRecord.key
    
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.

* Records for which the `foreignKeyExtractor` produces `null` are ignored and do not trigger a join. If you want to join with `null` foreign keys, use a suitable sentinel value to do so (e.g. `"NULL"` for a String field, or `-1` for an auto-incrementing integer field).
* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join. When an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
* When joining versioned tables, out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, has already been processed, are ignored and do not trigger the join.

See the semantics overview at the bottom of this section for a detailed description.
Left Join

  • (KTable, KTable) -> KTable

Performs a foreign-key LEFT JOIN of this table with another table. (details)

KTable<String, Long> left = ...;
KTable<Long, Double> right = ...;

// This foreignKeyExtractor simply uses the left value to map to the right key.
Function<Long, Long> foreignKeyExtractor = (v) -> v;
// Alternative, with access to the left table's key:
// BiFunction<String, Long, Long> foreignKeyExtractor = (k, v) -> v;

KTable<String, String> joined = left.leftJoin(right, foreignKeyExtractor,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate:

    foreignKeyExtractor.apply(leftRecord.value) == rightRecord.key
    
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.

* Input records with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Right-tombstones trigger the join, but left-tombstones don't: when an input tombstone is received, then an output tombstone is forwarded directly to the join result KTable if required (i.e. only if the corresponding key actually exists already in the join result KTable).
* When joining versioned tables, out-of-order input records, i.e., those for which another record from the same table, with the same key and a larger timestamp, has already been processed, are ignored and do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the rows with record offsets 7 and 8 in the table below, which list (q,10,null) and (r,10,null) in the LEFT JOIN column.

See the semantics overview at the bottom of this section for a detailed description.

Semantics of table-table foreign-key joins: The semantics of the table-table foreign-key INNER and LEFT JOIN variants are demonstrated below. The key is shown alongside the value for each record. Records are processed in incrementing offset order. The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join and leftJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all. For the purpose of this example, Function foreignKeyExtractor simply uses the left-value as the output.

Record Offset | Left KTable (K, extracted-FK) | Right KTable (FK, VR) | (INNER) JOIN | LEFT JOIN
1 | (k,1) | (1,foo) | (k,1,foo) | (k,1,foo)
2 | (k,2) | | (k,null) | (k,2,null)
3 | (k,3) | | (k,null) | (k,3,null)
4 | | (3,bar) | (k,3,bar) | (k,3,bar)
5 | (k,null) | | (k,null) | (k,null,null)
6 | (k,1) | | (k,1,foo) | (k,1,foo)
7 | (q,10) | | | (q,10,null)
8 | (r,10) | | | (r,10,null)
9 | | (10,baz) | (q,10,baz), (r,10,baz) | (q,10,baz), (r,10,baz)

KStream-KTable Join

KStream-KTable joins are always non-windowed joins. They allow you to perform table lookups against a KTable (changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be to enrich a stream of user activities (KStream) with the latest user profile information (KTable).

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Transformation | Description
Inner Join
  • (KStream, KTable) -> KStream

Performs an INNER JOIN of this stream with the table, effectively doing a table lookup. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning. Several variants of join exist, see the Javadocs for details.

KStream<String, Long> left = ...;
KTable<String, Double> right = ...;

KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
      .withGracePeriod(Duration.ZERO) /* grace period */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` key or a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join.
  • When the table is versioned, the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp less than or equal to the stream record timestamp. If the stream record timestamp is older than the table’s history retention, then the record is dropped.
  • To use the grace period, the table needs to be versioned. This causes the stream to buffer records for the specified grace period before trying to find a matching table record with the right timestamp. The grace period matters when a table record with a timestamp less than or equal to the stream record’s timestamp arrives after the stream record: if the table record arrives within the grace period, the join still occurs; if it does not, the join continues as normal. A sketch is shown below.

See the semantics overview at the bottom of this section for a detailed description.
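
To illustrate the timestamped lookup and grace period described above, here is a minimal sketch. It assumes the versioned state store (Stores#persistentVersionedKeyValueStore) and Joined#withGracePeriod APIs available in recent releases; the topic names, store name, and durations are hypothetical.

StreamsBuilder builder = ...;
KStream<String, Long> orders = ...;

// A table backed by a versioned state store with one hour of history retention.
KTable<String, Double> rates = builder.table("rates-topic",
    Materialized.<String, Double>as(
        Stores.persistentVersionedKeyValueStore("rates-versioned-store", Duration.ofHours(1)))
      .withKeySerde(Serdes.String())
      .withValueSerde(Serdes.Double()));

// A stream-table join with a one-minute grace period: a table record that arrives up to
// one minute after a stream record can still be matched by timestamp.
KStream<String, String> joined = orders.join(rates,
    (order, rate) -> "order=" + order + ", rate=" + rate,
    Joined.<String, Long, Double>with(Serdes.String(), Serdes.Long(), Serdes.Double())
      .withGracePeriod(Duration.ofMinutes(1)));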
Left Join

  • (KStream, KTable) -> KStream

Performs a LEFT JOIN of this stream with the table, effectively doing a table lookup. (details) Data must be co-partitioned: The input data for both sides must be co-partitioned. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning. Several variants of leftJoin exist, see the Javadocs for details.

KStream<String, Long> left = ...;
KTable<String, Double> right = ...;

KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
      .withGracePeriod(Duration.ZERO) /* grace period */
  );

Detailed behavior:

  • The join is key-based , i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_ for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null); this explains the row with timestamp=3 in the table below, which lists [A, null] in the LEFT JOIN column.
  • When the table is versioned, the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp less than or equal to the stream record timestamp. If the stream record timestamp is older than the table’s history retention, then the record that is joined will be null.
  • To use the grace period, the table needs to be versioned. This causes the stream to buffer records for the specified grace period before trying to find a matching table record with the right timestamp. The grace period matters when a table record with a timestamp less than or equal to the stream record’s timestamp arrives after the stream record: if the table record arrives within the grace period, the join still occurs; if it does not, the join continues as normal.

See the semantics overview at the bottom of this section for a detailed description.

Semantics of stream-table joins: The semantics of the various stream-table join variants are explained below. To improve the readability of the table we assume that (1) all records have the same key (and thus we omit the key in the table) and that (2) all records are processed in timestamp order. The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied ValueJoiner for the join and leftJoin methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the ValueJoiner is not called at all.

Timestamp | Left (KStream) | Right (KTable) | (INNER) JOIN | LEFT JOIN
1 | null | | |
2 | | null | |
3 | A | | | [A, null]
4 | | a | |
5 | B | | [B, a] | [B, a]
6 | | b | |
7 | null | | |
8 | | null | |
9 | C | | | [C, null]
10 | | c | |
11 | null | | |
12 | | null | |
13 | null | | |
14 | | d | |
15 | D | | [D, d] | [D, d]

KStream-GlobalKTable Join

KStream-GlobalKTable joins are always non-windowed joins. They allow you to perform table lookups against a GlobalKTable (entire changelog stream) upon receiving a new record from the KStream (record stream). An example use case would be “star queries” or “star joins”, where you would enrich a stream of user activities (KStream) with the latest user profile information (GlobalKTable) and further context information (further GlobalKTables). However, because GlobalKTables have no notion of time, a KStream-GlobalKTable join is not a temporal join, and there is no event-time synchronization between updates to a GlobalKTable and processing of KStream records.

At a high level, KStream-GlobalKTable joins are very similar to KStream-KTable joins. However, global tables provide you with much more flexibility at some expense when compared to partitioned tables:

  • They do not require data co-partitioning.
  • They allow for efficient “star joins”; i.e., joining a large-scale “facts” stream against “dimension” tables.
  • They allow for joining against foreign keys; i.e., you can look up data in the table not just by the keys of records in the stream, but also by data in the record values.
  • They make many use cases feasible where you must work on heavily skewed data and thus suffer from hot partitions.
  • They are often more efficient than their partitioned KTable counterpart when you need to perform multiple joins in succession.

Join output records are effectively created as follows, leveraging the user-supplied ValueJoiner:

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
Transformation | Description
Inner Join
  • (KStream, GlobalKTable) -> KStream

Performs an INNER JOIN of this stream with the global table, effectively doing a table lookup. (details) The GlobalKTable is fully bootstrapped upon (re)start of a KafkaStreams instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.

KStream<String, Long> left = ...;
GlobalKTable<Integer, Double> right = ...;

KStream<String, String> joined = left.join(right,
    (leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is indirectly key-based , i.e. with the join predicate KeyValueMapper#apply(leftRecord.key, leftRecord.value) == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` key or a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_ , which indicate the deletion of a record key from the table. Tombstones do not trigger the join.

Left Join

  • (KStream, GlobalKTable) -> KStream

Performs a LEFT JOIN of this stream with the global table, effectively doing a table lookup. (details) The GlobalKTable is fully bootstrapped upon (re)start of a KafkaStreams instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed. Causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.

KStream<String, Long> left = ...;
GlobalKTable<Integer, Double> right = ...;

KStream<String, String> joined = left.leftJoin(right,
    (leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

Detailed behavior:

  • The join is indirectly key-based , i.e. with the join predicate KeyValueMapper#apply(leftRecord.key, leftRecord.value) == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
* Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
* Input records for the stream with a `null` value are ignored and do not trigger the join.
* Input records for the table with a `null` value are interpreted as _tombstones_ , which indicate the deletion of a record key from the table. Tombstones do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null).

Semantics of stream-global-table joins: The join semantics are different to KStream-KTable joins because it’s not a temporal join. Another difference is that, for KStream-GlobalKTable joins, the left input record is first “mapped” with a user-supplied KeyValueMapper into the table’s keyspace prior to the table lookup.

Windowing

Windowing lets you control how records that have the same key are grouped into so-called windows for stateful operations such as aggregations or joins. Windows are tracked per record key.

Note

A related operation is grouping, which groups all records that have the same key to ensure that data is properly partitioned (“keyed”) for subsequent operations. Once grouped, windowing allows you to further sub-group the records of a key.

For example, in join operations, a windowing state store is used to store all the records received so far within the defined window boundary. In aggregating operations, a windowing state store is used to store the latest aggregation results per window. Old records in the state store are purged after the specified window retention period. Kafka Streams guarantees to keep a window for at least this specified time; the default value is one day and can be changed via Materialized#withRetention().
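
For illustration, the following sketch (hypothetical topic and store names) sets a custom retention of three days on a windowed count via Materialized#withRetention():

KStream<String, String> events = ...;

KTable<Windowed<String>, Long> counts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-counts-store")
        .withRetention(Duration.ofDays(3)));  // keep window results for at least three days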

The DSL supports the following types of windows:

Window nameBehaviorShort description
Hopping time windowTime-basedFixed-size, overlapping windows
Tumbling time windowTime-basedFixed-size, non-overlapping, gap-less windows
Sliding time windowTime-basedFixed-size, overlapping windows that work on differences between record timestamps
Session windowSession-basedDynamically-sized, non-overlapping, data-driven windows

Hopping time windows

Hopping time windows are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap - and in general they do - a data record may belong to more than one such window.

Note

Hopping windows vs. sliding windows: Hopping windows are sometimes called “sliding windows” in other stream processing tools. Kafka Streams follows the terminology in academic literature, where the semantics of sliding windows are different to those of hopping windows.

The following code defines a hopping window with a size of 5 minutes and an advance interval of 1 minute:

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
Duration windowSize = Duration.ofMinutes(5);
Duration advance = Duration.ofMinutes(1);
TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance);

This diagram shows windowing a stream of data records with hopping windows. In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied with 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would become t=300,000).

Hopping time windows are aligned to the epoch , with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries [0;5000),[3000;8000),... – and not [1000;6000),[4000;9000),... or even something “random” like [1452;6452),[4452;9452),....

Unlike the non-windowed aggregates that we have seen previously, windowed aggregates return a windowed KTable whose key type is Windowed<K>. This is to differentiate aggregate values for the same key across different windows. The corresponding window instance and the embedded key can be retrieved as Windowed#window() and Windowed#key(), respectively.
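
For example, a sketch of unpacking the Windowed<K> key of such a windowed aggregate (the counts table is a placeholder for any windowed count):

KTable<Windowed<String>, Long> counts = ...;  // e.g. the result of a windowed count

counts.toStream().foreach((windowedKey, count) -> {
    String key = windowedKey.key();                   // the original record key
    long windowStart = windowedKey.window().start();  // inclusive, in milliseconds
    long windowEnd = windowedKey.window().end();      // exclusive, in milliseconds
    System.out.println(key + " @ [" + windowStart + ";" + windowEnd + ") -> " + count);
});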

Tumbling time windows

Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.

This diagram shows windowing a stream of data records with tumbling windows. Windows do not overlap because, by definition, the advance interval is identical to the window size. In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied with 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would become t=300,000).

Tumbling time windows are aligned to the epoch , with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, tumbling windows with a size of 5000ms have predictable window boundaries [0;5000),[5000;10000),... – and not [1000;6000),[6000;11000),... or even something “random” like [1452;6452),[6452;11452),....

The following code defines a tumbling window with a size of 5 minutes:

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// A tumbling time window with a size of 5 minutes (and, by definition, an implicit
// advance interval of 5 minutes), and grace period of 1 minute.
Duration windowSize = Duration.ofMinutes(5);
Duration gracePeriod = Duration.ofMinutes(1);
TimeWindows.ofSizeAndGrace(windowSize, gracePeriod);

// The above is equivalent to the following code:
TimeWindows.ofSizeAndGrace(windowSize, gracePeriod).advanceBy(windowSize);

Sliding time windows

Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows are used for join operations, specified by using the JoinWindows class, and windowed aggregations, specified by using the SlidingWindows class.

A sliding window models a fixed-size window that slides continuously over the time axis. In this model, two data records are said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is within the window size. As a sliding window moves along the time axis, records may fall into multiple snapshots of the sliding window, but each unique combination of records appears only in one sliding window snapshot.

The following code defines a sliding window with a time difference of 10 minutes and a grace period of 30 minutes:

import java.time.Duration;
import org.apache.kafka.streams.kstream.SlidingWindows;

// A sliding time window with a time difference of 10 minutes and grace period of 30 minutes
Duration timeDifference = Duration.ofMinutes(10);
Duration gracePeriod = Duration.ofMinutes(30);
SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, gracePeriod);
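
Such a specification can also drive a windowed aggregation; a minimal sketch (the input stream is a placeholder):

KStream<String, Long> events = ...;

// Two records of the same key land in a common window when their timestamps
// differ by at most the configured time difference (10 minutes here).
KTable<Windowed<String>, Long> slidingCounts = events
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(30)))
    .count();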

This diagram shows windowing a stream of data records with sliding windows. The overlap of the sliding window snapshots varies depending on the record times. In this diagram, the time numbers represent milliseconds. For example, t=5 means “at the five millisecond mark”.

Sliding windows are aligned to the data record timestamps, not to the epoch. In contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are both inclusive.

Session Windows

Session windows are used to aggregate key-based events into so-called sessions , the process of which is referred to as sessionization. Sessions represent a period of activity separated by a defined gap of inactivity (or “idleness”). Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If an event falls outside of the session gap, then a new session will be created.

Session windows are different from the other window types in that:

  • all windows are tracked independently across keys - e.g. windows of different keys typically have different start and end times
  • their window sizes vary - even windows for the same key typically have different sizes

The prime area of application for session windows is user behavior analysis. Session-based analyses can range from simple metrics (e.g. count of user visits on a news website or social platform) to more complex metrics (e.g. customer conversion funnel and event flows).

The following code defines a session window with an inactivity gap of 5 minutes:

import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;

// A session window with an inactivity gap of 5 minutes.
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5));
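
Such a session window definition is typically combined with a windowed aggregation; a minimal sketch (the input stream is a placeholder):

KStream<String, String> clicks = ...;

// Count events per user session, where a session ends after 5 minutes of inactivity.
KTable<Windowed<String>, Long> clicksPerSession = clicks
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .count();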

Given the previous session window example, here’s what would happen on an input stream of six records. When the first three records arrive (upper part of the diagram below), we’d have three sessions (see lower part) after having processed those records: two for the green record key, with one session starting and ending at the 0-minute mark (only due to the illustration it looks as if the session goes from 0 to 1), and another starting and ending at the 6-minute mark; and one session for the blue record key, starting and ending at the 2-minute mark.

Detected sessions after having received three input records: two records for the green record key at t=0 and t=6, and one record for the blue record key at t=2. In this diagram the time numbers represent minutes; e.g. t=5 means “at the five-minute mark”. In reality, the unit of time in Kafka Streams is milliseconds, which means the time numbers would need to be multiplied with 60 * 1,000 to convert from minutes to milliseconds (e.g. t=5 would become t=300,000).

If we then receive three additional records (including two out-of-order records), what would happen is that the two existing sessions for the green record key will be merged into a single session starting at time 0 and ending at time 6, consisting of a total of three records. The existing session for the blue record key will be extended to end at time 5, consisting of a total of two records. And, finally, there will be a new session for the blue key starting and ending at time 11.

Detected sessions after having received six input records. Note the two out-of-order data records at t=4 (green) and t=5 (blue), which lead to a merge of sessions and an extension of a session, respectively.

Window Final Results

In Kafka Streams, windowed computations update their results continuously. As new data arrives for a window, freshly computed results are emitted downstream. For many applications, this is ideal, since fresh results are always available, and Kafka Streams is designed to make programming continuous computations seamless. However, some applications need to take action only on the final result of a windowed computation. Common examples of this are sending alerts or delivering results to a system that doesn’t support updates.

Suppose that you have an hourly windowed count of events per user. If you want to send an alert when a user has less than three events in an hour, you have a real challenge. All users would match this condition at first, until they accrue enough events, so you cannot simply send an alert when someone matches the condition; you have to wait until you know you won’t see any more events for a particular window and then send the alert.

Kafka Streams offers a clean way to define this logic: after defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.

For example:

KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

The key parts of this program are:

ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10))
The specified grace period of 10 minutes (i.e., the Duration.ofMinutes(10) argument) allows us to bound the lateness of events the window will accept. For example, the 09:00 to 10:00 window will accept out-of-order records until 10:10, at which point, the window is closed.

.suppress(Suppressed.untilWindowCloses(...))
This configures the suppression operator to emit nothing for a window until it closes, and then emit the final result. For example, if user U gets 10 events between 09:00 and 10:10, the filter downstream of the suppression will get no events for the windowed key U@09:00-10:00 until 10:10, and then it will get exactly one with the value 10. This is the final result of the windowed count.

unbounded()
This configures the buffer used for storing events until their windows close. Production code is able to put a cap on the amount of memory to use for the buffer, but this simple example creates a buffer with no upper bound.
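
As a sketch of such a cap, using the BufferConfig API (the 10 MB figure is an arbitrary example):

// With a strict buffer config, the application shuts down rather than emit an early
// (non-final) result if the cap would be exceeded.
Suppressed<Windowed> untilClosedBounded =
    Suppressed.untilWindowCloses(Suppressed.BufferConfig.maxBytes(10_000_000L).shutDownWhenFull());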

One thing to note is that suppression is just like any other Kafka Streams operator, so you can build a topology with two branches emerging from the count, one suppressed, and one not, or even multiple differently configured suppressions. This allows you to apply suppressions where they are needed and otherwise rely on the default continuous update behavior.

For more detailed information, see the JavaDoc on the Suppressed config object and KIP-328.

Applying processors (Processor API integration)

Beyond the aforementioned stateless and stateful transformations, you may also leverage the Processor API from the DSL. There are a number of scenarios where this may be helpful:

  • Customization: You need to implement special, customized logic that is not or not yet available in the DSL.
  • Combining ease-of-use with full flexibility where it’s needed: Even though you generally prefer to use the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a record’s metadata such as its topic, partition, and offset information. However, you don’t want to switch completely to the Processor API just because of that; and
  • Migrating from other tools: You are migrating from other stream processing technologies that provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to migrate completely to the DSL right away.

Operations and concepts

  • KStream#process: Process all records in a stream, one record at a time, by applying a Processor (provided by a given ProcessorSupplier);
  • KStream#processValues: Process all records in a stream, one record at a time, by applying a FixedKeyProcessor (provided by a given FixedKeyProcessorSupplier);
  • Processor: A processor of key-value pair records;
  • ContextualProcessor: An abstract implementation of Processor that manages the ProcessorContext instance.
  • FixedKeyProcessor: A processor of key-value pair records where keys are immutable;
  • ContextualFixedKeyProcessor: An abstract implementation of FixedKeyProcessor that manages the FixedKeyProcessorContext instance.
  • ProcessorSupplier: A processor supplier that can create one or more Processor instances; and
  • FixedKeyProcessorSupplier: A processor supplier that can create one or more FixedKeyProcessor instances.

Examples

Follow the examples below to learn how to apply process and processValues to your KStream.

Example | Operation | State Type
Categorizing Logs by Severity | process | Stateless
Replacing Slang in Text Messages | processValues | Stateless
Cumulative Discounts for a Loyalty Program | process | Stateful
Traffic Radar Monitoring Car Count | processValues | Stateful

Categorizing Logs by Severity

  • Idea: You have a stream of log messages. Each message contains a severity level (e.g., INFO, WARN, ERROR) in the value. The processor filters messages, routing ERROR messages to a dedicated topic and discarding INFO messages. The rest (WARN) are forwarded to a dedicated topic too.

  • Real-World Context: In a production monitoring system, categorizing logs by severity ensures ERROR logs are sent to a critical incident management system, WARN logs are analyzed for potential risks, and INFO logs are stored for basic reporting purposes.

    public class CategorizingLogsBySeverityExample {
    private static final String ERROR_LOGS_TOPIC = "error-logs-topic";
    private static final String INPUT_LOGS_TOPIC = "input-logs-topic";
    private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic";
    private static final String WARN_LOGS_TOPIC = "warn-logs-topic";

    public static void categorizeWithProcess(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        logStream.process(LogSeverityProcessor::new)
                .to((key, value, recordContext) -> {
                    // Determine the target topic dynamically.
                    // The processor below sets each forwarded record's key to the target topic name.
                    if (ERROR_LOGS_TOPIC.equals(key)) return ERROR_LOGS_TOPIC;
                    if (WARN_LOGS_TOPIC.equals(key)) return WARN_LOGS_TOPIC;
                    return UNKNOWN_LOGS_TOPIC;
                });
    }
    
    private static class LogSeverityProcessor extends ContextualProcessor<String, String, String, String> {
        @Override
        public void process(final Record<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }
    
            // Assume the severity is the first word in the log message
            // For example: "ERROR: Disk not found" -> "ERROR"
            final int colonIndex = record.value().indexOf(':');
            final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN";
    
            // Route logs based on severity
            switch (severity) {
                case "ERROR":
                    context().forward(record.withKey(ERROR_LOGS_TOPIC));
                    break;
                case "WARN":
                    context().forward(record.withKey(WARN_LOGS_TOPIC));
                    break;
                case "INFO":
                    // INFO logs are ignored
                    break;
                default:
                    // Forward to an "unknown" topic for logs with unrecognized severities
                    context().forward(record.withKey(UNKNOWN_LOGS_TOPIC));
            }
        }
    }
    

}

Replacing Slang in Text Messages

  • Idea: A messaging stream contains user-generated content, and you want to replace slang words with their formal equivalents (e.g., “u” becomes “you”, “brb” becomes “be right back”). The operation only modifies the message value and keeps the key intact.

  • Real-World Context: In customer support chat systems, normalizing text by replacing slang with formal equivalents ensures that automated sentiment analysis tools work accurately and provide reliable insights.

public class ReplacingSlangTextInMessagesExample {
    private static final Map<String, String> SLANG_DICTIONARY = Map.of(
            "u", "you",
            "brb", "be right back",
            "omg", "oh my god",
            "btw", "by the way"
    );
    private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic";
    private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic";

    public static void replaceWithProcessValues(final StreamsBuilder builder) {
        KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
    }
    
    private static class SlangReplacementProcessor extends ContextualFixedKeyProcessor<String, String, String> {
        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }
    
            // Replace slang words in the message
            final String[] words = record.value().split("\\s+");
            for (final String word : words) {
                String replacedWord = SLANG_DICTIONARY.getOrDefault(word, word);
                context().forward(record.withValue(replacedWord));
            }
        }
    }
    

}

Cumulative Discounts for a Loyalty Program

  • Idea: A stream of purchase events contains user IDs and transaction amounts. Use a state store to accumulate the total spending of each user. When their total crosses a threshold, apply a discount on their next transaction and update their accumulated total.

  • Real-World Context: In a retail loyalty program, tracking cumulative customer spending enables dynamic rewards, such as issuing a discount when a customer’s total purchases exceed a predefined limit.

public class CumulativeDiscountsForALoyaltyProgramExample {
    private static final double DISCOUNT_THRESHOLD = 100.0;
    private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store";
    private static final String DISCOUNT_NOTIFICATION_MESSAGE =
            "Discount applied! You have received a reward for your purchases.";
    private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic";
    private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic";

    public static void applyDiscountWithProcess(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        Serdes.String(),
                        Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Processor with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }
    
    private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> {
        private KeyValueStore<String, Double> spendingStore;
        private ProcessorContext<String, String> context;
    
        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }
    
        @Override
        public void process(final Record<String, Double> record) {
            if (record.value() == null) {
                return; // Skip null purchase amounts
            }
    
            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(record.key());
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += record.value();
            spendingStore.put(record.key(), currentSpending);
    
            // Check if the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(record.key(), currentSpending - DISCOUNT_THRESHOLD);
                // Send a discount notification
                context.forward(record.withValue(DISCOUNT_NOTIFICATION_MESSAGE));
            }
        }
    }
    

}

Traffic Radar Monitoring Car Count

  • Idea: A radar monitors cars passing along a road stretch. A system counts the cars for each day, maintaining a cumulative total for the current day in a state store. At the end of the day, the count is emitted and the state is cleared for the next day.

  • Real-World Context: A car counting system can be useful for determining measures for widening or controlling traffic depending on the number of cars passing through the monitored stretch.

public class TrafficRadarMonitoringCarCountExample {
    private static final String DAILY_COUNT_STORE = "price-state-store";
    private static final String DAILY_COUNT_TOPIC = "price-state-topic";
    private static final String RADAR_COUNT_TOPIC = "car-radar-topic";

    public static void countWithProcessValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        Serdes.String(),
                        Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the FixedKeyProcessor with the state store
        radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }
    
    private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> {
        private FixedKeyProcessorContext<Void, String> context;
        private KeyValueStore<String, Long> stateStore;
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
    
        @Override
        public void init(final FixedKeyProcessorContext<Void, String> context) {
            this.context = context;
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }
    
        @Override
        public void process(final FixedKeyRecord<Void, String> record) {
            if (record.value() == null) {
                return; // Skip null events
            }
    
            // Derive the current day from the event timestamp
            final long timestamp = System.currentTimeMillis(); // Use system time for simplicity
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            // Increment the count
            dailyCount++;
            stateStore.put(currentDay, dailyCount);
    
            // Emit the current day's count
            context.forward(record.withValue(String.format("Day: %s, Car Count: %s", currentDay, dailyCount)));
        }
    }
    

}

Keynotes

  • Type Safety and Flexibility: The process and processValues APIs utilize ProcessorContext and Record or FixedKeyRecord objects for better type safety and flexibility of custom processing logic.
  • Clear State and Logic Management: Implementations for Processor or FixedKeyProcessor should manage state and logic clearly. Use context().forward() for emitting records downstream.
  • Unified API: Consolidates multiple methods into a single, versatile API.
  • Future-Proof: Ensures compatibility with the latest Kafka Streams releases.

Transformers removal and migration to processors

As of Kafka 4.0, several deprecated methods in the Kafka Streams API, such as transform, flatTransform, transformValues, flatTransformValues, and process, have been removed. These methods have been replaced by the more versatile Processor API. This guide provides detailed steps for migrating existing code to use the new Processor API and explains the benefits of the changes.

The following deprecated methods are no longer available in Kafka Streams:

  • KStream#transform
  • KStream#flatTransform
  • KStream#transformValues
  • KStream#flatTransformValues
  • KStream#process

The Processor API now serves as a unified replacement for all these methods. It simplifies the API surface while maintaining support for both stateless and stateful operations.

Migration Examples

To migrate from the deprecated transform, transformValues, flatTransform, and flatTransformValues methods to the Processor API (PAPI) in Kafka Streams, let’s revisit the previous examples. The new process and processValues methods enable a more flexible and reusable approach by requiring implementations of the Processor or FixedKeyProcessor interfaces.

Example | Migrating from | Migrating to | State Type
Categorizing Logs by Severity | flatTransform | process | Stateless
Replacing Slang in Text Messages | flatTransformValues | processValues | Stateless
Cumulative Discounts for a Loyalty Program | transform | process | Stateful
Traffic Radar Monitoring Car Count | transformValues | processValues | Stateful

Categorizing Logs by Severity

Below, methods categorizeWithFlatTransform and categorizeWithProcess show how you can migrate from flatTransform to process.

public class CategorizingLogsBySeverityExample {
    private static final String ERROR_LOGS_TOPIC = "error-logs-topic";
    private static final String INPUT_LOGS_TOPIC = "input-logs-topic";
    private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic";
    private static final String WARN_LOGS_TOPIC = "warn-logs-topic";

    public static void categorizeWithFlatTransform(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        logStream.flatTransform(LogSeverityTransformer::new)
                .to((key, value, recordContext) -> {
                    // Determine the target topic dynamically
                    if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
                    if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
                    return UNKNOWN_LOGS_TOPIC;
                });
    }

    public static void categorizeWithProcess(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        logStream.process(LogSeverityProcessor::new)
                .to((key, value, recordContext) -> {
                    // Determine the target topic dynamically
                    if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
                    if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
                    return UNKNOWN_LOGS_TOPIC;
                });
    }

    private static class LogSeverityTransformer implements Transformer<String, String, Iterable<KeyValue<String, String>>> {
        @Override
        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
        }

        @Override
        public Iterable<KeyValue<String, String>> transform(String key, String value) {
            if (value == null) {
                return Collections.emptyList(); // Skip null values
            }

            // Assume the severity is the first word in the log message
            // For example: "ERROR: Disk not found" -> "ERROR"
            int colonIndex = value.indexOf(':');
            String severity = colonIndex > 0 ? value.substring(0, colonIndex).trim() : "UNKNOWN";

            // Create appropriate KeyValue pair based on severity
            return switch (severity) {
                case "ERROR" -> List.of(new KeyValue<>("ERROR", value));
                case "WARN" -> List.of(new KeyValue<>("WARN", value));
                case "INFO" -> Collections.emptyList(); // INFO logs are ignored
                default -> List.of(new KeyValue<>("UNKNOWN", value));
            };
        }

        @Override
        public void close() {
        }
    }

    private static class LogSeverityProcessor extends ContextualProcessor<String, String, String, String> {
        @Override
        public void process(final Record<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }

            // Assume the severity is the first word in the log message
            // For example: "ERROR: Disk not found" -> "ERROR"
            final int colonIndex = record.value().indexOf(':');
            final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN";

            // Route logs based on severity
            switch (severity) {
                case "ERROR":
                    context().forward(record.withKey(ERROR_LOGS_TOPIC));
                    break;
                case "WARN":
                    context().forward(record.withKey(WARN_LOGS_TOPIC));
                    break;
                case "INFO":
                    // INFO logs are ignored
                    break;
                default:
                    // Forward to an "unknown" topic for logs with unrecognized severities
                    context().forward(record.withKey(UNKNOWN_LOGS_TOPIC));
            }
        }
    }
}

Replacing Slang in Text Messages

Below, methods replaceWithFlatTransformValues and replaceWithProcessValues show how you can migrate from flatTransformValues to processValues.

public class ReplacingSlangTextInMessagesExample {
    private static final Map<String, String> SLANG_DICTIONARY = Map.of(
            "u", "you",
            "brb", "be right back",
            "omg", "oh my god",
            "btw", "by the way"
    );
    private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic";
    private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic";

    public static void replaceWithFlatTransformValues(final StreamsBuilder builder) {
        KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
    }

    public static void replaceWithProcessValues(final StreamsBuilder builder) {
        KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
    }

    private static class SlangReplacementTransformer implements ValueTransformer<String, Iterable<String>> {

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
        }

        @Override
        public Iterable<String> transform(final String value) {
            if (value == null) {
                return Collections.emptyList(); // Skip null values
            }

            // Replace slang words in the message
            final String[] words = value.split("\\s+");
            return Arrays.asList(
                    Arrays.stream(words)
                            .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                            .toArray(String[]::new)
            );
        }

        @Override
        public void close() {
        }
    }

    private static class SlangReplacementProcessor extends ContextualFixedKeyProcessor<String, String, String> {
        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }

            // Replace slang words in the message
            final String[] words = record.value().split("\\s+");
            for (final String word : words) {
                String replacedWord = SLANG_DICTIONARY.getOrDefault(word, word);
                context().forward(record.withValue(replacedWord));
            }
        }
    }
}

Cumulative Discounts for a Loyalty Program

Below, methods applyDiscountWithTransform and applyDiscountWithProcess show how you can migrate from transform to process.

public class CumulativeDiscountsForALoyaltyProgramExample {
    private static final double DISCOUNT_THRESHOLD = 100.0;
    private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store";
    private static final String DISCOUNT_NOTIFICATION_MESSAGE =
            "Discount applied! You have received a reward for your purchases.";
    private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic";
    private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic";

    public static void applyDiscountWithTransform(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        Serdes.String(),
                        Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Transformer with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.transform(CumulativeDiscountTransformer::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }

    public static void applyDiscountWithProcess(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Processor with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }

    private static class CumulativeDiscountTransformer implements Transformer<String, Double, KeyValue<String, String>> {
        private KeyValueStore<String, Double> spendingStore;

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }

        @Override
        public KeyValue<String, String> transform(final String key, final Double value) {
            if (value == null) {
                return null; // Skip null purchase amounts
            }

            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(key);
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += value;
            spendingStore.put(key, currentSpending);

            // Check if the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(key, currentSpending - DISCOUNT_THRESHOLD);
                // Return a notification message
                return new KeyValue<>(key, DISCOUNT_NOTIFICATION_MESSAGE);
            }
            return null; // No discount, so no output for this record
        }

        @Override
        public void close() {
        }
    }

    private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> {
        private KeyValueStore<String, Double> spendingStore;
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }

        @Override
        public void process(final Record<String, Double> record) {
            if (record.value() == null) {
                return; // Skip null purchase amounts
            }

            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(record.key());
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += record.value();
            spendingStore.put(record.key(), currentSpending);

            // Check if the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(record.key(), currentSpending - DISCOUNT_THRESHOLD);
                // Send a discount notification
                context.forward(record.withValue(DISCOUNT_NOTIFICATION_MESSAGE));
            }
        }
    }
}

Traffic Radar Monitoring Car Count

Below, methods countWithTransformValues and countWithProcessValues show how you can migrate from transformValues to processValues.

public class TrafficRadarMonitoringCarCountExample {
    private static final String DAILY_COUNT_STORE = "price-state-store";
    private static final String DAILY_COUNT_TOPIC = "price-state-topic";
    private static final String RADAR_COUNT_TOPIC = "car-radar-topic";

    public static void countWithTransformValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the ValueTransformer with the state store
        radarStream.transformValues(DailyCarCountTransformer::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }

    public static void countWithProcessValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the FixedKeyProcessor with the state store
        radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }

    private static class DailyCarCountTransformer implements ValueTransformerWithKey<Void, String, String> {
        private KeyValueStore<String, Long> stateStore;
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            // Access the state store
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }

        @Override
        public String transform(Void readOnlyKey, String value) {
            if (value == null) {
                return null; // Skip null events
            }

            // Derive the current day from the event timestamp
            final long timestamp = System.currentTimeMillis(); // Use system time for simplicity
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            // Increment the count
            dailyCount++;
            stateStore.put(currentDay, dailyCount);

            // Return the current day's count
            return String.format("Day: %s, Car Count: %s", currentDay, dailyCount);
        }

        @Override
        public void close() {
        }
    }

    private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> {
        private FixedKeyProcessorContext<Void, String> context;
        private KeyValueStore<String, Long> stateStore;
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());

        @Override
        public void init(final FixedKeyProcessorContext<Void, String> context) {
            this.context = context;
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }

        @Override
        public void process(final FixedKeyRecord<Void, String> record) {
            if (record.value() == null) {
                return; // Skip null events
            }

            // Derive the current day from the event timestamp
            final long timestamp = System.currentTimeMillis(); // Use system time for simplicity
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            // Increment the count
            dailyCount++;
            stateStore.put(currentDay, dailyCount);

            // Emit the current day's count
            context.forward(record.withValue(String.format("Day: %s, Car Count: %s", currentDay, dailyCount)));
        }
    }
}

Keynotes

  • Type Safety and Flexibility: The process and processValues APIs utilize ProcessorContext and Record or FixedKeyRecord objects for better type safety and flexibility of custom processing logic.
  • Clear State and Logic Management: Implementations for Processor or FixedKeyProcessor should manage state and logic clearly. Use context().forward() for emitting records downstream.
  • Unified API: Consolidates multiple methods into a single, versatile API.
  • Future-Proof: Ensures compatibility with the latest Kafka Streams releases.

Removal of Old process Method

It is worth mentioning that, in addition to the methods mentioned above, the process method, which integrated the ‘old’ Processor API (i.e., Processor as opposed to the new api.Processor) into the DSL, has also been removed. The following example shows how to migrate to the new process.

Example

  • Idea: The system monitors page views for a website in real-time. When a page reaches a predefined popularity threshold (e.g., 1000 views), the system automatically sends an email alert to the site administrator or marketing team to notify them of the page’s success. This helps teams quickly identify high-performing content and act on it, such as promoting the page further or analyzing the traffic source.

  • Real-World Context: In a content management system (CMS) for a news or blogging platform, it’s crucial to track the popularity of articles or posts. For example:

    • Marketing Teams: Use the notification to highlight trending content on social media or email newsletters.
    • Operations Teams: Use the alert to ensure the site can handle increased traffic for popular pages.
    • Ad Managers: Identify pages where additional ad placements might maximize revenue.

By automating the detection of popular pages, the system eliminates the need for manual monitoring and ensures timely actions to capitalize on the content’s performance.

public class PopularPageEmailAlertExample {
    private static final String ALERTS_EMAIL = "alerts@yourcompany.com";
    private static final String PAGE_VIEWS_TOPIC = "page-views-topic";

    public static void alertWithOldProcess(StreamsBuilder builder) {
        KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC);
        // Filter pages with exactly 1000 views and process them using the old API
        pageViews.filter((pageId, viewCount) -> viewCount == 1000)
                .process(PopularPageEmailAlertOld::new);
    }

    public static void alertWithNewProcess(StreamsBuilder builder) {
        KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC);
        // Filter pages with exactly 1000 views and process them using the new API
        pageViews.filter((pageId, viewCount) -> viewCount == 1000)
                .process(PopularPageEmailAlertNew::new);
    }

    private static class PopularPageEmailAlertOld extends AbstractProcessor<String, Long> {
        @Override
        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
            super.init(context);
            System.out.println("Initialized email client for: " + ALERTS_EMAIL);
        }

        @Override
        public void process(String key, Long value) {
            if (value == null) return;

            if (value == 1000) {
                // Send an email alert
                System.out.printf("ALERT (Old API): Page %s has reached 1000 views. Sending email to %s%n", key, ALERTS_EMAIL);
            }
        }

        @Override
        public void close() {
            System.out.println("Tearing down email client for: " + ALERTS_EMAIL);
        }
    }

    private static class PopularPageEmailAlertNew implements Processor<String, Long, Void, Void> {
        @Override
        public void init(ProcessorContext<Void, Void> context) {
            System.out.println("Initialized email client for: " + ALERTS_EMAIL);
        }

        @Override
        public void process(Record<String, Long> record) {
            if (record.value() == null) return;

            if (record.value() == 1000) {
                // Send an email alert
                System.out.printf("ALERT (New API): Page %s has reached 1000 views. Sending email to %s%n", record.key(), ALERTS_EMAIL);
            }
        }

        @Override
        public void close() {
            System.out.println("Tearing down email client for: " + ALERTS_EMAIL);
        }
    }
}

Naming Operators in a Streams DSL application

Kafka Streams allows you to name processors created via the Streams DSL.

Controlling KTable emit rate

A KTable is logically a continuously updated table. These updates make their way to downstream operators whenever new data is available, ensuring that the whole computation is as fresh as possible. Logically speaking, most programs describe a series of transformations, and the update rate is not a factor in the program behavior. In these cases, the rate of update is more of a performance concern. Operators are able to optimize both the network traffic (to the Kafka brokers) and the disk traffic (to the local state stores) by adjusting commit interval and batch size configurations.
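
For reference, these are ordinary StreamsConfig properties; as a hedged sketch, the values below are illustrative assumptions only, not recommendations:

Properties props = new Properties();
// Illustrative values only; tune for your workload
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);                      // commit (and emit) less frequently
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);    // larger record cache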

However, some applications need to take other actions, such as calling out to external systems, and therefore need to exercise some control over the rate of invocations, for example of KStream#foreach.

Rather than achieving this as a side-effect of the KTable record cache, you can directly impose a rate limit via the KTable#suppress operator.

For example:

KGroupedTable<String, String> groupedTable = ...;
groupedTable
    .count()
    .suppress(untilTimeLimit(ofMinutes(5), maxBytes(1_000_000L).emitEarlyWhenFull()))
    .toStream()
    .foreach((key, count) -> updateCountsDatabase(key, count));

This configuration ensures that updateCountsDatabase gets events for each key no more than once every 5 minutes. Note that the latest state for each key has to be buffered in memory for that 5-minute period. You have the option to control the maximum amount of memory to use for this buffer (in this case, 1MB). There is also an option to impose a limit in terms of number of records (or to leave both limits unspecified).

Additionally, it is possible to choose what happens if the buffer fills up. This example takes a relaxed approach and just emits the oldest records before their 5-minute time limit to bring the buffer back down to size. Alternatively, you can choose to stop processing and shut the application down. This may seem extreme, but it gives you a guarantee that the 5-minute time limit will be absolutely enforced. After the application shuts down, you could allocate more memory for the buffer and resume processing. Emitting early is preferable for most applications.
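
For example, a hedged sketch of the strict alternative, reusing the names from the example above (the record-count cap is an arbitrary assumption):

groupedTable
    .count()
    // Shut the application down instead of emitting early if more than 10,000 keys are buffered
    .suppress(untilTimeLimit(ofMinutes(5), maxRecords(10_000L).shutDownWhenFull()))
    .toStream()
    .foreach((key, count) -> updateCountsDatabase(key, count));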

For more detailed information, see the JavaDoc on the Suppressed config object and KIP-328.

Using timestamp-based semantics for table processors

By default, tables in Kafka Streams use offset-based semantics. When multiple records arrive for the same key, the one with the largest record offset is considered the latest record for the key, and is the record that appears in aggregation and join results computed on the table. This is true even in the event of out-of-order data. The record with the largest offset is considered to be the latest record for the key, even if this record does not have the largest timestamp.

An alternative to offset-based semantics is timestamp-based semantics. With timestamp-based semantics, the record with the largest timestamp is considered the latest record, even if there is another record with a larger offset (and smaller timestamp). If there is no out-of-order data (per key), then offset-based semantics and timestamp-based semantics are equivalent; the difference only appears when there is out-of-order data.

Starting with Kafka Streams 3.5, Kafka Streams supports timestamp-based semantics through the use of versioned state stores. When a table is materialized with a versioned state store, it is a versioned table and will result in different processor semantics in the presence of out-of-order data.

  • When performing a stream-table join, stream-side records will join with the latest-by-timestamp table record which has a timestamp less than or equal to the stream record’s timestamp. This is in contrast to joining a stream to an unversioned table, in which case the latest-by-offset table record will be joined, even if the stream-side record is out-of-order and has a lower timestamp.
  • Aggregations computed on the table will include the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order updates (per key) will not trigger a new aggregation result. This is true for count and reduce operations as well, in addition to aggregate operations.
  • Table joins will use the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order updates (per key) will not trigger a new join result. This is true for both primary-key table-table joins and also foreign-key table-table joins. If a versioned table is joined with an unversioned table, the result will be the join of the latest-by-timestamp record from the versioned table with the latest-by-offset record from the unversioned table.
  • Table filter operations will no longer suppress consecutive tombstones, so users may observe more null records downstream of the filter than compared to when filtering an unversioned table. This is done in order to preserve a complete version history downstream, in the event of out-of-order data.
  • suppress operations are not allowed on versioned tables, as this would collapse the version history and lead to undefined behavior.
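
To make a table versioned, materialize it with a versioned store supplier. A minimal hedged sketch (the topic name, store name, and 30-minute history retention are assumptions):

StreamsBuilder builder = new StreamsBuilder();
// The history retention bounds how far back out-of-order updates can still be handled correctly
KTable<String, String> versionedTable = builder.table(
    "input-topic",
    Materialized.as(Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofMinutes(30))));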

Once a table is materialized with a versioned store, downstream tables are also considered versioned until any of the following occurs:

  • A downstream table is explicitly materialized, either with an unversioned store supplier or with no store supplier (all stores are unversioned by default, including the default store supplier)
  • Any stateful transformation occurs, including aggregations and joins
  • A table is converted to a stream and back.

The results of certain processors should not be materialized with versioned stores, as these processors do not produce a complete older version history, and therefore materialization as a versioned table would lead to unpredictable results:

  • Aggregate processors, for both table and stream aggregations. This includes aggregate, count and reduce operations.
  • Table-table join processors, including both primary-key and foreign-key joins.

For more on versioned stores and how to start using them in your application, see the Versioned Key-Value State Stores section of the Processor API documentation.

Writing streams back to Kafka

Any streams and tables may be (continuously) written back to a Kafka topic. As we will describe in more detail below, the output data might be re-partitioned on its way to Kafka, depending on the situation.

Writing to Kafka | Description
to
  • KStream -> void
Terminal operation. Write the records to Kafka topic(s). (KStream details)

When to provide serdes explicitly:

  • If you do not specify Serdes explicitly, the default Serdes from the configuration are used.
  • You must specify Serdes explicitly via the Produced class if the key and/or value types of the KStream do not match the configured default Serdes.
  • See Data Types and Serialization for information about configuring default Serdes, available Serdes, and implementing your own custom Serdes.

A variant of to exists that enables you to specify how the data is produced by using a Produced instance to specify, for example, a StreamPartitioner that gives you control over how output records are distributed across the partitions of the output topic. Another variant of to exists that enables you to dynamically choose which topic to send to for each record via a TopicNameExtractor instance.

KStream<String, Long> stream = ...;

// Write the stream to the output topic, using the configured default key
// and value serdes.
stream.to("my-stream-output-topic");

// Write the stream to the output topic, using explicit key and value serdes,
// (thus overriding the defaults in the config properties).
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long());

Causes data re-partitioning if any of the following conditions is true:

  1. If the output topic has a different number of partitions than the stream/table.
  2. If the KStream was marked for re-partitioning.
  3. If you provide a custom StreamPartitioner to explicitly control how to distribute the output records across the partitions of the output topic.
  4. If the key of an output record is null.

Note

When you want to write to systems other than Kafka: Besides writing the data back to Kafka, you can also apply a custom processor as a stream sink at the end of the processing to, for example, write to external databases. Note that doing so is not a recommended pattern - we strongly suggest using the Kafka Connect API instead. However, if you do use such a sink processor, please be aware that it is now your responsibility to guarantee message delivery semantics when talking to such external systems (e.g., to retry on delivery failure or to prevent message duplication).

Testing a Streams application

Kafka Streams comes with a test-utils module to help you test your application; see the testing documentation for details.
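
The test-utils artifact (org.apache.kafka:kafka-streams-test-utils) provides TopologyTestDriver, which lets you drive a topology without a running Kafka cluster. A minimal hedged sketch follows; the topology, topic names, and the piped record are assumptions.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class UppercaseTopologyTest {

    public static void main(final String[] args) {
        // A tiny topology under test: uppercase every value
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
                .mapValues(value -> value.toUpperCase())
                .to("output-topic");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-test");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Drive the topology synchronously, without any Kafka cluster
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                    driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                    driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("key", "hello");
            System.out.println(output.readKeyValue()); // prints KeyValue(key, HELLO)
        }
    }
}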

Kafka Streams DSL for Scala

The Kafka Streams DSL Java APIs are based on the Builder design pattern, which allows users to incrementally build the target functionality using lower level compositional fluent APIs. These APIs can be called from Scala, but there are several issues:

  1. Additional type annotations - The Java APIs use Java generics in a way that is not fully compatible with the type inferencer of the Scala compiler. Hence the user has to add type annotations to the Scala code, which seems rather non-idiomatic in Scala.
  2. Verbosity - In some cases the Java APIs appear too verbose compared to idiomatic Scala.
  3. Type Unsafety - The Java APIs offer some options where the compile time type safety is sometimes subverted and can result in runtime errors. This stems from the fact that the Serdes defined as part of config are not type checked during compile time. Hence any missing Serdes can result in runtime errors.

The Kafka Streams DSL for Scala library is a wrapper over the existing Java APIs for Kafka Streams DSL that addresses the concerns raised above. It does not attempt to provide idiomatic Scala APIs that one would implement in a Scala library developed from scratch. The intention is to make the Java APIs more usable in Scala through better type inferencing, enhanced expressiveness, and less boilerplate.

The library wraps Java Stream DSL APIs in Scala thereby providing:

  1. Better type inference in Scala.
  2. Less boilerplate in application code.
  3. The usual builder-style composition that developers get with the original Java API.
  4. Implicit serializers and de-serializers leading to better abstraction and less verbosity.
  5. Better type safety during compile time.

All functionality provided by the Kafka Streams DSL for Scala is under the root package name of org.apache.kafka.streams.scala.

Many of the public facing types from the Java API are wrapped. The following Scala abstractions are available to the user:

  • org.apache.kafka.streams.scala.StreamsBuilder
  • org.apache.kafka.streams.scala.kstream.KStream
  • org.apache.kafka.streams.scala.kstream.KTable
  • org.apache.kafka.streams.scala.kstream.KGroupedStream
  • org.apache.kafka.streams.scala.kstream.KGroupedTable
  • org.apache.kafka.streams.scala.kstream.SessionWindowedKStream
  • org.apache.kafka.streams.scala.kstream.TimeWindowedKStream

The library also has several utility abstractions and modules that the user needs to use for proper semantics.

  • org.apache.kafka.streams.scala.ImplicitConversions: Module that brings into scope the implicit conversions between the Scala and Java classes.
  • org.apache.kafka.streams.scala.serialization.Serdes: Module that contains all primitive Serdes that can be imported as implicits and a helper to create custom Serdes.

The library is cross-built with Scala 2.12 and 2.13. To reference the library compiled against Scala 2.13, add the following to your Maven pom.xml:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-scala_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

To use the library compiled against Scala 2.12 replace the artifactId with kafka-streams-scala_2.12.

When using SBT, you can reference the correct library as follows:

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "4.0.0"

Sample Usage

The library works by wrapping the original Java abstractions of Kafka Streams within a Scala wrapper object and then using implicit conversions between them. All the Scala abstractions are named identically as the corresponding Java abstraction, but they reside in a different package of the library e.g. the Scala class org.apache.kafka.streams.scala.StreamsBuilder is a wrapper around org.apache.kafka.streams.StreamsBuilder, org.apache.kafka.streams.scala.kstream.KStream is a wrapper around org.apache.kafka.streams.kstream.KStream, and so on.

Here’s an example of the classic WordCount program that uses the Scala StreamsBuilder to build an instance of KStream, which is a wrapper around the Java KStream. Then we reify to a table and get a KTable, which, again, is a wrapper around the Java KTable.

The net result is that the following code is structured just like using the Java API, but with Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage is more obvious when given an example. Below is an example WordCount implementation that will be used to demonstrate the differences between the Scala and Java API.

import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountApplication extends App {
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(Duration.ofSeconds(10))
  }
}

In the above code snippet, we don’t have to provide any Serdes, Grouped, Produced, Consumed or Joined explicitly. They will also not be dependent on any Serdes specified in the config. In fact all Serdes specified in the config will be ignored by the Scala APIs. All Serdes and Grouped, Produced, Consumed or Joined will be handled through implicit Serdes as discussed later in the Implicit Serdes section. The complete independence from configuration based Serdes is what makes this library completely typesafe. Any missing instances of Serdes, Grouped, Produced, Consumed or Joined will be flagged as a compile time error.

Implicit Serdes

One of the common complaints of Scala users with the Java API has been the repetitive usage of the Serdes in API invocations. Many of the APIs need to take the Serdes through abstractions like Grouped, Produced, Repartitioned, Consumed or Joined. And the user has to supply them every time through the with function of these classes.

The library uses the power of Scala implicit parameters to alleviate this concern. As a user you can provide implicit Serdes or implicit values of Grouped, Produced, Repartitioned, Consumed or Joined once and make your code less verbose. In fact you can just have the implicit Serdes in scope and the library will make the instances of Grouped, Produced, Consumed or Joined available in scope.

The library also bundles all implicit Serdes of the commonly used primitive types in a Scala module - so just import the module vals and have all Serdes in scope. A similar strategy of modular implicits can be adopted for any user-defined Serdes as well (User-defined Serdes are discussed in the next section).

Here’s an example:

// The Serdes._ import brings into scope implicit Serdes (mostly for primitives)
// that will set up all Grouped, Produced, Consumed and Joined instances.
// So all APIs below that accept Grouped, Produced, Consumed or Joined will
// get these instances automatically
import Serdes._

val builder = new StreamsBuilder()

val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)

val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)

// The following code fragment does not have a single instance of Grouped,
// Produced, Consumed or Joined supplied explicitly.
// All of them are taken care of by the implicit Serdes brought in by the Serdes._ import
val clicksPerRegion: KTable[String, Long] =
  userClicksStream
    .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
    .map((_, regionWithClicks) => regionWithClicks)
    .groupByKey
    .reduce(_ + _)

clicksPerRegion.toStream.to(outputTopic)

Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:

  1. The code snippet does not depend on any config defined Serdes. In fact any Serdes defined as part of the config will be ignored.
  2. All Serdes are picked up from the implicits in scope. And import Serdes._ brings all necessary Serdes in scope.
  3. This is an example of compile time type safety that we don’t have in the Java APIs.
  4. The code looks less verbose and more focused towards the actual transformation that it does on the data stream.

User-Defined Serdes

When the default primitive Serdes are not enough and we need to define custom Serdes, the usage is exactly the same as above. Just define the implicit Serdes and start building the stream transformation. Here’s an example with AvroSerde:

// domain object as a case class
case class UserClicks(clicks: Long)

// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde

// Primitive Serdes
import Serdes._

// And then business as usual ..

val userClicksStream: KStream[String, UserClicks] = builder.stream(userClicksTopic)

val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)

// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTable[String, Long] =
 userClicksStream

   // Join the stream against the table.
   .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks.clicks))

   // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
   .map((_, regionWithClicks) => regionWithClicks)

   // Compute the total per region by summing the individual click counts per region.
   .groupByKey
   .reduce(_ + _)

// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)

A complete example of user-defined Serdes can be found in a test class within the library.


4 - Processor API

Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic.

Table of Contents

  • Overview
  • Defining a Stream Processor
  • Unit Testing Processors
  • State Stores
    • Defining and creating a State Store
    • Fault-tolerant State Stores
    • Enable or Disable Fault Tolerance of State Stores (Store Changelogs)
    • Timestamped State Stores
    • Versioned Key-Value State Stores
    • Readonly State Stores
    • Implementing Custom State Stores
  • Connecting Processors and State Stores
  • Accessing Processor Context

Overview

The Processor API can be used to implement both stateless as well as stateful operations, where the latter is achieved through the use of state stores.

Tip

Combining the DSL and the Processor API: You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the section Applying processors (Processor API integration).

For a complete list of available API functionality, see the Streams API docs.

Defining a Stream Processor

A stream processor is a node in the processor topology that represents a single processing step. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology.

You can define a customized stream processor by implementing the Processor interface, which provides the process() API method. The process() method is called on each of the received records.

The Processor interface also has an init() method, which is called by the Kafka Streams library during task construction phase. Processor instances should perform any required initialization in this method. The init() method passes in a ProcessorContext instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition, its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation function (via ProcessorContext#schedule()), to forward a new record to the downstream processors (via ProcessorContext#forward()), and to request a commit of the current processing progress (via ProcessorContext#commit()). Any resources you set up in init() can be cleaned up in the close() method. Note that Kafka Streams may re-use a single Processor object by calling init() on it again after close().

The Processor interface takes four generic parameters: KIn, VIn, KOut, VOut. These define the input and output types that the processor implementation can handle. KIn and VIn define the key and value types of the Record that will be passed to process(). Likewise, KOut and VOut define the forwarded key and value types for the result Record that ProcessorContext#forward() will accept. If your processor does not forward any records at all (or if it only forwards null keys or values), a best practice is to set the output generic type argument to Void. If it needs to forward multiple types that don’t share a common superclass, you will have to set the output generic type argument to Object.

Both the Processor#process() and the ProcessorContext#forward() methods handle records in the form of the Record<K, V> data class. This class gives you access to the main components of a Kafka record: the key, value, timestamp and headers. When forwarding records, you can use the constructor to create a new Record from scratch, or you can use the convenience builder methods to replace one of the Record’s properties and copy over the rest. For example, inputRecord.withValue(newValue) would copy the key, timestamp, and headers from inputRecord while setting the output record’s value to newValue. Note that this does not mutate inputRecord, but instead creates a shallow copy. Beware that this is only a shallow copy, so if you plan to mutate the key, value, or headers elsewhere in the program, you will want to create a deep copy of those fields yourself.
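
For illustration, a hedged sketch of a processor that forwards such a shallow copy; the class name and the uppercasing logic are assumptions.

import java.util.Locale;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

// Record#withValue copies the key, timestamp, and headers and replaces only the value
public class UppercaseValueProcessor extends ContextualProcessor<String, String, String, String> {
    @Override
    public void process(final Record<String, String> record) {
        if (record.value() == null) {
            return; // nothing to transform
        }
        context().forward(record.withValue(record.value().toUpperCase(Locale.ROOT)));
    }
}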

In addition to handling incoming records via Processor#process(), you have the option to schedule periodic invocation (called “punctuation”) in your processor’s init() method by calling ProcessorContext#schedule() and passing it a Punctuator. The PunctuationType determines what notion of time is used for the punctuation scheduling: either stream-time or wall-clock-time (by default, stream-time is configured to represent event-time via TimestampExtractor). When stream-time is used, punctuate() is triggered purely by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there is no new input data arriving, stream-time is not advanced and thus punctuate() is not called.

For example, if you schedule a Punctuator function every 10 seconds based on PunctuationType.STREAM_TIME and if you process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record), then punctuate() would be called 6 times. This happens regardless of the time required to actually process those records: punctuate() is called 6 times whether processing these 60 records takes a second, a minute, or an hour.

When wall-clock-time (i.e. PunctuationType.WALL_CLOCK_TIME) is used, punctuate() is triggered purely by the wall-clock time. Reusing the example above, if the Punctuator function is scheduled based on PunctuationType.WALL_CLOCK_TIME, and if these 60 records were processed within 20 seconds, punctuate() is called 2 times (one time every 10 seconds). If these 60 records were processed within 5 seconds, then no punctuate() is called at all. Note that you can schedule multiple Punctuator callbacks with different PunctuationType types within the same processor by calling ProcessorContext#schedule() multiple times inside init() method.
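
For example, the init() method of a processor with String output types could register both kinds of punctuation side by side; the intervals and punctuator bodies below are purely illustrative:

@Override
public void init(final ProcessorContext<String, String> context) {
    // Stream-time punctuation: fires as record timestamps advance in 10-second steps.
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
        timestamp -> context.commit());

    // Wall-clock-time punctuation: fires every 30 seconds of real time.
    context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
        timestamp -> System.out.println("heartbeat at " + timestamp));
}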

Attention

Stream-time is only advanced when Streams processes records. If there are no records to process, or if Streams is waiting for new records due to the Task Idling configuration, then the stream time will not advance and punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().

Example

The following example Processor implements a simple word-count algorithm, performing the following actions:

  • In the init() method, schedule the punctuation every second (Duration.ofSeconds(1)) and retrieve the local state store by its name “Counts”.

  • In the process() method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).

  • In the punctuator scheduled in init(), iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.

    public class WordCountProcessor implements Processor<String, String, String, String> {

    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        kvStore = context.getStateStore("Counts");
    }
    
    @Override
    public void process(final Record<String, String> record) {
        final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
    
        for (final String word : words) {
            final Integer oldValue = kvStore.get(word);
    
            if (oldValue == null) {
                kvStore.put(word, 1);
            } else {
                kvStore.put(word, oldValue + 1);
            }
        }
    }
    
    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }
    

    }

Note

Stateful processing with state stores: The WordCountProcessor defined above can access the currently received record in its process() method, and it can leverage state stores to maintain processing states to, for example, remember recently arrived records for stateful processing needs like aggregations and joins. For more information, see the state stores documentation.

Unit Testing Processors

Kafka Streams comes with a test-utils module to help you write unit tests for your processors. See the Unit Testing Processors section for details.

State Stores

To implement a stateful Processor, you must provide one or more state stores to the processor (stateless processors do not need state stores). State stores can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, and more. Another feature of state stores is that they can be interactively queried from other applications, such as a NodeJS-based dashboard or a microservice implemented in Scala or Go.

The available state store types in Kafka Streams have fault tolerance enabled by default.

Defining and creating a State Store

You can either use one of the available store types or implement your own custom store type. It’s common practice to leverage an existing store type via the Stores factory.

Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code. Rather, you define state stores indirectly by creating a so-called StoreBuilder. This builder is used by Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where needed.

The following store types are available out of the box.

Store Type | Storage Engine | Fault-tolerant? | Description
Persistent KeyValueStore<K, V> | RocksDB | Yes (enabled by default) |
  • The recommended store type for most use cases.

  • Stores its data on local disk.

  • Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space.

  • RocksDB settings can be fine-tuned, see RocksDB configuration.

  • Available store variants: timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.

  • Use persistentTimestampedKeyValueStore when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.

  • Use persistentVersionedKeyValueStore when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.

  • Use persistentWindowStore or persistentTimestampedWindowStore when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively.

  • Use persistentSessionStore when you need a persistent sessionWindowedKey-value store.

    // Creating a persistent key-value store:
    // here, we create a KeyValueStore<String, Long> named "persistent-counts".
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // Using a KeyValueStoreBuilder to build a KeyValueStore.
    StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("persistent-counts"),
            Serdes.String(),
            Serdes.Long());
    KeyValueStore<String, Long> countStore = countStoreSupplier.build();

In-memory KeyValueStore<K, V> | - | Yes (enabled by default) |

  • Stores its data in memory.

  • Storage capacity: managed local state must fit into memory (heap space) of an application instance.

  • Useful when application instances run in an environment where local disk space is either not available or local disk space is wiped in-between app instance restarts.

  • Available store variants: time window key-value store, session window key-value store.

  • Use TimestampedKeyValueStore when you need a key-(value/timestamp) store that supports put/get/delete and range queries.

  • Use TimestampedWindowStore when you need to store windowedKey-(value/timestamp) pairs.

  • There is no built-in in-memory, versioned key-value store at this time.

    // Creating an in-memory key-value store:
    // here, we create a KeyValueStore<String, Long> named "inmemory-counts".
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // Using a KeyValueStoreBuilder to build a KeyValueStore.
    StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("inmemory-counts"),
            Serdes.String(),
            Serdes.Long());
    KeyValueStore<String, Long> countStore = countStoreSupplier.build();

Fault-tolerant State Stores

To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be continuously backed up to a Kafka topic behind the scenes. This makes it possible, for example, to migrate a stateful stream task from one machine to another when elastically adding or removing capacity from your application. This topic is sometimes referred to as the state store’s associated changelog topic, or its changelog. For example, if you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can enable or disable this backup feature for a state store.

Fault-tolerant state stores are backed by a compacted changelog topic. The purpose of compacting this topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster, and to minimize recovery time if a state store needs to be restored from its changelog topic.

Fault-tolerant windowed state stores are backed by a topic that uses both compaction and deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are composite keys that include the “normal” key and window timestamps. For these types of composite keys it would not be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The default retention setting is Windows#maintainMs() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.
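
A minimal sketch of overriding this setting in the application configuration (the extra retention value is only an example):

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Keep window-store changelog records for an extra 2 days beyond the window retention.
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
          Duration.ofDays(2).toMillis());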

When you open an Iterator from a state store you must call close() on the iterator when you are done working with it to reclaim resources; or you can use the iterator from within a try-with-resources statement. If you do not close an iterator, you may encounter an OOM error.
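
For example, a range scan over the countStore built earlier can be wrapped in a try-with-resources block so the iterator is always closed (the key range is arbitrary):

// KeyValueIterator implements Closeable, so try-with-resources closes it automatically.
try (final KeyValueIterator<String, Long> iterator = countStore.range("a", "z")) {
    while (iterator.hasNext()) {
        final KeyValue<String, Long> entry = iterator.next();
        System.out.println(entry.key + " -> " + entry.value);
    }
}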

Enable or Disable Fault Tolerance of State Stores (Store Changelogs)

You can enable or disable fault tolerance for a state store by enabling or disabling the change logging of the store through enableLogging() and disableLogging(). You can also fine-tune the associated topic’s configuration if needed.

Example for disabling fault-tolerance:

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingDisabled(); // disable backing up the store to a changelog topic

Attention

If the changelog is disabled then the attached state store is no longer fault tolerant and it can’t have any standby replicas.

Here is an example for enabling fault tolerance, with additional changelog-topic configuration: You can add any log config from kafka.log.LogConfig. Unrecognized configs will be ignored.

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map<String, String> changelogConfig = new HashMap<>();
// override min.insync.replicas
changelogConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1");

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings

Timestamped State Stores

KTables always store timestamps by default. A timestamped state store improves stream processing semantics and enables handling out-of-order data in source KTables, detecting out-of-order joins and aggregations, and getting the timestamp of the latest update in an Interactive Query.

You can query timestamped state stores both with and without a timestamp.
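
As a sketch, a persistent timestamped key-value store can be created via the Stores factory much like a regular key-value store (the store name is only an example):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;

StoreBuilder<TimestampedKeyValueStore<String, Long>> timestampedStoreBuilder =
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("timestamped-counts"),
        Serdes.String(),
        Serdes.Long());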

Upgrade note: All users upgrade with a single rolling bounce per instance.

  • For Processor API users, nothing changes in existing applications, and you have the option of using the timestamped stores.
  • For DSL operators, store data is upgraded lazily in the background.
  • No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the TimestampedBytesStore interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write.

Versioned Key-Value State Stores

Versioned key-value state stores are available since Kafka Streams 3.5. Rather than storing a single record version (value and timestamp) per key, versioned state stores may store multiple record versions per key. This allows versioned state stores to support timestamped retrieval operations to return the latest record (per key) as of a specified timestamp.

You can create a persistent, versioned state store by passing a VersionedBytesStoreSupplier to the versionedKeyValueStoreBuilder, or by implementing your own VersionedKeyValueStore.
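
Here is a minimal sketch, with an arbitrary store name and history retention:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedKeyValueStore;

// History retention of 1 hour: timestamped lookups are accurate for query timestamps
// up to one hour behind the current observed stream time.
StoreBuilder<VersionedKeyValueStore<String, Long>> versionedStoreBuilder =
    Stores.versionedKeyValueStoreBuilder(
        Stores.persistentVersionedKeyValueStore("versioned-counts", Duration.ofHours(1)),
        Serdes.String(),
        Serdes.Long());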

Each versioned store has an associated, fixed-duration history retention parameter which specifies how long old record versions should be kept for. In particular, a versioned store guarantees to return accurate results for timestamped retrieval operations where the timestamp being queried is within history retention of the current observed stream time.

History retention also doubles as its grace period , which determines how far back in time out-of-order writes to the store will be accepted. A versioned store will not accept writes (inserts, updates, or deletions) if the timestamp associated with the write is older than the current observed stream time by more than the grace period. Stream time in this context is tracked per-partition, rather than per-key, which means it’s important that grace period (i.e., history retention) be set high enough to accommodate a record with one key arriving out-of-order relative to a record for another key.

Because the memory footprint of versioned key-value stores is higher than that of non-versioned key-value stores, you may want to adjust your RocksDB memory settings accordingly. Benchmarking your application with versioned stores is also advised as performance is expected to be worse than when using non-versioned stores.

Versioned stores do not support caching or interactive queries at this time. Also, window stores and global tables may not be versioned.

Upgrade note: Versioned state stores are opt-in only; no automatic upgrades from non-versioned to versioned stores will take place.

Upgrades are supported from persistent, non-versioned key-value stores to persistent, versioned key-value stores as long as the original store has the same changelog topic format as the versioned store being upgraded to. Both persistent key-value stores and timestamped key-value stores share the same changelog topic format as persistent versioned key-value stores, and therefore both are eligible for upgrades.

If you wish to upgrade an application using persistent, non-versioned key-value stores to use persistent, versioned key-value stores instead, you can perform the following procedure:

  • Stop all application instances, and clear any local state directories for the store(s) being upgraded.
  • Update your application code to use versioned stores where desired.
  • Update your changelog topic configs, for the relevant state stores, to set the value of min.compaction.lag.ms to be at least your desired history retention. History retention plus one day is recommended as buffer for the use of broker wall clock time during compaction.
  • Restart your application instances and allow time for the versioned stores to rebuild state from changelog.

ReadOnly State Stores

A read-only state store materializes the data from its input topic. It also uses the input topic for fault tolerance, and thus does not have an additional changelog topic (the input topic is re-used as the changelog). Consequently, the input topic should be configured with log compaction. Note that no other processor should modify the content of the state store; the only writer should be the associated “state update processor”, though other processors may read the content of the read-only store.

Note: beware of the partitioning requirements when using read-only state stores for lookups during processing. You might want to make sure the original changelog topic is co-partitioned with the processors reading the read-only state store.

Implementing Custom State Stores

You can use the built-in state store types or implement your own. The primary interface to implement for the store is org.apache.kafka.streams.processor.StateStore. Kafka Streams also has a few extended interfaces such as KeyValueStore and VersionedKeyValueStore.

Note that your customized org.apache.kafka.streams.processor.StateStore implementation also needs to provide the logic on how to restore the state via the org.apache.kafka.streams.processor.StateRestoreCallback or org.apache.kafka.streams.processor.BatchingStateRestoreCallback interface. Details on how to instantiate these interfaces can be found in the javadocs.

You also need to provide a “builder” for the store by implementing the org.apache.kafka.streams.state.StoreBuilder interface, which Kafka Streams uses to create instances of your store.

Accessing Processor Context

As mentioned in the Defining a Stream Processor section, a ProcessorContext controls the processing workflow, such as scheduling a punctuation function and committing the current processing progress.

This object can also be used to access metadata related to the application, like applicationId, taskId, and stateDir, as well as RecordMetadata such as the topic, partition, and offset of the currently processed record.
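
For illustration, assuming the ProcessorContext passed to init() was saved in a field named context, a processor could read this metadata roughly as follows:

@Override
public void process(final Record<String, String> record) {
    System.out.println("application: " + context.applicationId()
        + ", task: " + context.taskId()
        + ", state dir: " + context.stateDir());

    // Topic, partition, and offset are only present when the record originated from an input topic.
    context.recordMetadata().ifPresent(metadata ->
        System.out.println("from " + metadata.topic()
            + "[" + metadata.partition() + "]@" + metadata.offset()));
}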

Connecting Processors and State Stores

Now that a processor (WordCountProcessor) and the state stores have been defined, you can construct the processor topology by connecting these processors and state stores together by using the Topology instance. In addition, you can add source processors with the specified Kafka topics to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate output data streams out of the topology.

Here is an example implementation:

Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreBuilder, "Process")
    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

Here is a quick explanation of this example:

  • A source processor node named "Source" is added to the topology using the addSource method, with one Kafka topic "source-topic" fed to it.
  • A processor node named "Process" with the pre-defined WordCountProcessor logic is then added as the downstream processor of the "Source" node using the addProcessor method.
  • A predefined persistent key-value state store is created and associated with the "Process" node, using countStoreBuilder.
  • A sink processor node is then added to complete the topology using the addSink method, taking the "Process" node as its upstream processor and writing to a separate "sink-topic" Kafka topic (note that users can also use another overloaded variant of addSink to dynamically determine the Kafka topic to write to for each received record from the upstream processor).

In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology. This can be done by implementing ConnectedStoreProvider#stores() on the ProcessorSupplier instead of calling Topology#addStateStore(), like this:

Topology builder = new Topology();
// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")
    // add the WordCountProcessor node which takes the source processor as its upstream processor.
    // the ProcessorSupplier provides the count store associated with the WordCountProcessor
    .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() {
        public Processor<String, String, String, String> get() {
            return new WordCountProcessor();
        }

        public Set<StoreBuilder<?>> stores() {
            final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder =
                Stores
                    .keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("Counts"),
                        Serdes.String(),
                        Serdes.Long()
                    );
            return Collections.singleton(countsStoreBuilder);
        }
    }, "Source")
    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

This allows for a processor to “own” state stores, effectively encapsulating their usage from the user wiring the topology. Multiple processors that share a state store may provide the same store with this technique, as long as the StoreBuilder is the same instance.

In these topologies, the "Process" stream processor node is considered a downstream processor of the "Source" node, and an upstream processor of the "Sink" node. As a result, whenever the "Source" node forwards a newly fetched record from Kafka to its downstream "Process" node, the WordCountProcessor#process() method is triggered to process the record and update the associated state store. Whenever context#forward() is called from the punctuator scheduled in WordCountProcessor#init(), the aggregate records will be sent via the "Sink" processor node to the Kafka topic "sink-topic". Note that in the WordCountProcessor implementation, you must refer to the same store name "Counts" when accessing the key-value store, otherwise an exception will be thrown at runtime, indicating that the state store cannot be found. If the state store is not associated with the processor in the Topology code, accessing it in the processor’s init() method will also throw an exception at runtime, indicating the state store is not accessible from this processor.

Note that the Topology#addProcessor function takes a ProcessorSupplier as argument, and that the supplier pattern requires that a new Processor instance is returned each time ProcessorSupplier#get() is called. Creating a single Processor object and returning the same object reference from ProcessorSupplier#get() violates the supplier pattern and leads to runtime exceptions, so never provide a singleton Processor instance to the Topology.

Now that you have fully defined your processor topology in your application, you can proceed to running the Kafka Streams application.


5 - Naming Operators in a Streams DSL application


Naming Operators in a Kafka Streams DSL Application

You can now give names to processors when using the Kafka Streams DSL. In the Processor API (PAPI) there are Processors and State Stores, and you are required to explicitly name each one.

At the DSL layer, there are operators. A single DSL operator may compile down to multiple Processors and State Stores, and, if required, repartition topics. But with the Kafka Streams DSL, all these names are generated for you. There is a relationship between the generated processor names, state store names (hence changelog topic names), and repartition topic names. Note that the names of state stores and changelog/repartition topics are “stateful” while processor names are “stateless”.

This distinction of stateful vs. stateless names has important implications when updating your topology. While the internal naming makes creating a topology with the DSL much more straightforward, there are a couple of trade-offs. The first trade-off is what we could consider a readability issue. The other, more severe, trade-off is the shifting of names due to the relationship between the DSL operator and the generated Processors, State Stores, changelog topics, and repartition topics.

Readability Issues

By saying there is a readability trade-off, we are referring to viewing a description of the topology. When you render the string description of your topology via the Topology#describe() method, you can see what the processor is, but you don’t have any context for its business purpose. For example, consider the following simple topology:

KStream<String,String> stream = builder.stream("input");
stream.filter((k,v) -> !v.equals("invalid_txn"))
	  .mapValues((v) -> v.substring(0,6))
	  .to("output");

Running Topology#describe() yields this string:

Topologies:
   Sub-topology: 0
	Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	  --> KSTREAM-FILTER-0000000001
	Processor: KSTREAM-FILTER-0000000001 (stores: [])
	  --> KSTREAM-MAPVALUES-0000000002
	  <-- KSTREAM-SOURCE-0000000000
	Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
	  --> KSTREAM-SINK-0000000003
	  <-- KSTREAM-FILTER-0000000001
	Sink: KSTREAM-SINK-0000000003 (topic: output)
	  <-- KSTREAM-MAPVALUES-0000000002

From this report, you can see what the different operators are, but what is the broader context here? For example, consider KSTREAM-FILTER-0000000001, we can see that it’s a filter operation, which means that records are dropped that don’t match the given predicate. But what is the meaning of the predicate? Additionally, you can see the topic names of the source and sink nodes, but what if the topics aren’t named in a meaningful way? Then you’re left to guess the business purpose behind these topics.

Also notice the numbering here: the source node is suffixed with 0000000000 indicating it’s the first processor in the topology. The filter is suffixed with 0000000001, indicating it’s the second processor in the topology. In Kafka Streams, there are now overloaded methods for both KStream and KTable that accept a new parameter Named. By using the Named class DSL users can provide meaningful names to the processors in their topology.

Now let’s take a look at your topology with all the processors named:

KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
	  .mapValues((v) -> v.substring(0,6), Named.as("Map_values_to_first_6_characters"))
	  .to("output", Produced.as("Mapped_transactions_output_topic"));


Topologies:
   Sub-topology: 0
	Source: Customer_transactions_input_topic (topics: [input])
	  --> filter_out_invalid_txns
	Processor: filter_out_invalid_txns (stores: [])
	  --> Map_values_to_first_6_characters
	  <-- Customer_transactions_input_topic
	Processor: Map_values_to_first_6_characters (stores: [])
	  --> Mapped_transactions_output_topic
	  <-- filter_out_invalid_txns
	Sink: Mapped_transactions_output_topic (topic: output)
	  <-- Map_values_to_first_6_characters

Now you can look at the topology description and easily understand what role each processor plays in the topology. But there’s another reason for naming your processor nodes: when you have stateful operators, their state stores, changelog topics, and repartition topics persist between restarts of your Kafka Streams application.

Changing Names

Generated names are numbered according to where the operators are built in the topology. The name generation strategy is KSTREAM|KTABLE-><operator name>-><number suffix>. The number is a globally incrementing number that represents the operator’s order in the topology. The generated number is prefixed with a varying number of "0"s to create a string that is consistently 10 characters long. This means that if you add/remove or shift the order of operations, the position of the processor shifts, which shifts the name of the processor. Since most processors exist in memory only, this name shifting presents no issue for many topologies. But the name shifting does have implications for topologies with stateful operators or repartition topics. Here’s a different topology with some state:

KStream<String,String> stream = builder.stream("input");
 stream.groupByKey()
	   .count()
	   .toStream()
	   .to("output");

This topology description yields the following:

Topologies:
   Sub-topology: 0
	Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	 --> KSTREAM-AGGREGATE-0000000002
	Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
	 --> KTABLE-TOSTREAM-0000000003
	 <-- KSTREAM-SOURCE-0000000000
	Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
	 --> KSTREAM-SINK-0000000004
	 <-- KSTREAM-AGGREGATE-0000000002
	Sink: KSTREAM-SINK-0000000004 (topic: output)
	 <-- KTABLE-TOSTREAM-0000000003

You can see from the topology description above that the state store is named KSTREAM-AGGREGATE-STATE-STORE-0000000001. Here’s what happens when you add a filter to keep some of the records out of the aggregation:

KStream<String,String> stream = builder.stream("input");
stream.filter((k,v)-> v !=null && v.length() >= 6 )
      .groupByKey()
      .count()
      .toStream()
      .to("output");

And the corresponding topology:

Topologies:
	Sub-topology: 0
	 Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	  --> KSTREAM-FILTER-0000000001
	 Processor: KSTREAM-FILTER-0000000001 (stores: [])
	   --> KSTREAM-AGGREGATE-0000000003
	   <-- KSTREAM-SOURCE-0000000000
	 Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
	   --> KTABLE-TOSTREAM-0000000004
	   <-- KSTREAM-FILTER-0000000001
	 Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
	   --> KSTREAM-SINK-0000000005
	   <-- KSTREAM-AGGREGATE-0000000003
	  Sink: KSTREAM-SINK-0000000005 (topic: output)
	   <-- KTABLE-TOSTREAM-0000000004

Notice that since you’ve added an operation before the count operation, the state store (and the changelog topic) names have changed. This name change means you can’t do a rolling re-deployment of your updated topology. Also, you must use the Streams Reset Tool to re-calculate the aggregations, because the changelog topic name has changed and the new changelog topic contains no data on start-up. Fortunately, there’s an easy solution to remedy this situation. Give the state store a user-defined name instead of relying on the generated one, so you don’t have to worry about topology changes shifting the name of the state store. You’ve had the ability to name repartition topics with the Joined, StreamJoined, and Grouped classes, and to name state stores and changelog topics with Materialized. But it’s worth reiterating the importance of naming these DSL topology operations again. Here’s how your DSL code looks now, giving a specific name to your state store:

KStream<String,String> stream = builder.stream("input");
stream.filter((k, v) -> v != null && v.length() >= 6)
	  .groupByKey()
	  .count(Materialized.as("Purchase_count_store"))
	  .toStream()
	  .to("output");

And here’s the topology:

Topologies:
   Sub-topology: 0
	Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	  --> KSTREAM-FILTER-0000000001
	Processor: KSTREAM-FILTER-0000000001 (stores: [])
	  --> KSTREAM-AGGREGATE-0000000002
	  <-- KSTREAM-SOURCE-0000000000
	Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
	  --> KTABLE-TOSTREAM-0000000003
	  <-- KSTREAM-FILTER-0000000001
	Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
	  --> KSTREAM-SINK-0000000004
	  <-- KSTREAM-AGGREGATE-0000000002
	Sink: KSTREAM-SINK-0000000004 (topic: output)
	  <-- KTABLE-TOSTREAM-0000000003

Now, even though you’ve added processors before your state store, the store name and its changelog topic names don’t change. This makes your topology more robust and resilient to changes made by adding or removing processors.

Conclusion

It’s a good practice to name your processing nodes when using the DSL, and it’s even more important to do this when you have “stateful” processors in your application, which involve repartition topics and state stores (and the accompanying changelog topics).

Here are a couple of points to remember when naming your DSL topology:

  1. If you have an existing topology and you haven’t named your state stores (and changelog topics) and repartition topics, we recommend that you do so. But this will be a topology breaking change, so you’ll need to shut down all application instances, make the changes, and run the Streams Reset Tool. Although this may be inconvenient at first, it’s worth the effort to protect your application from unexpected errors due to topology changes.
  2. If you have a new topology , make sure you name the persistent parts of your topology: state stores (changelog topics) and repartition topics. This way, when you deploy your application, you’re protected from topology changes that otherwise would break your Kafka Streams application. If you don’t want to add names to stateless processors at first, that’s fine as you can always go back and add the names later.
Here’s a quick reference on naming the critical parts of your Kafka Streams application to prevent topology name changes from breaking your application:

Operation | Naming Class
Aggregation repartition topics | Grouped
KStream-KStream Join repartition topics | StreamJoined
KStream-KTable Join repartition topic | Joined
KStream-KStream Join state stores | StreamJoined
State Stores (for aggregations and KTable-KTable joins) | Materialized
Stream/Table non-stateful operations | Named

6 - Data Types and Serialization

Data Types and Serialization

Every Kafka Streams application must provide Serdes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such Serdes information include: stream(), table(), to(), repartition(), groupByKey(), groupBy().

You can provide Serdes by using either of these methods, but you must use at least one:

  • By setting default Serdes in the java.util.Properties config instance.
  • By specifying explicit Serdes when calling the appropriate API methods, thus overriding the defaults.

Table of Contents

  • Configuring Serdes
  • Overriding default Serdes
  • Available Serdes
    • Primitive and basic types
    • JSON
    • Implementing custom serdes
  • Kafka Streams DSL for Scala Implicit Serdes

Configuring Serdes

Serdes specified in the Streams configuration are used as the default in your Kafka Streams application. Because this config’s default is null, you must either set a default Serde by using this configuration or pass in Serdes explicitly, as described below.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

Overriding default Serdes

You can also specify Serdes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));

If you want to override serdes selectively, i.e., keep the defaults for either keys or values, then don’t specify the serde whenever you want to leverage the default settings:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));

If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error. Since 1.0.x we have introduced a DeserializationExceptionHandler interface which allows you to customize how to handle such records. The customized implementation of the interface can be specified via the StreamsConfig. For more details, please feel free to read the Configuring a Streams Application section.

Available Serdes

Primitive and basic types

Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as byte[] in its kafka-clients Maven artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.0.0</version>
</dependency>

This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.

Data type | Serde
byte[] | Serdes.ByteArray(), Serdes.Bytes() (see tip below)
ByteBuffer | Serdes.ByteBuffer()
Double | Serdes.Double()
Integer | Serdes.Integer()
Long | Serdes.Long()
String | Serdes.String()
UUID | Serdes.UUID()
Void | Serdes.Void()
List | Serdes.ListSerde()
Boolean | Serdes.Boolean()

Tip

Bytes is a wrapper for Java’s byte[] (byte array) that supports proper equality and ordering semantics. You may want to consider using Bytes instead of byte[] in your applications.

JSON

You can use JsonSerializer and JsonDeserializer from Kafka Connect to construct JSON compatible serializers and deserializers using Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>). Note that Kafka Connect’s JSON (de)serializer requires Java 17.
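
A minimal sketch, assuming your project also declares a dependency on Kafka's connect-json artifact:

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

// Combine Kafka Connect's JSON (de)serializers into a Serde for JsonNode values.
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);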

Implementing custom Serdes

If you need to implement custom Serdes, your best starting point is to take a look at the source code references of existing Serdes (see previous section). Typically, your workflow will be similar to:

  1. Write a serializer for your data type T by implementing org.apache.kafka.common.serialization.Serializer.
  2. Write a deserializer for T by implementing org.apache.kafka.common.serialization.Deserializer.
  3. Write a serde for T by implementing org.apache.kafka.common.serialization.Serde, which you either do manually (see existing Serdes in the previous section) or by leveraging helper functions in Serdes such as Serdes.serdeFrom(Serializer<T>, Deserializer<T>). Note that you will need to implement your own class (that has no generic types) if you want to use your custom serde in the configuration provided to KafkaStreams. If your serde class has generic types or you use Serdes.serdeFrom(Serializer<T>, Deserializer<T>), you can pass your serde only via methods calls (for example builder.stream("topicName", Consumed.with(...))).
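
For example, a custom Serde for a hypothetical Temperature type could look roughly like this (all class names below are made up for illustration):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical value type used only for illustration.
record Temperature(double celsius) { }

class TemperatureSerializer implements Serializer<Temperature> {
    @Override
    public byte[] serialize(final String topic, final Temperature data) {
        return data == null ? null : Double.toString(data.celsius()).getBytes(StandardCharsets.UTF_8);
    }
}

class TemperatureDeserializer implements Deserializer<Temperature> {
    @Override
    public Temperature deserialize(final String topic, final byte[] bytes) {
        return bytes == null ? null : new Temperature(Double.parseDouble(new String(bytes, StandardCharsets.UTF_8)));
    }
}

// A Serde class without generic type parameters, so it can also be used as a default serde
// in the KafkaStreams configuration.
public class TemperatureSerde extends Serdes.WrapperSerde<Temperature> {
    public TemperatureSerde() {
        super(new TemperatureSerializer(), new TemperatureDeserializer());
    }
}

With such a class, the serde can be referenced by class name in the configuration, or instantiated and passed to the API methods that accept explicit serdes.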

Kafka Streams DSL for Scala Implicit Serdes

When using the Kafka Streams DSL for Scala you’re not required to configure a default Serdes. In fact, it’s not supported. Serdes are instead provided implicitly by default implementations for common primitive datatypes. See the Implicit Serdes and User-Defined Serdes sections in the DSL API documentation for details.


7 - Testing a Streams Application

Testing Kafka Streams

Table of Contents

  • Importing the test utilities
  • Testing Streams applications
  • Unit testing Processors

Importing the test utilities

To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular dependency to your test code base. Example pom.xml snippet when using Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>4.0.0</version>
    <scope>test</scope>
</dependency>

Testing a Streams application

The test-utils package provides a TopologyTestDriver that can be used to pipe data through a Topology that is either assembled manually using the Processor API or via the DSL using StreamsBuilder. The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped-in data records. The test driver captures the result records and allows you to query its embedded state stores.

// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// or
// using DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();

// create test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology);

With the test driver you can create a TestInputTopic by giving the topic name and the corresponding serializers. TestInputTopic provides various methods to pipe new message values, keys and values, or lists of KeyValue objects.

TestInputTopic<String, Long> inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
inputTopic.pipeInput("key", 42L);

To verify the output, you can use TestOutputTopic where you configure the topic and the corresponding deserializers during initialization. It offers helper methods to read only certain parts of the result records or the collection of records. For example, you can validate returned KeyValue with standard assertions if you only care about the key and value, but not the timestamp of the result record.

TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer());
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("key", 42L)));

TopologyTestDriver supports punctuations, too. Event-time punctuations are triggered automatically based on the processed records’ timestamps. Wall-clock-time punctuations can also be triggered by advancing the test driver’s wall-clock-time (the driver mocks wall-clock-time internally to give users control over it).

testDriver.advanceWallClockTime(Duration.ofSeconds(20));

Additionally, you can access state stores via the test driver before or after a test. Accessing stores before a test is useful to pre-populate a store with some initial values. After data has been processed, expected updates to the store can be verified.

KeyValueStore store = testDriver.getKeyValueStore("store-name");

Note that you should always close the test driver at the end to make sure all resources are released properly.

testDriver.close();

Example

The following example demonstrates how to use the test driver and helper classes. The example creates a topology that computes the maximum value per key using a key-value-store. While processing, no output is generated, but only the store is updated. Output is only sent downstream based on event-time and wall-clock punctuations.

private TopologyTestDriver testDriver;
private TestInputTopic<String, Long> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
private KeyValueStore<String, Long> store;

private Serde<String> stringSerde = new Serdes.StringSerde();
private Serde<Long> longSerde = new Serdes.LongSerde();

@Before
public void setup() {
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("aggStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
        "aggregator");
    topology.addSink("sinkProcessor", "result-topic", "aggregator");

    // setup test driver
    Properties props = new Properties();
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    testDriver = new TopologyTestDriver(topology, props);

    // setup test topics
    inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
    outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());

    // pre-populate store
    store = testDriver.getKeyValueStore("aggStore");
    store.put("a", 21L);
}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void shouldFlushStoreForFirstInput() {
    inputTopic.pipeInput("a", 1L);
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldNotUpdateStoreForSmallerValue() {
    inputTopic.pipeInput("a", 1L);
    assertThat(store.get("a"), equalTo(21L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldNotUpdateStoreForLargerValue() {
    inputTopic.pipeInput("a", 42L);
    assertThat(store.get("a"), equalTo(42L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldUpdateStoreForNewKey() {
    inputTopic.pipeInput("b", 21L);
    assertThat(store.get("b"), equalTo(21L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldPunctuateIfEventTimeAdvances() {
    final Instant recordTime = Instant.now();
    inputTopic.pipeInput("a", 1L,  recordTime);
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));

    inputTopic.pipeInput("a", 1L,  recordTime);
    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
    testDriver.advanceWallClockTime(Duration.ofSeconds(60));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long, String, Long> {
    @Override
    public Processor<String, Long, String, Long> get() {
        return new CustomMaxAggregator();
    }
}

public class CustomMaxAggregator implements Processor<String, Long, String, Long> {
    ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushStore(timestamp));
        context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> flushStore(timestamp));
        store = context.getStateStore("aggStore");
    }

    @Override
    public void process(Record<String, Long> record) {
        Long oldValue = store.get(record.key());
        if (oldValue == null || record.value() > oldValue) {
            store.put(record.key(), record.value());
        }
    }

    private void flushStore(final long timestamp) {
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> next = it.next();
                context.forward(new Record<>(next.key, next.value, timestamp));
            }
        }
    }

    @Override
    public void close() {}
}

Unit Testing Processors

If you write a Processor, you will want to test it.

Because the Processor forwards its results to the context rather than returning them, unit testing requires a mocked context capable of capturing forwarded data for inspection. For this reason, we provide a MockProcessorContext in test-utils.

Construction

To begin with, instantiate your processor and initialize it with the mock context:

final Processor processorUnderTest = ...;
final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
processorUnderTest.init(context);

If you need to pass configuration to your processor or set the default serdes, you can create the mock with config:

final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put("some.other.config", "some config value");
final MockProcessorContext<String, Long> context = new MockProcessorContext<>(props);

Captured data

The mock will capture any values that your processor forwards. You can make assertions on them:

processorUnderTest.process("key", "value");

final Iterator<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().record(), new Record<>(..., ...));
assertFalse(forwarded.hasNext());

// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();

assertEquals(context.forwarded().size(), 0);

If your processor forwards to specific child processors, you can query the context for captured data by child name:

final List<CapturedForward<? extends String, ? extends Long>> captures = context.forwarded("childProcessorName");

The mock also captures whether your processor has called commit() on the context:

assertTrue(context.committed());

// commit captures can also be reset.
context.resetCommit();

assertFalse(context.committed());

Setting record metadata

In case your processor logic depends on the record metadata (topic, partition, offset), you can set them on the context:

context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L);

Once these are set, the context will continue returning the same values, until you set new ones.

State stores

In case your punctuator is stateful, the mock context allows you to register state stores. You’re encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or Session), since the mock context does not manage changelogs, state directories, etc.

final KeyValueStore<String, Integer> store =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.Integer()
        )
        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
        .build();
store.init(context.getStateStoreContext(), store); // registers the store with the mock context

Verifying punctuators

Processors can schedule punctuators to handle periodic tasks. The mock context does not automatically execute punctuators, but it does capture them to allow you to unit test them as well:

final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final long interval = capturedPunctuator.getIntervalMs();
final PunctuationType type = capturedPunctuator.getType();
final boolean cancelled = capturedPunctuator.cancelled();
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(/*timestamp*/ 0L);

If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a simple topology with your processor and using the TopologyTestDriver.


8 - Interactive Queries

Interactive Queries

Interactive queries allow you to leverage the state of your application from outside your application. Kafka Streams enables your applications to be queryable.

Table of Contents

  • Querying local state stores for an app instance
    • Querying local key-value stores
    • Querying local window stores
    • Querying local custom state stores
  • Querying remote state stores for the entire app
    • Adding an RPC layer to your application
    • Exposing the RPC endpoints of your application
    • Discovering and accessing application instances and their local state stores
  • Demo applications

The full state of your application is typically split across many distributed instances of your application, and across many state stores that are managed locally by these application instances.

There are local and remote components to interactively querying the state of your application.

Local state
An application instance can query the locally managed portion of the state and directly query its own local state stores. You can use the corresponding local data in other parts of your application code, as long as it doesn’t require calling the Kafka Streams API. Querying state stores is always read-only to guarantee that the underlying state stores will never be mutated out-of-band (e.g., you cannot add new entries). State stores should only be mutated by the corresponding processor topology and the input data it operates on. For more information, see Querying local state stores for an app instance.

Remote state

To query the full state of your application, you must connect the various fragments of the state, including:

  • query local state stores
  • discover all running instances of your application in the network and their state stores
  • communicate with these instances over the network (e.g., an RPC layer)

Connecting these fragments enables communication between instances of the same app and communication from other applications for interactive queries. For more information, see Querying remote state stores for the entire app.

Kafka Streams natively provides all of the required functionality for interactively querying the state of your application, except if you want to expose the full state of your application via interactive queries. To allow application instances to communicate over the network, you must add a Remote Procedure Call (RPC) layer to your application (e.g., REST API).

This table shows the Kafka Streams native communication support for various procedures.

Procedure | Application instance | Entire application
Query local state stores of an app instance | Supported | Supported
Make an app instance discoverable to others | Supported | Supported
Discover all running app instances and their state stores | Supported | Supported
Communicate with app instances over the network (RPC) | Supported | Not supported (you must configure)

Querying local state stores for an app instance

A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the application’s entire state. Querying the local stores on an instance will only return data locally available on that particular instance.

The method KafkaStreams#store(...) finds an application instance’s local state stores by name and type. Note that interactive queries are not supported for versioned state stores at this time.

Every application instance can directly query any of its local state stores.

The name of a state store is defined when you create the store. You can create the store explicitly by using the Processor API or implicitly by using stateful operations in the DSL.

The type of a state store is defined by QueryableStoreType. You can access the built-in types via the class QueryableStoreTypes. Kafka Streams currently has two built-in types:

  • A key-value store QueryableStoreTypes#keyValueStore(), see Querying local key-value stores.
  • A window store QueryableStoreTypes#windowStore(), see Querying local window stores.

You can also implement your own QueryableStoreType as described in section Querying local custom state stores.

Note

Kafka Streams materializes one state store per stream partition. This means your application will potentially manage many underlying state stores. The API enables you to query all of the underlying stores without having to know which partition the data is in.

Querying local key-value stores

To query a local key-value store, you must first create a topology with a key-value store. This example creates a key-value store named “CountsKeyValueStore”. This store will hold the latest count for any word that is found on the topic “word-count-input”.

Properties  props = ...;
StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a key-value store named "CountsKeyValueStore" for the all-time word counts
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

After the application has started, you can get access to “CountsKeyValueStore” and then query it via the ReadOnlyKeyValueStore API:

// Get the key-value store CountsKeyValueStore
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streams.store(StoreQueryParameters.fromNameAndType("CountsKeyValueStore", QueryableStoreTypes.keyValueStore()));

// Get value by key
System.out.println("count for hello:" + keyValueStore.get("hello"));

// Get the values for a range of keys available in this application instance
KeyValueIterator<String, Long> range = keyValueStore.range("all", "streams");
while (range.hasNext()) {
  KeyValue<String, Long> next = range.next();
  System.out.println("count for " + next.key + ": " + next.value);
}
// Close the iterator to release resources such as open file handles
range.close();

// Get the values for all of the keys available in this application instance
KeyValueIterator<String, Long> all = keyValueStore.all();
while (all.hasNext()) {
  KeyValue<String, Long> next = all.next();
  System.out.println("count for " + next.key + ": " + next.value);
}
all.close();

You can also materialize the results of stateless operators by using the overloaded methods that take a queryableStoreName as shown in the example below:

StreamsBuilder builder = ...;
KTable<String, Integer> regionCounts = ...;

// materialize the result of filtering corresponding to odd numbers
// the "queryableStoreName" can be subsequently queried.
KTable<String, Integer> oddCounts = regionCounts.filter((region, count) -> (count % 2 != 0),
  Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("queryableStoreName"));

// do not materialize the result of filtering corresponding to even numbers
// this means that these results will not be materialized and cannot be queried.
KTable<String, Integer> evenCounts = regionCounts.filter((region, count) -> (count % 2 == 0));

Querying local window stores

A window store will potentially have many results for any given key because the key can be present in multiple windows. However, there is only one result per window for a given key.

To query a local window store, you must first create a topology with a window store. This example creates a window store named “CountsWindowStore” that contains the counts for words in 1-minute windows.

StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
groupedByWord.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));

After the application has started, you can get access to “CountsWindowStore” and then query it via the ReadOnlyWindowStore API:

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Fetch values for the key "world" for all of the windows available in this application instance.
// To get *all* available windows we fetch windows from the beginning of time until now.
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
  KeyValue<Long, Long> next = iterator.next();
  long windowTimestamp = next.key;
  System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
}

Querying local custom state stores

Note

Only the Processor API supports custom state stores.

Before querying the custom state stores you must implement these interfaces:

  • Your custom state store must implement StateStore.
  • You must have an interface to represent the operations available on the store.
  • You must provide an implementation of StoreBuilder for creating instances of your store.
  • It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.

The class/interface hierarchy for your custom store might look something like:

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
  // implementation of the actual store
}

// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
  void write(K key, V value);
}

// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
  V read(K key);
}

public class MyCustomStoreBuilder implements StoreBuilder {
  // implementation of the supplier for MyCustomStore
}
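
For completeness, the StoreBuilder stub above could be fleshed out along the following lines. This is only a minimal sketch: it assumes MyCustomStore exposes a constructor that takes the store name, and it treats caching as unsupported; a real builder would typically also wire up serdes and changelog configuration.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.state.StoreBuilder;

public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<String, String>> {

  private final String name;
  private final Map<String, String> logConfig = new HashMap<>();
  private boolean loggingEnabled = true;

  public MyCustomStoreBuilder(final String name) {
    this.name = name;
  }

  @Override
  public MyCustomStore<String, String> build() {
    // Assumes MyCustomStore has a constructor that takes the store name
    return new MyCustomStore<>(name);
  }

  // Caching is not supported by this illustrative store, so these calls are no-ops
  @Override
  public StoreBuilder<MyCustomStore<String, String>> withCachingEnabled() { return this; }

  @Override
  public StoreBuilder<MyCustomStore<String, String>> withCachingDisabled() { return this; }

  @Override
  public StoreBuilder<MyCustomStore<String, String>> withLoggingEnabled(final Map<String, String> config) {
    loggingEnabled = true;
    logConfig.putAll(config);
    return this;
  }

  @Override
  public StoreBuilder<MyCustomStore<String, String>> withLoggingDisabled() {
    loggingEnabled = false;
    logConfig.clear();
    return this;
  }

  @Override
  public Map<String, String> logConfig() { return logConfig; }

  @Override
  public boolean loggingEnabled() { return loggingEnabled; }

  @Override
  public String name() { return name; }
}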

To make this store queryable you must:

  • Provide an implementation of QueryableStoreType.
  • Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.

Here is how to implement QueryableStoreType:

public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

  // Only accept StateStores that are of type MyCustomStore
  public boolean accepts(final StateStore stateStore) {
    return stateStore instanceof MyCustomStore;
  }

  public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
      return new MyCustomStoreTypeWrapper<>(storeProvider, storeName, this);
  }

}

A wrapper class is required because each instance of a Kafka Streams application may run multiple stream tasks and manage multiple local instances of a particular state store. The wrapper class hides this complexity and lets you query a “logical” state store by name without having to know about all of the underlying local instances of that state store.

When implementing your wrapper class you must use the StateStoreProvider interface to get access to the underlying instances of your store. StateStoreProvider#stores(String storeName, QueryableStoreType<T> queryableStoreType) returns a List of state stores with the given storeName and of the type as defined by queryableStoreType.

Here is an example implementation of the wrapper:

// We strongly recommended implementing a read-only interface
// to restrict usage of the store to safe read operations!
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {

  private final QueryableStoreType<MyReadableCustomStore<K, V>> customStoreType;
  private final String storeName;
  private final StateStoreProvider provider;

  public MyCustomStoreTypeWrapper(final StateStoreProvider provider,
                                  final String storeName,
                                  final QueryableStoreType<MyReadableCustomStore<K, V>> customStoreType) {

    // ... assign fields ...
  }

  // Implement a safe read method
  @Override
  public V read(final K key) {
    // Get all the local stores with storeName and of customStoreType
    final List<MyReadableCustomStore<K, V>> stores = provider.stores(storeName, customStoreType);
    // Return the first non-null value found for the given key, if any
    final Optional<V> value = stores.stream()
        .map(store -> store.read(key))
        .filter(v -> v != null)
        .findFirst();
    // Return the value if it exists
    return value.orElse(null);
  }

}

You can now find and query your custom store:

Topology topology = ...;
ProcessorSupplier processorSupplier = ...;

// Create a MyCustomStoreBuilder for the store named "the-custom-store"
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store") //...;
// Add the source topic
topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
topology.addStateStore(customStoreBuilder, "the-processor");

KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

// Get access to the custom store
MyReadableCustomStore<String,String> store = streams.store(StoreQueryParameters.fromNameAndType("the-custom-store", new MyCustomStoreType<String,String>()));
// Query the store
String value = store.read("key");

Querying remote state stores for the entire app

To query remote states for the entire app, you must expose the application’s full state to other applications, including applications that are running on different machines.

For example, you have a Kafka Streams application that processes user events in a multi-player video game, and you want to retrieve the latest status of each user directly and display it in a mobile app. Here are the required steps to make the full state of your application queryable:

  1. Add an RPC layer to your application so that the instances of your application can be interacted with via the network (e.g., a REST API, Thrift, a custom protocol, and so on). The instances must respond to interactive queries. You can follow the reference examples provided to get started.
  2. Expose the RPC endpoints of your application’s instances via the application.server configuration setting of Kafka Streams. Because RPC endpoints must be unique within a network, each instance has its own value for this configuration setting. This makes an application instance discoverable by other instances.
  3. In the RPC layer, discover remote application instances and their state stores and query locally available state stores to make the full state of your application queryable. The remote application instances can forward queries to other app instances if a particular instance lacks the local data to respond to a query. The locally available state stores can directly respond to queries.

Figure: Discover any running instances of the same application, as well as the respective RPC endpoints they expose for interactive queries.

Adding an RPC layer to your application

There are many ways to add an RPC layer. The only requirements are that the RPC layer is embedded within the Kafka Streams application and that it exposes an endpoint that other application instances and applications can connect to.
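
For illustration only, here is a minimal sketch of such an embedded RPC layer built on the JDK’s built-in HttpServer. The “/word-count/” path and the “word-count” store name are assumptions made for this example, and a production application would more likely use a full REST framework (as the demo applications do).

import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class WordCountRpcServer {

  public static HttpServer startRpcServer(final KafkaStreams streams, final int port) throws Exception {
    final HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
    // GET /word-count/<word> returns the locally stored count for <word>, or 404 if this
    // instance does not have a value for it.
    server.createContext("/word-count/", exchange -> {
      final String word = exchange.getRequestURI().getPath().substring("/word-count/".length());
      final ReadOnlyKeyValueStore<String, Long> store =
          streams.store(StoreQueryParameters.fromNameAndType("word-count", QueryableStoreTypes.keyValueStore()));
      final Long count = store.get(word);
      final byte[] body = String.valueOf(count).getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(count == null ? 404 : 200, body.length);
      exchange.getResponseBody().write(body);
      exchange.close();
    });
    server.start();
    return server;
  }
}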

Exposing the RPC endpoints of your application

To enable remote state store discovery in a distributed Kafka Streams application, you must set the application.server configuration property in your application’s configuration properties. The application.server property defines a unique host:port pair that points to the RPC endpoint of the respective instance of a Kafka Streams application. The value of this configuration property will vary across the instances of your application. When this property is set, Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of StreamsMetadata.

Tip

Consider leveraging the exposed RPC endpoints of your application for further functionality, such as piggybacking additional inter-application communication that goes beyond interactive queries.

This example shows how to configure and run a Kafka Streams application that supports the discovery of its state stores.

Properties props = new Properties();
// Set the unique RPC endpoint of this application instance through which it
// can be interactively queried.  In a real application, the value would most
// probably not be hardcoded but derived dynamically.
String rpcEndpoint = "host1:4460";
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
// ... further settings may follow here ...

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde));

final KGroupedStream<String, String> groupedByWord = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Then, create and start the actual RPC service for remote access to this
// application instance's local state stores.
//
// This service should be started on the same host and port as defined above by
// the property `StreamsConfig.APPLICATION_SERVER_CONFIG`.  The example below is
// fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)
// that showcase how to implement such a service to get you started.
MyRPCService rpcService = ...;
rpcService.listenAt(rpcEndpoint);

Discovering and accessing application instances and their local state stores

The following methods return StreamsMetadata objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores.

  • KafkaStreams#allMetadata(): find all instances of this application
  • KafkaStreams#allMetadataForStore(String storeName): find those application instances that manage local instances of the state store “storeName”
  • KafkaStreams#metadataForKey(String storeName, K key, Serializer<K> keySerializer): using the default stream partitioning strategy, find the one application instance that holds the data for the given key in the given state store
  • KafkaStreams#metadataForKey(String storeName, K key, StreamPartitioner<K, ?> partitioner): using partitioner, find the one application instance that holds the data for the given key in the given state store

Attention

If application.server is not configured for an application instance, then the above methods will not find any StreamsMetadata for it.

For example, we can now find the StreamsMetadata for the state store named “word-count” that we defined in the code example shown in the previous section:

KafkaStreams streams = ...;
// Find all the locations of local instances of the state store named "word-count"
Collection<StreamsMetadata> wordCountHosts = streams.allMetadataForStore("word-count");

// For illustrative purposes, we assume using an HTTP client to talk to remote app instances.
HttpClient http = ...;

// Get the word count for word (aka key) 'alice': Approach 1
//
// We first find the one app instance that manages the count for 'alice' in its local state stores.
StreamsMetadata metadata = streams.metadataForKey("word-count", "alice", Serdes.String().serializer());
// Then, we query only that single app instance for the latest count of 'alice'.
// Note: The RPC URL shown below is fictitious and only serves to illustrate the idea.  Ultimately,
// the URL (or, in general, the method of communication) will depend on the RPC layer you opted to
// implement.  Again, we provide end-to-end demo applications (such as KafkaMusicExample) that showcase
// how to implement such an RPC layer.
Long result = http.getLong("http://" + metadata.host() + ":" + metadata.port() + "/word-count/alice");

// Get the word count for word (aka key) 'alice': Approach 2
//
// Alternatively, we could also choose (say) a brute-force approach where we query every app instance
// until we find the one that happens to know about 'alice'.
Optional<Long> result = streams.allMetadataForStore("word-count")
    .stream()
    .map(streamsMetadata -> {
        // Construct the (fictitious) full endpoint URL to query the current remote application instance
        String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
        // Read and return the count for 'alice', if any.
        return http.getLong(url);
    })
    .filter(s -> s != null)
    .findFirst();

At this point the full state of the application is interactively queryable:

  • You can discover the running instances of the application and the state stores they manage locally.
  • Through the RPC layer that was added to the application, you can communicate with these application instances over the network and query them for locally available state.
  • The application instances are able to serve such queries because they can directly query their own local state stores and respond via the RPC layer.
  • Collectively, this allows us to query the full state of the entire application.

To see an end-to-end application with interactive queries, review the demo applications.


9 - Memory Management

Memory Management

You can specify the total memory (RAM) size used for internal caching and compacting of records. This caching happens before the records are written to state stores or forwarded downstream to other nodes.

The record caches are implemented slightly differently in the DSL and Processor API.

Table of Contents

  • Record caches in the DSL
  • Record caches in the Processor API
  • RocksDB
  • Other memory usage

Record caches in the DSL

You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is leveraged by the following KTable instances:

  • Source KTable: KTable instances that are created via StreamsBuilder#table() or StreamsBuilder#globalTable().
  • Aggregation KTable: instances of KTable that are created as a result of aggregations.

For such KTable instances, the record cache is used for:

  • Internal caching and compacting of output records before they are written by the underlying stateful processor node to its internal state stores.
  • Internal caching and compacting of output records before they are forwarded from the underlying stateful processor node to any of its downstream processor nodes.

Use the following example to understand the behaviors with and without record caching. In this example, the input is a KStream<String, Integer> with the records <K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>. The focus in this example is on the records with key == A.

  • An aggregation computes the sum of record values, grouped by key, for the input and returns a KTable<String, Integer>.
    • Without caching: a sequence of output records is emitted for key A that represent changes in the resulting aggregation table. The parentheses (()) denote changes, the left number is the new aggregate value and the right number is the old aggregate value: <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.
    • With caching: a single output record is emitted for key A that would likely be compacted in the cache, leading to a single output record of <A, (321, null)>. This record is written to the aggregation's internal state store and forwarded to any downstream operations.

The cache size is specified through the cache.max.bytes.buffering parameter, which is a global setting per processing topology:

// Enable record cache of size 10 MB.
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with T threads and C bytes allocated for caching, each thread will have an even C/T bytes to construct its own cache and use as it sees fit among its tasks. This means that there are as many caches as there are threads, but no sharing of caches across threads happens.
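
For example (the values below are illustrative): with a 10 MB cache and 5 stream threads, each thread gets an even 10 MB / 5 = 2 MB to use across its tasks.

Properties props = new Properties();
// 10 MB total cache for this processing topology instance
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// 5 stream threads -> each thread gets 10 MB / 5 = 2 MB of cache
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);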

The basic API for the cache is made of put() and get() calls. Records are evicted using a simple LRU scheme after the cache size is reached. The first time a keyed record R1 = <K1, V1> finishes processing at a node, it is marked as dirty in the cache. Any other keyed record R2 = <K1, V2> with the same key K1 that is processed on that node during that time will overwrite <K1, V1>, this is referred to as “being compacted”. This has the same effect as Kafka’s log compaction, but happens earlier, while the records are still in memory, and within your client-side application, rather than on the server-side (i.e. the Kafka broker). After flushing, R2 is forwarded to the next processing node and then written to the local state store.

The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earliest of commit.interval.ms or cache.max.bytes.buffering (cache pressure) hits. Both commit.interval.ms and cache.max.bytes.buffering are global parameters. As such, it is not possible to specify different parameters for individual nodes.

Here are example settings for both parameters based on desired scenarios.

  • To turn off caching the cache size can be set to zero:

    // Disable record cache
    Properties props = new Properties();
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

  • To enable caching but still have an upper bound on how long records will be cached, you can set the commit interval. In this example, it is set to 1000 milliseconds:

    // Enable record cache of size 10 MB.
    Properties props = new Properties();
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    // Set commit interval to 1 second.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

The effect of these two configurations is described below, using an example stream of records with 4 keys: blue, red, yellow, and green. Assume the cache has space for only 3 keys.

  • When the cache is disabled (a), all of the input records will be output.

  • When the cache is enabled (b):

    • Most records are output at the end of commit intervals (e.g., at t1 a single blue record is output, which is the final over-write of the blue key up to that time).
    • Some records are output because of cache pressure (i.e. before the end of a commit interval). For example, see the red record before t2. With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With large cache sizes, the commit interval will be the primary factor.
    • The total number of records output has been reduced from 15 to 8.

Record caches in the Processor API

You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is used for internal caching and compacting of output records before they are written from a stateful processor node to its state stores.

The record cache in the Processor API does not cache or compact any output records that are being forwarded downstream. This means that all downstream processor nodes can see all records, whereas the state stores see a reduced number of records. This does not impact correctness of the system, but is a performance optimization for the state stores. For example, with the Processor API you can store a record in a state store while forwarding a different value downstream.

Following from the example first shown in section State Stores, to disable caching, you can add the withCachingDisabled call (note that caches are enabled by default, however there is an explicit withCachingEnabled call).

StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withCachingDisabled();

Record caches are not supported for versioned state stores.

To avoid reading stale data, you can flush() the store before creating the iterator. Note that flushing too often can lead to performance degradation when RocksDB is used, so we advise against flushing manually in general.
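
As a rough sketch, inside a processor that has a key-value store attached, this could look like the following; the store name “Counts” and the key/value types are assumptions for this example.

// Inside a Processor (Processor API), assuming `context` is the ProcessorContext and a
// KeyValueStore<String, Long> named "Counts" is connected to this processor.
KeyValueStore<String, Long> store = context.getStateStore("Counts");
store.flush(); // push cached entries down into the store so the iterator does not see stale data
try (KeyValueIterator<String, Long> iter = store.all()) {
  while (iter.hasNext()) {
    KeyValue<String, Long> entry = iter.next();
    // ... use entry.key and entry.value ...
  }
}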

RocksDB

Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtable (write buffer). Critical configs (for RocksDB version 4.1.0) include block_cache_size, write_buffer_size and max_write_buffer_number. These can be specified through the rocksdb.config.setter configuration.

Also, we recommend changing RocksDB’s default memory allocator, because the default allocator may lead to increased memory consumption. To change the memory allocator to jemalloc, you need to set the environment variable LD_PRELOAD before you start your Kafka Streams application:

# example: install jemalloc (on Debian)
$ apt install -y libjemalloc-dev
# set LD_PRELOAD before you start your Kafka Streams application
$ export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so"

As of 2.3.0 the memory usage across all instances can be bounded, limiting the total off-heap memory of your Kafka Streams application. To do so you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared WriteBufferManager and count its memory against the block cache, and then pass the same Cache object to each instance. See RocksDB Memory Usage for details. An example RocksDBConfigSetter implementing this is shown below:

public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

   private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO); // (1)
   private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

   @Override
   public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

     BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

     // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
     tableConfig.setBlockCache(cache);
     tableConfig.setCacheIndexAndFilterBlocks(true);
     options.setWriteBufferManager(writeBufferManager);

     // These options are recommended to be set when bounding the total memory
     tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true); // (2)
     tableConfig.setPinTopLevelIndexAndFilter(true);
     tableConfig.setBlockSize(BLOCK_SIZE); // (3)
     options.setMaxWriteBufferNumber(N_MEMTABLES);
     options.setWriteBufferSize(MEMTABLE_SIZE);

     options.setTableFormatConfig(tableConfig);
   }

   @Override
   public void close(final String storeName, final Options options) {
     // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
   }
}

1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for “high priority” (aka index and filter) blocks, preventing them from being evicted by data blocks. The boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might go larger than its capacity. See the full signature of the LRUCache constructor here.
2. This must be set in order for INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1), as described in the RocksDB docs.
3. You may want to modify the default block size per these instructions from the RocksDB docs. A larger block size means index blocks will be smaller, but the cached data blocks may contain more cold data that would otherwise be evicted.

Note: While we recommend setting at least the above configs, the specific options that yield the best performance are workload dependent and you should consider experimenting with these to determine the best choices for your specific use case. Keep in mind that the optimal configs for one app may not apply to one with a different topology or input topic. In addition to the recommended configs above, you may want to consider using partitioned index filters as described by the RocksDB docs.
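
To use a config setter like the one above, you register its class through the rocksdb.config.setter property; a minimal sketch:

Properties props = new Properties();
// Register the custom RocksDB config setter shown above
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);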

Other memory usage

There are other modules inside Apache Kafka that allocate memory during runtime. They include the following (a configuration sketch follows the list):

  • Producer buffering, managed by the producer config buffer.memory.
  • Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e., fetch.max.bytes and fetch.max.wait.ms.
  • Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory. These are controlled by the send.buffer.bytes / receive.buffer.bytes configs.
  • Deserialized objects buffering: after consumer.poll() returns records, they will be deserialized to extract timestamp and buffered in the streams space. Currently this is only indirectly controlled by buffered.records.per.partition.
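
As a hedged sketch, such client-level settings can be passed through the Kafka Streams configuration using the producer and consumer config prefixes; the values below are illustrative, not recommendations.

Properties props = new Properties();
// Producer-side record buffering (buffer.memory)
props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 32 * 1024 * 1024L);
// Consumer-side fetch limits (fetch.max.bytes / fetch.max.wait.ms)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 50 * 1024 * 1024);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 500);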

Tip

Iterators should be closed explicitly to release resources: store iterators (e.g., KeyValueIterator and WindowStoreIterator) must be closed explicitly once you are done with them, to release resources such as open file handles and in-memory read buffers. Alternatively, use a try-with-resources statement, since these iterator classes implement Closeable.

Otherwise, your stream application’s memory usage keeps increasing while it runs until it hits an OutOfMemoryError.


10 - Running Streams Applications

Running Streams Applications

You can run Java applications that use the Kafka Streams library without any additional configuration or requirements. Kafka Streams also provides the ability to receive notification of the various states of the application. The ability to monitor the runtime status is discussed in the monitoring guide.

Table of Contents

  • Starting a Kafka Streams application
  • Elastic scaling of your application
    • Adding capacity to your application
    • Removing capacity from your application
    • State restoration during workload rebalance
    • Determining how many application instances to run

Starting a Kafka Streams application

You can package your Java application as a fat JAR file and then start the application like this:

# Start the application in class `com.example.MyStreamsApp`
# from the fat JAR named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

When you start your application you are launching a Kafka Streams instance of your application. You can run multiple instances of your application. A common scenario is that there are multiple instances of your application running in parallel. For more information, see Parallelism Model.

When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks. If the processor topology defines any state stores, these are also constructed during the initialization period. For more information, see the State restoration during workload rebalance section.

Elastic scaling of your application

Kafka Streams makes your stream processing applications elastic and scalable. You can add and remove processing capacity dynamically during application runtime without any downtime or data loss. This makes your applications resilient in the face of failures and allows you to perform maintenance as needed (e.g. rolling upgrades).

For more information about this elasticity, see the Parallelism Model section. Kafka Streams leverages the Kafka group management functionality, which is built right into the Kafka wire protocol. It is the foundation that enables the elasticity of Kafka Streams applications: members of a group coordinate and collaborate jointly on the consumption and processing of data in Kafka. Additionally, Kafka Streams provides stateful processing and allows for fault-tolerant state in environments where application instances may come and go at any time.

Adding capacity to your application

If you need more processing capacity for your stream processing application, you can simply start another instance of your stream processing application, e.g. on another machine, in order to scale out. The instances of your application will become aware of each other and automatically begin to share the processing work. More specifically, what will be handed over from the existing instances to the new instances is (some of) the stream tasks that have been run by the existing instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks (the state of a stream task will be re-created in the target instance by restoring the state from its corresponding changelog topic).

The various instances of your application each run in their own JVM process, which means that each instance can leverage all the processing capacity that is available to their respective JVM process (minus the capacity that any non-Kafka-Streams part of your application may be using). This explains why running additional instances will grant your application additional processing capacity. The exact capacity you will be adding by running a new instance depends of course on the environment in which the new instance runs: available CPU cores, available main memory and Java heap space, local storage, network bandwidth, and so on. Similarly, if you stop any of the running instances of your application, then you are removing and freeing up the respective processing capacity.

Before adding capacity: only a single instance of your Kafka Streams application is running. At this point the corresponding Kafka consumer group of your application contains only a single member (this instance). All data is being read and processed by this single instance.

After adding capacity: now two additional instances of your Kafka Streams application are running, and they have automatically joined the application’s Kafka consumer group for a total of three current members. These three instances are automatically splitting the processing work between each other. The splitting is based on the Kafka topic partitions from which data is being read.

Removing capacity from your application

To remove processing capacity, you can stop running stream processing application instances (e.g., shut down two of the four instances). The stopped instances automatically leave the application’s consumer group, and the remaining instances of your application automatically take over the processing work. The remaining instances take over the stream tasks that were run by the stopped instances. Moving stream tasks from one instance to another results in moving the processing work plus any internal state of these stream tasks. The state of a stream task is recreated in the target instance from its changelog topic.

State restoration during workload rebalance

When a task is migrated, the task processing state is fully restored before the application instance resumes processing. This guarantees the correct processing results. In Kafka Streams, state restoration is usually done by replaying the corresponding changelog topic to reconstruct the state store. To minimize changelog-based restoration latency by using replicated local state stores, you can specify num.standby.replicas. When a stream task is initialized or re-initialized on the application instance, its state store is restored like this:

  • If no local state store exists, the changelog is replayed from the earliest to the current offset. This reconstructs the local state store to the most recent snapshot.
  • If a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog.

For more information, see Standby Replicas.

As of version 2.6, Streams will now do most of a task’s restoration in the background through warmup replicas. These will be assigned to instances that need to restore a lot of state for a task. A stateful active task will only be assigned to an instance once its state is within the configured acceptable.recovery.lag, if one exists. This means that most of the time, a task migration will not result in downtime for that task. It will remain active on the instance that’s already caught up, while the instance that it’s being migrated to works on restoring the state. Streams will regularly probe for warmup tasks that have finished restoring and transition them to active tasks when ready.

Note that the one exception to this task availability is if none of the instances have a caught-up version of that task. In that case, we have no choice but to assign the active task to an instance that is not caught up and will have to block further processing on restoration of the task’s state from the changelog. If high availability is important for your application, we highly recommend enabling standby replicas.
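
A minimal configuration sketch for the standby and warmup settings mentioned above; the values are illustrative.

Properties props = new Properties();
// Keep one standby replica per stateful task to speed up failover
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Only assign an active task to an instance whose state is within this lag (in offsets)
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);
// Allow up to two extra warmup replicas at a time while state is being moved
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);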

Determining how many application instances to run

The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances of your application. You can run further instances, but these will be idle.

The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and for the number of running instances of your application.

To achieve balanced workload processing across application instances and to prevent processing hotspots, you should distribute data and processing workloads:

  • Data should be equally distributed across topic partitions. For example, if two topic partitions each have 1 million messages, this is better than one partition with 2 million messages and another with none.
  • Processing workload should be equally distributed across topic partitions. For example, if the time to process messages varies widely, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition.


11 - Managing Streams Application Topics

Managing Streams Application Topics

A Kafka Streams application continuously reads from Kafka topics, processes the read data, and then writes the processing results back into Kafka topics. The application may also auto-create other Kafka topics in the Kafka brokers, for example state store changelog topics. This section describes the differences between these topic types and how to manage the topics and your applications.

Kafka Streams distinguishes between user topics and internal topics.

User topics

User topics exist externally to an application and are read from or written to by the application, including:

Input topics Topics that are specified via source processors in the application’s topology; e.g. via StreamsBuilder#stream(), StreamsBuilder#table() and Topology#addSource(). Output topics Topics that are specified via sink processors in the application’s topology; e.g. via KStream#to(), KTable#toStream().to() and Topology#addSink().

User topics must be created and manually managed ahead of time (e.g., via the topic tools). If user topics are shared among multiple applications for reading and writing, the application users must coordinate topic management. If user topics are centrally managed, then application users would not need to manage topics themselves but simply obtain access to them.
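
For example, a user input topic could be created up front with the Kafka Admin client (or the equivalent kafka-topics tooling); the topic name, partition count, and replication factor below are purely illustrative.

// Illustrative sketch: create the "word-count-input" user topic up front with the Admin client.
static void createInputTopic() throws Exception {
  Properties adminProps = new Properties();
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9092");
  try (Admin admin = Admin.create(adminProps)) {
    // 10 partitions, replication factor 3 (choose values to match your cluster and workload)
    NewTopic inputTopic = new NewTopic("word-count-input", 10, (short) 3);
    admin.createTopics(Collections.singleton(inputTopic)).all().get();
  }
}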

Note

You should not use the auto-create topic feature on the brokers to create user topics, because:

  • Auto-creation of topics may be disabled in your Kafka cluster (auto.create.topics.enable=false in the Kafka broker configuration).
  • Auto-creation automatically applies the default topic settings such as the replication factor. These default settings might not be what you want for certain output topics.

Internal topics

Internal topics are used internally by the Kafka Streams application while executing, for example the changelog topics for state stores. These topics are created by the application and are only used by that stream application.

If security is enabled on the Kafka brokers, you must grant the underlying clients admin permissions so that they can create internal topics. For more information, see Streams Security.

Note

The internal topics follow the naming convention <application.id>-<operatorName>-<suffix>, but this convention is not guaranteed for future releases.

The following settings apply to the default configuration for internal topics:

  • For all internal topics, message.timestamp.type is set to CreateTime.
  • For internal repartition topics, the compaction policy is delete and the retention time is -1 (infinite).
  • For internal changelog topics for key-value stores, the compaction policy is compact.
  • For internal changelog topics for windowed key-value stores, the compaction policy is delete,compact. The retention time is set to 24 hours plus your setting for the windowed store.
  • For internal changelog topics for versioned state stores, the cleanup policy is compact, and min.compaction.lag.ms is set to 24 hours plus the store’s historyRetentionMs value.


12 - Streams Security

Streams Security

Table of Contents

  • Required ACL setting for secure Kafka clusters
  • Security example

Kafka Streams natively integrates with Kafka’s security features and supports all of the client-side security features in Kafka. Streams leverages the Java Producer and Consumer API.

To secure your Stream processing applications, configure the security settings in the corresponding Kafka producer and consumer clients, and then specify the corresponding configuration settings in your Kafka Streams application.

Kafka supports cluster encryption and authentication, including a mix of authenticated and unauthenticated, and encrypted and non-encrypted clients. Using security is optional.

Here are a few relevant client-side security features:

Encrypt data-in-transit between your applications and Kafka brokers You can enable the encryption of the client-server communication between your applications and the Kafka brokers. For example, you can configure your applications to always use encryption when reading and writing data to and from Kafka. This is critical when reading and writing data across security domains such as internal network, public internet, and partner networks.

Client authentication You can enable client authentication for connections from your application to Kafka brokers. For example, you can define that only specific applications are allowed to connect to your Kafka cluster.

Client authorization You can enable client authorization of read and write operations by your applications. For example, you can define that only specific applications are allowed to read from a Kafka topic. You can also restrict write access to Kafka topics to prevent data pollution or fraudulent activities.

For more information about the security features in Apache Kafka, see Kafka Security.

Required ACL setting for secure Kafka clusters

Kafka clusters can use ACLs to control access to resources (like the ability to create topics), and for such clusters each client, including Kafka Streams, is required to authenticate as a particular user in order to be authorized with appropriate access. In particular, when Streams applications are run against a secured Kafka cluster, the principal running the application must have the ACL set so that the application has the permissions to create, read and write internal topics.

To avoid providing this permission to your application, you can create the required internal topics manually. If the internal topics exist, Kafka Streams will not try to recreate them. Note that the internal repartition and changelog topics must be created with the correct number of partitions, otherwise Kafka Streams will fail on startup. The topics must be created with the same number of partitions as your input topic, or if there are multiple topics, the maximum number of partitions across all input topics.

Additionally, changelog topics must be created with log compaction enabled, otherwise your application might lose data. For changelog topics for windowed KTables, apply “delete,compact” and set the retention time based on the corresponding store retention time. To avoid premature deletion, add a delta to the store retention time. By default, Kafka Streams adds 24 hours to the store retention time.

You can find out more about the names of the required internal topics via Topology#describe(). All internal topics follow the naming pattern <application.id>-<operatorName>-<suffix> where the suffix is either repartition or changelog. Note that there is no guarantee about this naming pattern in future releases; it’s not part of the public API.

Since all internal topics as well as the embedded consumer group name are prefixed with the application ID, it is recommended to use ACLs on a prefixed resource pattern to allow the client to manage all topics and consumer groups that start with this prefix, e.g., --resource-pattern-type prefixed --topic your.application.id --operation All (see KIP-277 and KIP-290 for details).

Security example

This example shows how to configure a Kafka Streams application to enable client authentication and encrypt data-in-transit when communicating with its Kafka cluster.

This example assumes that the Kafka brokers in the cluster already have their security set up and that the necessary SSL certificates are available to the application in the local filesystem locations. For example, if you are using Docker, then you must also include these SSL certificates in the correct locations within the Docker image.

The snippet below shows the settings to enable client authentication and SSL encryption for data-in-transit between your Kafka Streams application and the Kafka cluster it is reading and writing from:

# Essential security settings to enable client authentication and SSL encryption
bootstrap.servers=kafka.example.com:9093
security.protocol=SSL
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

Configure these settings in your application’s Properties instance. These settings will encrypt any data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the Kafka brokers that it is communicating with. Note that this example does not cover client authorization.

// Code of your Java application that uses the Kafka Streams library
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "secure-kafka-streams-app");
// Where to find secure Kafka brokers.  Here, it's on port 9093.
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
//
// ...further non-security related settings may follow here...
//
// Security settings.
// 1. These settings must match the security settings of the secure Kafka cluster.
// 2. The SSL trust store and key store files must be locally accessible to the application.
settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
settings.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
settings.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
settings.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");

If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you start it. For example, if you enter an incorrect password for the ssl.keystore.password setting, an error message similar to this would be logged and then the application would terminate:

# Misconfigured ssl.keystore.password
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
[...snip...]
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:
   java.io.IOException: Keystore was tampered with, or password was incorrect
[...snip...]
Caused by: java.security.UnrecoverableKeyException: Password verification failed

Monitor your Kafka Streams application log files for such error messages to spot any misconfigured applications quickly.


13 - Application Reset Tool

Application Reset Tool

You can reset an application and force it to reprocess its data from scratch by using the application reset tool. This can be useful for development and testing, or when fixing bugs.

The application reset tool handles the Kafka Streams user topics (input and output) and internal topics differently when resetting the application.

Here’s what the application reset tool does for each topic type:

  • Input topics: Reset offsets to specified position (by default to the beginning of the topic).
  • Internal topics: Delete the internal topic (this automatically deletes any committed offsets).

The application reset tool does not:

  • Reset output topics of an application. If any output topics are consumed by downstream applications, it is your responsibility to adjust those downstream applications as appropriate when you reset the upstream application.
  • Reset the local environment of your application instances. It is your responsibility to delete the local state on any machine on which an application instance was run. See the instructions in section Step 2: Reset the local environments of your application instances on how to do this.

Prerequisites

  • All instances of your application must be stopped. Otherwise, the application may enter an invalid state, crash, or produce incorrect results. You can verify whether the consumer group with ID application.id is still active by using bin/kafka-consumer-groups. When a long session timeout has been configured, active members may take longer to expire on the broker, which blocks the reset job from completing. Using the --force option removes those left-over members immediately. Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.

  • Use this tool with care and double-check its parameters: If you provide wrong parameter values (e.g., typos in application.id) or specify parameters inconsistently (e.g., specify the wrong input topics for the application), this tool might invalidate the application’s state or even impact other applications, consumer groups, or your Kafka topics.

Step 1: Run the application reset tool

Invoke the application reset tool from the command line:

Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them.

$ bin/kafka-streams-application-reset

The tool accepts the following parameters:

Option (* = required)                 Description
---------------------                 -----------
* --application-id <String: id>       The Kafka Streams application ID
                                        (application.id).
--bootstrap-server <String: server to  REQUIRED unless --bootstrap-servers
  connect to>                            (deprecated) is specified. The
                                         server(s) to connect to. The broker
                                         list string in the form HOST1:PORT1,
                                         HOST2:PORT2.
--by-duration <String: urls>          Reset offsets to offset by duration from
                                        current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>     Property file containing configs to be
                                        passed to admin clients and embedded
                                        consumer.
--dry-run                             Display the actions that would be
                                        performed without executing the reset
                                        commands.
--from-file <String: urls>            Reset offsets to values defined in CSV
                                        file.
--input-topics <String: list>         Comma-separated list of user input
                                        topics. For these topics, the tool will
                                        reset the offset to the earliest
                                        available offset.
--internal-topics <String: list>      Comma-separated list of internal topics
                                        to delete. Must be a subset of the
                                        internal topics marked for deletion by
                                        the default behaviour (do a dry-run without
                                        this option to view these topics).
--shift-by <Long: number-of-offsets>  Reset offsets shifting current offset by
                                        'n', where 'n' can be positive or
                                        negative
--to-datetime <String>                Reset offsets to offset from datetime.
                                        Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset <Long>                    Reset offsets to a specific offset.
--force                               Force removing members of the consumer group
                                      (intended to remove left-over members if
                                      long session timeout was configured).

Consider the following as reset-offset scenarios for input-topics:

  • by-duration
  • from-file
  • shift-by
  • to-datetime
  • to-earliest
  • to-latest
  • to-offset

Only one of these scenarios can be defined. If none is defined, to-earliest will be executed by default.

All the other parameters can be combined as needed. For example, if you want to restart an application from an empty internal state, but not reprocess previous data, simply omit the parameter --input-topics.

Step 2: Reset the local environments of your application instances

For a complete application reset, you must delete the application’s local state directory on any machines where the application instance was run. You must do this before restarting an application instance on the same machine. You can use either of these methods:

  • The API method KafkaStreams#cleanUp() in your application code (see the sketch after this list).
  • Manually delete the corresponding local state directory (default location: /${java.io.tmpdir}/kafka-streams/<application.id>). For more information, see Streams javadocs.
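
A minimal sketch of the cleanUp() approach referenced above; the call must happen while the instance is not running, typically right before start().

KafkaStreams streams = new KafkaStreams(topology, props);
// Wipe this instance's local state directory; only safe while the instance is not running.
streams.cleanUp();
streams.start();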
