If you want to upgrade from 0.10.2.x to 0.11.0, you don’t need to make any code changes, as the public API is fully backward compatible. However, some configuration parameters were deprecated, so it is recommended to update your code eventually to allow for future upgrades. See below for a complete list of 0.11.0 API and semantic changes that let you advance your application and/or simplify your code base, including the use of new features.
If you want to upgrade from 0.10.1.x to 0.11.0, also see the upgrade section for 0.10.2. It highlights incompatible changes that you need to consider when upgrading your code and application. See below for a complete list of 0.10.2 and 0.11.0 API and semantic changes that let you advance your application and/or simplify your code base, including the use of new features.
Upgrading from 0.10.0.x to 0.11.0.x directly is also possible. Note that brokers must be on version 0.10.1 or higher to run a Kafka Streams application of version 0.10.1 or higher. See Streams API changes in 0.10.1, Streams API changes in 0.10.2, and Streams API changes in 0.11.0 for a complete list of API changes.

Upgrading to 0.11.0.3 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible:
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for the new version 0.11.0.3 (see the sketch below)
- bounce each instance of your application once
- prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade

Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (a rolling bounce upgrade is not supported).
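For example, the first bounce phase could be configured as below; a minimal sketch, assuming placeholder application id and broker address, with upgrade.from set via its plain string key:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradePhaseOneConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // First rolling bounce: run the new 0.11.0.3 jars but remain compatible
        // with the still-running 0.10.0 instances. Remove this setting before
        // the second rolling bounce to complete the upgrade.
        props.put("upgrade.from", "0.10.0");
        return props;
    }
}
```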
Streams API changes in 0.11.0

Updates in StreamsConfig:
- new configuration parameter processing.guarantee is added
- parameter key.serde was deprecated and replaced by default.key.serde
- parameter value.serde was deprecated and replaced by default.value.serde
- parameter timestamp.extractor was deprecated and replaced by default.timestamp.extractor
- method #keySerde() was deprecated and replaced by #defaultKeySerde()
- method #valueSerde() was deprecated and replaced by #defaultValueSerde()
- new method #defaultTimestampExtractor() was added
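A minimal sketch of a configuration using the new parameter names; the serde and extractor choices are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;

public class NewConfigNames {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // New in 0.11.0: opt into exactly-once processing (default is at-least-once).
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Renamed parameters: the default.* names replace the deprecated
        // key.serde / value.serde / timestamp.extractor.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
        return props;
    }
}
```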
New methods in TopologyBuilder:
- overloads of #addSource() that allow defining a TimestampExtractor per source node
- overloads of #addGlobalStore() that allow defining a TimestampExtractor per source node associated with the global store

New methods in KStreamBuilder:
- overloads of #stream() that allow defining a TimestampExtractor per input stream
- overloads of #table() that allow defining a TimestampExtractor per input table
- overloads of #globalKTable() that allow defining a TimestampExtractor per global table
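For example, a per-stream extractor might be wired up as below; a sketch assuming an overload of this shape exists on KStreamBuilder (the 0.11.0 Javadocs list the exact variants), with a placeholder topic name:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class PerStreamExtractor {
    public static KStream<String, String> build(KStreamBuilder builder) {
        // Only this input uses wall-clock time; all other sources keep the
        // configured default.timestamp.extractor.
        return builder.stream(new WallclockTimestampExtractor(),
                              Serdes.String(), Serdes.String(),
                              "clickstream-topic"); // placeholder
    }
}
```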
Deprecated methods in KTable:
void foreach(final ForeachAction<? super K, ? super V> action)
void print()
void print(final String streamName)
void print(final Serde<K> keySerde, final Serde<V> valSerde)
void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)
void writeAsText(final String filePath)
void writeAsText(final String filePath, final String streamName)
void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)
void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)
The above methods have been deprecated in favor of using the Interactive Queries API. If you want to query the current content of the state store backing the KTable, use the following approach:
- make a call to KafkaStreams.store(final String storeName, final QueryableStoreType<T> queryableStoreType)
- then make a call to ReadOnlyKeyValueStore.all() to iterate over the keys of a KTable

If you want to view the changelog stream of the KTable, then you could call KTable.toStream().print().
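For example, a minimal sketch of that two-step lookup; the String/Long types and the store name are placeholders for whatever the KTable actually uses:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class QueryKTableStore {
    public static void printStore(KafkaStreams streams, String storeName) {
        // Step 1: look up the state store backing the KTable by its store name.
        ReadOnlyKeyValueStore<String, Long> store =
            streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
        // Step 2: iterate over all key-value pairs currently in the store.
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> entry = it.next();
                System.out.println(entry.key + " -> " + entry.value);
            }
        }
    }
}
```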
Metrics using exactly-once semantics:
If exactly-once processing is enabled via the processing.guarantee parameter, Streams internally switches from a producer-per-thread to a producer-per-task runtime model. In order to distinguish the different producers, the producer’s client.id additionally encodes the task-ID in this case. Because the producer’s client.id is used to report JMX metrics, it might be required to update tools that receive those metrics.
Producer client.id naming schema:
- at-least-once (default): [client.Id]-StreamThread-[sequence-number]
- exactly-once: [client.Id]-StreamThread-[sequence-number]-[taskId]

[client.Id] is either set via Streams configuration parameter client.id or defaults to [application.id]-[processId] ([processId] is a random UUID).
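As an illustration, assume client.id is set to "my-client"; the sketch below shows the relevant configuration, with the resulting producer ids (including the task-id rendering) given as illustrative examples in the comments:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceClientIds {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "my-client");           // [client.Id] above
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // With exactly-once enabled, producer JMX metrics appear under ids like
        // "my-client-StreamThread-1-0_0" (one producer per task) instead of
        // "my-client-StreamThread-1" (one producer per thread).
        return props;
    }
}
```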
Streams API changes in 0.10.2

Parameter updates in StreamsConfig:
- the default values of the embedded producer’s retries and consumer’s max.poll.interval.ms have been changed to improve the resiliency of a Kafka Streams application

New methods in KafkaStreams:
- #setStateListener(StateListener listener)
- #state()
- #metrics()
- #close(long timeout, TimeUnit timeUnit)
- #toString(String indent)
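For instance, a listener that logs state transitions could be registered like this; a minimal sketch with illustrative log output:

```java
import org.apache.kafka.streams.KafkaStreams;

public class StateListenerExample {
    public static void register(KafkaStreams streams) {
        // Invoked on every state transition of this instance,
        // e.g. REBALANCING -> RUNNING.
        streams.setStateListener(new KafkaStreams.StateListener() {
            @Override
            public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                System.out.println("State changed from " + oldState + " to " + newState);
            }
        });
        System.out.println("Current state: " + streams.state());
    }
}
```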
Parameter updates in StreamsConfig:
- parameter zookeeper.connect was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol (cf. KIP-4, Section “Topic Admin Schema”)

Changes in StreamsMetrics interface:
- removed method: #addLatencySensor()
- added methods: #addLatencyAndThroughputSensor(), #addThroughputSensor(), #recordThroughput(), #addSensor(), #removeSensor()
New methods in TopologyBuilder:
- overloads of #addSource() that allow defining an auto.offset.reset policy per source node
- #addGlobalStore() to add global StateStores

New methods in KStreamBuilder:
- overloads of #stream() and #table() that allow defining an auto.offset.reset policy per input stream/table
- #globalKTable() to create a GlobalKTable
New joins for KStream:
- #join() to join with KTable
- #join() and leftJoin() to join with GlobalKTable (see the sketch below)
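A sketch of a stream-global-table join; all names, topics, and the comma-separated order format are hypothetical:

```java
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableJoin {
    // Enrich an order stream with customer data held in a GlobalKTable: the
    // KeyValueMapper derives the lookup key, the ValueJoiner merges the values.
    public static KStream<String, String> enrich(KStream<String, String> orders,
                                                 GlobalKTable<String, String> customers) {
        return orders.join(customers,
            (orderId, order) -> order.split(",")[0],        // hypothetical: customer id is the first field
            (order, customer) -> order + " / " + customer); // combine order with customer record
    }
}
```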
Aligned null-key handling for KTable joins:
- KTable-KTable joins do not throw an exception on null key records anymore, but drop those records silently

New window type Session Windows:
- SessionWindows to specify session windows
- KGroupedStream methods #count(), #reduce(), and #aggregate() to allow session window aggregations
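A sketch of a session-windowed count, assuming a 5-minute inactivity gap and a placeholder store name:

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SessionWindowCount {
    // Counts events per key and session; a session closes after 5 minutes
    // of inactivity for that key.
    public static KTable<Windowed<String>, Long> countSessions(KStream<String, String> events) {
        return events
            .groupByKey()
            .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), "session-counts");
    }
}
```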
Changes to TimestampExtractor:
- #extract() has a second parameter now
- new default extractor FailOnInvalidTimestamp (it gives the same behavior as the old (and removed) default extractor ConsumerRecordTimestampExtractor)
- new alternative extractors LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamps
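A sketch of a custom extractor against the two-parameter signature; the fallback policy shown here is just an example (similar in spirit to UsePreviousTimeOnInvalidTimestamps):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    // The second parameter (new in 0.10.2) carries the previously extracted
    // timestamp and can serve as a fallback for invalid record timestamps.
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long ts = record.timestamp();
        return ts >= 0 ? ts : previousTimestamp; // fall back if embedded timestamp is invalid
    }
}
```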
Relaxed type constraints of many DSL interfaces, classes, and methods (cf. KIP-100).
Streams API changes in 0.10.1

Stream grouping and aggregation split into two methods (see the sketch after this list):
- old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey()
- new: KStream#groupBy() and KStream#groupByKey(), which return a KGroupedStream with methods #aggregate(), #reduce(), and #count()
- Example: stream.countByKey() changes to stream.groupByKey().count()

Auto Repartitioning:
- a call to through() after a key-changing operator and before an aggregation/join is no longer required
- Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey(...).groupByKey().count()

TopologyBuilder:
- methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups()

DSL: new parameter to specify state store names:
- the new Interactive Queries feature requires specifying a store name for all source KTables and all aggregation results (aggregations and KTable joins)
- Example: stream.groupByKey().count("myCountStoreName")

Windowing:
- Windows are not named anymore: Windows#of("name") changes to Windows#of()
- Windows can define an internal maintain duration via #until()
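As referenced above, a sketch of the grouping migration; the store name is a placeholder:

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class GroupingMigration {
    public static KTable<String, Long> countWords(KStream<String, String> words) {
        // 0.10.0:  words.countByKey("counts");
        // 0.10.1+: grouping and aggregation are separate steps, and the
        //          aggregation names its state store for Interactive Queries.
        return words
            .groupByKey()
            .count("counts");
    }
}
```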