Monitoring

Kafka uses Yammer Metrics for metrics reporting in the server. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes transitive dependencies pulled into client applications. Both expose metrics via JMX and can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.

All Kafka rate metrics have a corresponding cumulative count metric with suffix -total. For example, records-consumed-rate has a corresponding metric named records-consumed-total.
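As a small illustration of this naming convention, the sketch below derives the cumulative metric name from a rate metric name. `MetricNames` and `totalNameFor` are hypothetical helpers, not part of Kafka, and the sketch assumes the rate name ends in -rate.

```java
// Hypothetical helper (not part of Kafka): maps a rate metric name to the
// name of its cumulative counterpart, following the -rate / -total convention.
final class MetricNames {
    private static final String RATE_SUFFIX = "-rate";
    private static final String TOTAL_SUFFIX = "-total";

    // Returns the -total name for a metric whose name ends in -rate.
    static String totalNameFor(String rateName) {
        if (!rateName.endsWith(RATE_SUFFIX)) {
            throw new IllegalArgumentException("not a rate metric: " + rateName);
        }
        String base = rateName.substring(0, rateName.length() - RATE_SUFFIX.length());
        return base + TOTAL_SUFFIX;
    }

    public static void main(String[] args) {
        // prints "records-consumed-total"
        System.out.println(totalNameFor("records-consumed-rate"));
    }
}
```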

The easiest way to see the available metrics is to fire up jconsole and point it at a running Kafka client or server; this will allow browsing all metrics with JMX.
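The same browsing can be done programmatically with the standard javax.management API. The following is a minimal sketch that lists MBean names matching a pattern on the in-process MBean server. The class `JmxBrowse` and the method `listNames` are hypothetical names chosen for illustration. The `kafka.*:*` query only returns results when run inside a JVM that hosts a Kafka client or broker.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: list the names of MBeans that match an ObjectName pattern.
final class JmxBrowse {
    // Returns the sorted canonical names of all MBeans matching the pattern.
    static Set<String> listNames(MBeanServer server, String pattern) {
        ObjectName query;
        try {
            query = new ObjectName(pattern);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad ObjectName pattern: " + pattern, e);
        }
        Set<String> names = new TreeSet<>();
        for (ObjectName name : server.queryNames(query, null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Inside a Kafka client or broker JVM this lists the kafka.* domains;
        // in any other JVM the result is empty.
        listNames(server, "kafka.*:*").forEach(System.out::println);
        // The JVM's own MBeans are always registered on the platform server.
        listNames(server, "java.lang:type=*").forEach(System.out::println);
    }
}
```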

Security Considerations for Remote Monitoring using JMX

Apache Kafka disables remote JMX by default. To enable remote JMX monitoring, set the environment variable JMX_PORT for processes started using the CLI, or set the standard Java system properties to enable remote JMX programmatically. You must enable security when enabling remote JMX in production scenarios to ensure that unauthorized users cannot monitor or control your broker or application, or the platform on which they are running. Note that authentication is disabled for JMX by default in Kafka; for production deployments, override the security configs by setting the environment variable KAFKA_JMX_OPTS for processes started using the CLI, or by setting the appropriate Java system properties. See Monitoring and Management Using JMX Technology for details on securing JMX.
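To make the override concrete, here is a hedged sketch that assembles a value one might export as KAFKA_JMX_OPTS. The property names are the standard JDK com.sun.management.jmxremote properties. The class `JmxOpts`, the method `secureJmxOpts`, and the file paths are hypothetical. Enabling SSL also requires keystore settings that are not shown here.

```java
// Sketch: build a JVM options string that turns on JMX authentication and SSL.
// JmxOpts and secureJmxOpts are illustrative names, not part of Kafka.
final class JmxOpts {
    // passwordFile and accessFile are paths to the JMX password and access files.
    static String secureJmxOpts(String passwordFile, String accessFile) {
        return String.join(" ",
            "-Dcom.sun.management.jmxremote",
            "-Dcom.sun.management.jmxremote.authenticate=true",
            "-Dcom.sun.management.jmxremote.ssl=true",
            "-Dcom.sun.management.jmxremote.password.file=" + passwordFile,
            "-Dcom.sun.management.jmxremote.access.file=" + accessFile);
    }

    public static void main(String[] args) {
        // Placeholder paths; substitute the files used in your deployment.
        System.out.println(secureJmxOpts("/path/to/jmxremote.password",
                                         "/path/to/jmxremote.access"));
    }
}
```

The printed string is intended to be exported as KAFKA_JMX_OPTS before starting the process from the CLI, alongside JMX_PORT.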

We do graphing and alerting on the following metrics:

Description | Mbean name | Normal value
---|---|---
Message in rate | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=([-.\w]+) | Incoming message rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Byte in rate from clients | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=([-.\w]+) | Byte in (from the clients) rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Byte in rate from other brokers | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec | Byte in (from the other brokers) rate across all topics.
Controller Request rate from Broker | kafka.controller:type=ControllerChannelManager,name=RequestRateAndQueueTimeMs,brokerId=([0-9]+) | The rate (requests per second) at which the ControllerChannelManager takes requests from the queue of the given broker, and the time a request stays in this queue before it is taken from the queue.
Controller Event queue size | kafka.controller:type=ControllerEventManager,name=EventQueueSize | Size of the ControllerEventManager’s queue.
Controller Event queue time | kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs | Time it takes for any event (except the Idle event) to wait in the ControllerEventManager’s queue before being processed.
Request rate | kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer
Error rate | kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) | Number of errors in responses counted per-request-type, per-error-code. If a response contains multiple errors, all are counted. error=NONE indicates successful responses.
Produce request rate | kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec,topic=([-.\w]+) | Produce request rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Fetch request rate | kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec,topic=([-.\w]+) | Fetch request (from clients or followers) rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Failed produce request rate | kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic=([-.\w]+) | Failed Produce request rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Failed fetch request rate | kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic=([-.\w]+) | Failed Fetch request (from clients or followers) rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Request size in bytes | kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+) | Size of requests for each request type.
Temporary memory size in bytes | kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch} |
Message conversion time | kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch} |
Message conversion rate | kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) |
Request Queue Size | kafka.network:type=RequestChannel,name=RequestQueueSize | Size of the request queue.
Byte out rate to clients | kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=([-.\w]+) | Byte out (to the clients) rate per topic. Omitting 'topic=(…)' will yield the all-topic rate.
Byte out rate to other brokers | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec | Byte out (to the other brokers) rate across all topics.
Rejected byte rate | kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=([-.\w]+) | Rejected byte rate per topic, due to the record batch size being greater than the max.message.bytes configuration. Omitting 'topic=(…)' will yield the all-topic rate.
Message validation failure rate due to no key specified for compacted topic | kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec | 0
Message validation failure rate due to invalid magic number | kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec | 0
Message validation failure rate due to incorrect crc checksum | kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec | 0
Message validation failure rate due to non-continuous offset or sequence number in batch | kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec | 0
Log flush rate and time | kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs |
Number of offline log directories | kafka.log:type=LogManager,name=OfflineLogDirectoryCount | 0
Leader election rate | kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs | non-zero when there are broker failures
Unclean leader election rate | kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec | 0
Is controller active on broker | kafka.controller:type=KafkaController,name=ActiveControllerCount | only one broker in the cluster should have 1
Pending topic deletes | kafka.controller:type=KafkaController,name=TopicsToDeleteCount |
Pending replica deletes | kafka.controller:type=KafkaController,name=ReplicasToDeleteCount |
Ineligible pending topic deletes | kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount |
Ineligible pending replica deletes | kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount |
Number of under replicated partitions (|ISR| < |all replicas|) | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 0
Number of under minIsr partitions (|ISR| < min.insync.replicas) | kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount | 0
Number of at minIsr partitions (|ISR| = min.insync.replicas) | kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount | 0
Producer Id counts | kafka.server:type=ReplicaManager,name=ProducerIdCount | Count of all producer ids created by transactional and idempotent producers in each replica on the broker
Partition counts | kafka.server:type=ReplicaManager,name=PartitionCount | mostly even across brokers
Offline Replica counts | kafka.server:type=ReplicaManager,name=OfflineReplicaCount | 0
Leader replica counts | kafka.server:type=ReplicaManager,name=LeaderCount | mostly even across brokers
ISR shrink rate | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec | If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.
ISR expansion rate | kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | See above
Failed ISR update rate | kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec | 0
Max lag in messages between follower and leader replicas | kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica | lag should be proportional to the maximum batch size of a produce request.
Lag in messages per follower replica | kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) | lag should be proportional to the maximum batch size of a produce request.
Requests waiting in the producer purgatory | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce | non-zero if acks=-1 is used
Requests waiting in the fetch purgatory | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch | size depends on fetch.max.wait.ms in the consumer
Request total time | kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} | broken into queue, local, remote and response send time
Time the request waits in the request queue | kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Time the request is processed at the leader | kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Time the request waits for the follower | kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower} | non-zero for produce requests when acks=-1
Time the request waits in the response queue | kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Time to send the response | kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Number of messages the consumer lags behind the producer by. Published by the consumer, not broker. | kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max |
The average fraction of time the network processors are idle | kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent | between 0 and 1, ideally > 0.3
The number of connections disconnected on a processor due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication | kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count | ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this (listener, processor) combination
The total number of connections disconnected, across all processors, due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication | kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount | ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this broker
The average fraction of time the request handler threads are idle | kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent | between 0 and 1, ideally > 0.3
Bandwidth quota metrics per (user, client-id), user or client-id | kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+) | Two attributes. throttle-time indicates the amount of time in ms the client was throttled. Ideally = 0. byte-rate indicates the data produce/consume rate of the client in bytes/sec. For (user, client-id) quotas, both user and client-id are specified. If per-client-id quota is applied to the client, user is not specified. If per-user quota is applied, client-id is not specified.
Request quota metrics per (user, client-id), user or client-id | kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) | Two attributes. throttle-time indicates the amount of time in ms the client was throttled. Ideally = 0. request-time indicates the percentage of time spent in broker network and I/O threads to process requests from client group. For (user, client-id) quotas, both user and client-id are specified. If per-client-id quota is applied to the client, user is not specified. If per-user quota is applied, client-id is not specified.
Requests exempt from throttling | kafka.server:type=Request | exempt-throttle-time indicates the percentage of time spent in broker network and I/O threads to process requests that are exempt from throttling.
Max time to load group metadata | kafka.server:type=group-coordinator-metrics,name=partition-load-time-max | maximum time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Avg time to load group metadata | kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg | average time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Max time to load transaction metadata | kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max | maximum time, in milliseconds, it took to load transaction metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Avg time to load transaction metadata | kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg | average time, in milliseconds, it took to load transaction metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)
Rate of transactional verification errors | kafka.server:type=AddPartitionsToTxnManager,name=VerificationFailureRate | Rate of verifications that returned in failure either from the AddPartitionsToTxn API response or through errors in the AddPartitionsToTxnManager. In steady state 0, but transient errors are expected during rolls and reassignments of the transactional state partition.
Time to verify a transactional request | kafka.server:type=AddPartitionsToTxnManager,name=VerificationTimeMs | The amount of time queueing while a possible previous request is in-flight plus the round trip to the transaction coordinator to verify (or not verify)
Number of reassigning partitions | kafka.server:type=ReplicaManager,name=ReassigningPartitions | The number of reassigning leader partitions on a broker.
Outgoing byte rate of reassignment traffic | kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec | 0; non-zero when a partition reassignment is in progress.
Incoming byte rate of reassignment traffic | kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec | 0; non-zero when a partition reassignment is in progress.
Size of a partition on disk (in bytes) | kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+) | The size of a partition on disk, measured in bytes.
Number of log segments in a partition | kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+) | The number of log segments in a partition.
First offset in a partition | kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+) | The first offset in a partition.
Last offset in a partition | kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+) | The last offset in a partition.
Remaining logs to recover | kafka.log:type=LogManager,name=remainingLogsToRecover | The number of remaining logs for each log.dir to be recovered. This metric provides an overview of the recovery progress for a given log directory.
Remaining segments to recover for the current recovery thread | kafka.log:type=LogManager,name=remainingSegmentsToRecover | The number of remaining segments assigned to the currently active recovery thread.
Log directory offline status | kafka.log:type=LogManager,name=LogDirectoryOffline | Indicates if a log directory is offline (1) or online (0).
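The replica counts in the table above are defined by simple comparisons on the ISR size. The sketch below spells out those comparisons for a single partition. `IsrCheck` and its members are a hypothetical model for illustration, not Kafka classes. The broker computes these counts internally.

```java
// Hypothetical model of the ISR conditions behind UnderReplicatedPartitions,
// UnderMinIsrPartitionCount and AtMinIsrPartitionCount (not a Kafka class).
final class IsrCheck {
    enum MinIsrState { UNDER_MIN_ISR, AT_MIN_ISR, ABOVE_MIN_ISR }

    // Under replicated: |ISR| < |all replicas|.
    static boolean isUnderReplicated(int isrSize, int replicaCount) {
        return isrSize < replicaCount;
    }

    // Compares |ISR| against min.insync.replicas.
    static MinIsrState minIsrState(int isrSize, int minInSyncReplicas) {
        if (isrSize < minInSyncReplicas) {
            return MinIsrState.UNDER_MIN_ISR;
        }
        if (isrSize == minInSyncReplicas) {
            return MinIsrState.AT_MIN_ISR;
        }
        return MinIsrState.ABOVE_MIN_ISR;
    }

    public static void main(String[] args) {
        // Example: 3 replicas, 2 of them in sync, min.insync.replicas=2.
        System.out.println(isUnderReplicated(2, 3)); // prints "true"
        System.out.println(minIsrState(2, 2));       // prints "AT_MIN_ISR"
    }
}
```

In this example the partition is under replicated and sits exactly at the minimum ISR, so losing one more in-sync replica would put it under min.insync.replicas.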

Group Coordinator Monitoring

The following set of metrics are available for monitoring the group coordinator:

Metric/Attribute name | Mbean name | Description
---|---|---
The Partition Count, per State | kafka.server:type=group-coordinator-metrics,name=partition-count,state={loading|active|failed} | The number of __consumer_offsets partitions hosted by the broker, broken down by state
Partition Maximum Loading Time | kafka.server:type=group-coordinator-metrics,name=partition-load-time-max | The maximum loading time needed to read the state from the __consumer_offsets partitions
Partition Average Loading Time | kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg | The average loading time needed to read the state from the __consumer_offsets partitions
Average Thread Idle Ratio | kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg | The average idle ratio of the coordinator threads
Event Queue Size | kafka.server:type=group-coordinator-metrics,name=event-queue-size | The number of events waiting to be processed in the queue
Event Queue Time (Ms) | kafka.server:type=group-coordinator-metrics,name=event-queue-time-ms-[max|p50|p99|p999] | The time that an event spent waiting in the queue to be processed
Event Processing Time (Ms) | kafka.server:type=group-coordinator-metrics,name=event-processing-time-ms-[max|p50|p99|p999] | The time that an event took to be processed
Event Purgatory Time (Ms) | kafka.server:type=group-coordinator-metrics,name=event-purgatory-time-ms-[max|p50|p99|p999] | The time that an event waited in the purgatory before being completed
Batch Flush Time (Ms) | kafka.server:type=group-coordinator-metrics,name=batch-flush-time-ms-[max|p50|p99|p999] | The time that a batch took to be flushed to the local partition
Group Count, per group type | kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic} | Total number of groups per group type: Classic or Consumer
Consumer Group Count, per state | kafka.server:type=group-coordinator-metrics,name=consumer-group-count,state=[empty|assigning|reconciling|stable|dead] | Total number of Consumer Groups in each state: Empty, Assigning, Reconciling, Stable, Dead
Consumer Group Rebalance Rate | kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-rate | The rebalance rate of consumer groups
Consumer Group Rebalance Count | kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-count | Total number of Consumer Group Rebalances
Classic Group Count | kafka.server:type=GroupMetadataManager,name=NumGroups | Total number of Classic Groups
Classic Group Count, per State | kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead] | The number of Classic Groups in each state: PreparingRebalance, CompletingRebalance, Empty, Stable, Dead
Classic Group Completed Rebalance Rate | kafka.server:type=group-coordinator-metrics,name=group-completed-rebalance-rate | The rate of classic group completed rebalances
Classic Group Completed Rebalance Count | kafka.server:type=group-coordinator-metrics,name=group-completed-rebalance-count | The total number of classic group completed rebalances
Group Offset Count | kafka.server:type=GroupMetadataManager,name=NumOffsets | Total number of committed offsets for Classic and Consumer Groups
Offset Commit Rate | kafka.server:type=group-coordinator-metrics,name=offset-commit-rate | The rate of committed offsets
Offset Commit Count | kafka.server:type=group-coordinator-metrics,name=offset-commit-count | The total number of committed offsets
Offset Expiration Rate | kafka.server:type=group-coordinator-metrics,name=offset-expiration-rate | The rate of expired offsets
Offset Expiration Count | kafka.server:type=group-coordinator-metrics,name=offset-expiration-count | The total number of expired offsets
Offset Deletion Rate | kafka.server:type=group-coordinator-metrics,name=offset-deletion-rate | The rate of administratively deleted offsets
Offset Deletion Count | kafka.server:type=group-coordinator-metrics,name=offset-deletion-count | The total number of administratively deleted offsets

Tiered Storage Monitoring

The following set of metrics are available for monitoring of the tiered storage feature:

Metric/Attribute name | Description | Mbean name
---|---|---
Remote Fetch Bytes Per Sec | Rate of bytes read from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteFetchBytesPerSec,topic=([-.\w]+)
Remote Fetch Requests Per Sec | Rate of read requests from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteFetchRequestsPerSec,topic=([-.\w]+)
Remote Fetch Errors Per Sec | Rate of read errors from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteFetchErrorsPerSec,topic=([-.\w]+)
Remote Copy Bytes Per Sec | Rate of bytes copied to remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyBytesPerSec,topic=([-.\w]+)
Remote Copy Requests Per Sec | Rate of write requests to remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyRequestsPerSec,topic=([-.\w]+)
Remote Copy Errors Per Sec | Rate of write errors from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyErrorsPerSec,topic=([-.\w]+)
Remote Copy Lag Bytes | Bytes which are eligible for tiering, but are not in remote storage yet. Omitting 'topic=(…)' will yield the all-topic sum | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagBytes,topic=([-.\w]+)
Remote Copy Lag Segments | Segments which are eligible for tiering, but are not in remote storage yet. Omitting 'topic=(…)' will yield the all-topic count | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyLagSegments,topic=([-.\w]+)
Remote Delete Requests Per Sec | Rate of delete requests to remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteDeleteRequestsPerSec,topic=([-.\w]+)
Remote Delete Errors Per Sec | Rate of delete errors from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=RemoteDeleteErrorsPerSec,topic=([-.\w]+)
Remote Delete Lag Bytes | Tiered bytes which are eligible for deletion, but have not been deleted yet. Omitting 'topic=(…)' will yield the all-topic sum | kafka.server:type=BrokerTopicMetrics,name=RemoteDeleteLagBytes,topic=([-.\w]+)
Remote Delete Lag Segments | Tiered segments which are eligible for deletion, but have not been deleted yet. Omitting 'topic=(…)' will yield the all-topic count | kafka.server:type=BrokerTopicMetrics,name=RemoteDeleteLagSegments,topic=([-.\w]+)
Build Remote Log Aux State Requests Per Sec | Rate of requests for rebuilding the auxiliary state from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=BuildRemoteLogAuxStateRequestsPerSec,topic=([-.\w]+)
Build Remote Log Aux State Errors Per Sec | Rate of errors for rebuilding the auxiliary state from remote storage per topic. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=BrokerTopicMetrics,name=BuildRemoteLogAuxStateErrorsPerSec,topic=([-.\w]+)
Remote Log Size Computation Time | The amount of time needed to compute the size of the remote log. Omitting 'topic=(…)' will yield the all-topic time | kafka.server:type=BrokerTopicMetrics,name=RemoteLogSizeComputationTime,topic=([-.\w]+)
Remote Log Size Bytes | The total size of a remote log in bytes. Omitting 'topic=(…)' will yield the all-topic sum | kafka.server:type=BrokerTopicMetrics,name=RemoteLogSizeBytes,topic=([-.\w]+)
Remote Log Metadata Count | The total number of metadata entries for remote storage. Omitting 'topic=(…)' will yield the all-topic count | kafka.server:type=BrokerTopicMetrics,name=RemoteLogMetadataCount,topic=([-.\w]+)
Delayed Remote Fetch Expires Per Sec | The number of expired remote fetches per second. Omitting 'topic=(…)' will yield the all-topic rate | kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec,topic=([-.\w]+)
RemoteLogReader Task Queue Size | Size of the queue holding remote storage read tasks | org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize
RemoteLogReader Avg Idle Percent | Average idle percent of thread pool for processing remote storage read tasks | org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogReaderAvgIdlePercent
RemoteLogManager Tasks Avg Idle Percent | Average idle percent of thread pool for copying data to remote storage | kafka.log.remote:type=RemoteLogManager,name=RemoteLogManagerTasksAvgIdlePercent
RemoteLogManager Avg Broker Fetch Throttle Time | The average time in millis remote fetches were throttled by a broker | kafka.server:type=RemoteLogManager, name=remote-fetch-throttle-time-avg
RemoteLogManager Max Broker Fetch Throttle Time | The max time in millis remote fetches were throttled by a broker | kafka.server:type=RemoteLogManager, name=remote-fetch-throttle-time-max
RemoteLogManager Avg Broker Copy Throttle Time | The average time in millis remote copies were throttled by a broker | kafka.server:type=RemoteLogManager, name=remote-copy-throttle-time-avg
RemoteLogManager Max Broker Copy Throttle Time | The max time in millis remote copies were throttled by a broker | kafka.server:type=RemoteLogManager, name=remote-copy-throttle-time-max
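Many of the Mbean names above take an optional topic key, and leaving it out selects the all-topic aggregate. The following sketch builds both forms of a BrokerTopicMetrics name and reads the topic key back with the standard javax.management.ObjectName class. `TopicMetricName`, `forTopic` and `topicOf` are hypothetical helper names, and the topic `orders` is a made-up example.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: build and inspect per-topic and all-topic BrokerTopicMetrics names.
final class TopicMetricName {
    // Builds the MBean name; a null topic yields the all-topic name.
    static String forTopic(String metricName, String topic) {
        String base = "kafka.server:type=BrokerTopicMetrics,name=" + metricName;
        return topic == null ? base : base + ",topic=" + topic;
    }

    // Returns the value of the topic key, or null for an all-topic name.
    static String topicOf(String mbeanName) {
        try {
            return new ObjectName(mbeanName).getKeyProperty("topic");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad MBean name: " + mbeanName, e);
        }
    }

    public static void main(String[] args) {
        String perTopic = forTopic("RemoteFetchBytesPerSec", "orders");
        String allTopics = forTopic("RemoteFetchBytesPerSec", null);
        System.out.println(perTopic + " -> topic " + topicOf(perTopic));
        System.out.println(allTopics + " -> topic " + topicOf(allTopics));
    }
}
```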

KRaft Monitoring Metrics

The following metrics allow monitoring of the KRaft quorum and the metadata log.
Note that some exposed metrics depend on the role of the node as defined by process.roles.

KRaft Quorum Monitoring Metrics

These metrics are reported on both Controllers and Brokers in a KRaft Cluster.

Metric/Attribute name | Description | Mbean name
---|---|---
Current State | The current state of this member; possible values are leader, candidate, voted, follower, unattached, observer. | kafka.server:type=raft-metrics
Current Leader | The current quorum leader’s id; -1 indicates unknown. | kafka.server:type=raft-metrics
Current Voted | The current voted leader’s id; -1 indicates not voted for anyone. | kafka.server:type=raft-metrics
Current Epoch | The current quorum epoch. | kafka.server:type=raft-metrics
High Watermark | The high watermark maintained on this member; -1 if it is unknown. | kafka.server:type=raft-metrics
Log End Offset | The current raft log end offset. | kafka.server:type=raft-metrics
Number of Unknown Voter Connections | Number of unknown voters whose connection information is not cached. The value of this metric is always 0. | kafka.server:type=raft-metrics
Average Commit Latency | The average time in milliseconds to commit an entry in the raft log. | kafka.server:type=raft-metrics
Maximum Commit Latency | The maximum time in milliseconds to commit an entry in the raft log. | kafka.server:type=raft-metrics
Average Election Latency | The average time in milliseconds spent on electing a new leader. | kafka.server:type=raft-metrics
Maximum Election Latency | The maximum time in milliseconds spent on electing a new leader. | kafka.server:type=raft-metrics
Fetch Records Rate | The average number of records fetched from the leader of the raft quorum. | kafka.server:type=raft-metrics
Append Records Rate | The average number of records appended per sec by the leader of the raft quorum. | kafka.server:type=raft-metrics
Average Poll Idle Ratio | The average fraction of time the client’s poll() is idle as opposed to waiting for the user code to process records. | kafka.server:type=raft-metrics
Current Metadata Version | Outputs the feature level of the current effective metadata version. | kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
Metadata Snapshot Load Count | The total number of times we have loaded a KRaft snapshot since the process was started. | kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
Latest Metadata Snapshot Size | The total size in bytes of the latest snapshot that the node has generated. If none have been generated yet, this is the size of the latest snapshot that was loaded. If no snapshots have been generated or loaded, this is 0. | kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
Latest Metadata Snapshot Age | The interval in milliseconds since the latest snapshot that the node has generated. If none have been generated yet, this is approximately the time delta since the process was started. | kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs

KRaft Controller Monitoring Metrics

Metric/Attribute nameDescriptionMbean name
Active Controller CountThe number of Active Controllers on this node. Valid values are ‘0’ or ‘1’.kafka.controller:type=KafkaController,name=ActiveControllerCount
Event Queue Time MsA Histogram of the time in milliseconds that requests spent waiting in the Controller Event Queue.kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs
Event Queue Processing Time MsA Histogram of the time in milliseconds that requests spent being processed in the Controller Event Queue.kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs
Fenced Broker CountThe number of fenced brokers as observed by this Controller.kafka.controller:type=KafkaController,name=FencedBrokerCount
Active Broker CountThe number of active brokers as observed by this Controller.kafka.controller:type=KafkaController,name=ActiveBrokerCount
Global Topic CountThe number of global topics as observed by this Controller.kafka.controller:type=KafkaController,name=GlobalTopicCount
Global Partition CountThe number of global partitions as observed by this Controller.kafka.controller:type=KafkaController,name=GlobalPartitionCount
Offline Partition CountThe number of offline topic partitions (non-internal) as observed by this Controller.kafka.controller:type=KafkaController,name=OfflinePartitionsCount
Preferred Replica Imbalance CountThe count of topic partitions for which the leader is not the preferred leader.kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
Metadata Error CountThe number of times this controller node has encountered an error during metadata log processing.kafka.controller:type=KafkaController,name=MetadataErrorCount
Last Applied Record OffsetThe offset of the last record from the cluster metadata partition that was applied by the Controller.kafka.controller:type=KafkaController,name=LastAppliedRecordOffset
Last Committed Record OffsetThe offset of the last record committed to this Controller.kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
Last Applied Record TimestampThe timestamp of the last record from the cluster metadata partition that was applied by the Controller.kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp
Last Applied Record Lag MsThe difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the controller. For active Controllers the value of this lag is always zero.kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
Timed-out Broker Heartbeat CountThe number of broker heartbeats that timed out on this controller since the process was started. Note that only active controllers handle heartbeats, so only they will see increases in this metric.kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
Number Of Operations Started In Event QueueThe total number of controller event queue operations that were started. This includes deferred operations.kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
Number of Operations Timed Out In Event QueueThe total number of controller event queue operations that timed out before they could be performed.kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
Number Of New Controller ElectionsCounts the number of times this node has seen a new controller elected. A transition to the “no leader” state is not counted here. If the same controller as before becomes active, that still counts.kafka.controller:type=KafkaController,name=NewActiveControllersCount
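
The relationship between LastAppliedRecordTimestamp and LastAppliedRecordLagMs described above can be sketched in a few lines. This is only an illustration: the function name, the `is_active` flag, and clamping negative values to zero are assumptions made for the example, not part of Kafka.

```python
import time


def last_applied_record_lag_ms(last_applied_timestamp_ms, now_ms=None, is_active=False):
    """Approximate LastAppliedRecordLagMs from LastAppliedRecordTimestamp.

    Hypothetical helper: an active controller always reports zero lag; any
    other node reports "now" minus the timestamp of the last applied record.
    """
    if is_active:
        return 0
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # Assumption: clamp to zero so clock skew never yields a negative lag.
    return max(0, now_ms - last_applied_timestamp_ms)


print(last_applied_record_lag_ms(1_000, now_ms=1_500))
```

A lag that keeps growing on a standby controller would suggest it is falling behind the metadata log.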

KRaft Broker Monitoring Metrics

Metric/Attribute name | Description | Mbean name
Last Applied Record Offset | The offset of the last record from the cluster metadata partition that was applied by the broker. | kafka.server:type=broker-metadata-metrics
Last Applied Record Timestamp | The timestamp of the last record from the cluster metadata partition that was applied by the broker. | kafka.server:type=broker-metadata-metrics
Last Applied Record Lag Ms | The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker. | kafka.server:type=broker-metadata-metrics
Metadata Load Error Count | The number of errors encountered by the BrokerMetadataListener while loading the metadata log and generating a new MetadataDelta based on it. | kafka.server:type=broker-metadata-metrics
Metadata Apply Error Count | The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta. | kafka.server:type=broker-metadata-metrics

Common monitoring metrics for producer/consumer/connect/streams

The following metrics are available on producer/consumer/connector/streams instances. For specific metrics, see the following sections.

Metric/Attribute name | Description | Mbean name
connection-close-rate | Connections closed per second in the window. | kafka.[producer
connection-close-total | Total connections closed in the window. | kafka.[producer
connection-creation-rate | New connections established per second in the window. | kafka.[producer
connection-creation-total | Total new connections established in the window. | kafka.[producer
network-io-rate | The average number of network operations (reads or writes) on all connections per second. | kafka.[producer
network-io-total | The total number of network operations (reads or writes) on all connections. | kafka.[producer
outgoing-byte-rate | The average number of outgoing bytes sent per second to all servers. | kafka.[producer
outgoing-byte-total | The total number of outgoing bytes sent to all servers. | kafka.[producer
request-rate | The average number of requests sent per second. | kafka.[producer
request-total | The total number of requests sent. | kafka.[producer
request-size-avg | The average size of all requests in the window. | kafka.[producer
request-size-max | The maximum size of any request sent in the window. | kafka.[producer
incoming-byte-rate | Bytes/second read off all sockets. | kafka.[producer
incoming-byte-total | Total bytes read off all sockets. | kafka.[producer
response-rate | Responses received per second. | kafka.[producer
response-total | Total responses received. | kafka.[producer
select-rate | Number of times the I/O layer checked for new I/O to perform per second. | kafka.[producer
select-total | Total number of times the I/O layer checked for new I/O to perform. | kafka.[producer
io-wait-time-ns-avg | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | kafka.[producer
io-wait-time-ns-total | The total time the I/O thread spent waiting in nanoseconds. | kafka.[producer
io-wait-ratio | The fraction of time the I/O thread spent waiting. | kafka.[producer
io-time-ns-avg | The average length of time for I/O per select call in nanoseconds. | kafka.[producer
io-time-ns-total | The total time the I/O thread spent doing I/O in nanoseconds. | kafka.[producer
io-ratio | The fraction of time the I/O thread spent doing I/O. | kafka.[producer
connection-count | The current number of active connections. | kafka.[producer
successful-authentication-rate | Connections per second that were successfully authenticated using SASL or SSL. | kafka.[producer
successful-authentication-total | Total connections that were successfully authenticated using SASL or SSL. | kafka.[producer
failed-authentication-rate | Connections per second that failed authentication. | kafka.[producer
failed-authentication-total | Total connections that failed authentication. | kafka.[producer
successful-reauthentication-rate | Connections per second that were successfully re-authenticated using SASL. | kafka.[producer
successful-reauthentication-total | Total connections that were successfully re-authenticated using SASL. | kafka.[producer
reauthentication-latency-max | The maximum latency in ms observed due to re-authentication. | kafka.[producer
reauthentication-latency-avg | The average latency in ms observed due to re-authentication. | kafka.[producer
failed-reauthentication-rate | Connections per second that failed re-authentication. | kafka.[producer
failed-reauthentication-total | Total connections that failed re-authentication. | kafka.[producer
successful-authentication-no-reauth-total | Total connections that were successfully authenticated by older, pre-2.2.0 SASL clients that do not support re-authentication. May only be non-zero. | kafka.[producer
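
Because every rate metric has a matching cumulative -total metric, a monitoring system that only scrapes the totals can derive its own average rate over the scrape interval. The sketch below shows one way to do that. The function name and the choice to return `None` when the counter appears to have gone backwards (for example after a client restart) are assumptions made for this example.

```python
def average_rate(total_start, total_end, elapsed_seconds):
    """Average per-second rate between two samples of a cumulative -total metric.

    Hypothetical helper: returns None when the counter decreased, which this
    sketch interprets as a restart of the client that owns the counter.
    """
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    delta = total_end - total_start
    if delta < 0:
        return None
    return delta / elapsed_seconds


# Two samples of request-total taken 60 seconds apart.
print(average_rate(1200, 1500, 60.0))
```

A rate derived this way is averaged over the sampling interval, so it will not necessarily match the windowed value that the client itself reports for the corresponding -rate metric.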

Common Per-broker metrics for producer/consumer/connect/streams

The following metrics are available on producer/consumer/connector/streams instances. For specific metrics, see the following sections.

Metric/Attribute name | Description | Mbean name
outgoing-byte-rate | The average number of outgoing bytes sent per second for a node. | kafka.[producer
outgoing-byte-total | The total number of outgoing bytes sent for a node. | kafka.[producer
request-rate | The average number of requests sent per second for a node. | kafka.[producer
request-total | The total number of requests sent for a node. | kafka.[producer
request-size-avg | The average size of all requests in the window for a node. | kafka.[producer
request-size-max | The maximum size of any request sent in the window for a node. | kafka.[producer
incoming-byte-rate | The average number of bytes received per second for a node. | kafka.[producer
incoming-byte-total | The total number of bytes received for a node. | kafka.[producer
request-latency-avg | The average request latency in ms for a node. | kafka.[producer
request-latency-max | The maximum request latency in ms for a node. | kafka.[producer
response-rate | Responses received per second for a node. | kafka.[producer
response-total | Total responses received for a node. | kafka.[producer

Producer monitoring

The following metrics are available on producer instances.

Metric/Attribute name | Description | Mbean name
waiting-threads | The number of user threads blocked waiting for buffer memory to enqueue their records. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-total-bytes | The maximum amount of buffer memory the client can use (whether or not it is currently used). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-available-bytes | The total amount of buffer memory that is not being used (either unallocated or in the free list). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-exhausted-rate | The average per-second number of record sends that are dropped due to buffer exhaustion. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-exhausted-total | The total number of record sends that are dropped due to buffer exhaustion. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time | The fraction of time an appender waits for space allocation. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-ratio | The fraction of time an appender waits for space allocation. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time-ns-total | The total time an appender waits for space allocation in nanoseconds. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
flush-time-ns-total | The total time the Producer spent in Producer.flush in nanoseconds. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-init-time-ns-total | The total time the Producer spent initializing transactions in nanoseconds (for EOS). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-begin-time-ns-total | The total time the Producer spent in beginTransaction in nanoseconds (for EOS). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-send-offsets-time-ns-total | The total time the Producer spent sending offsets to transactions in nanoseconds (for EOS). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-commit-time-ns-total | The total time the Producer spent committing transactions in nanoseconds (for EOS). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
txn-abort-time-ns-total | The total time the Producer spent aborting transactions in nanoseconds (for EOS). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
metadata-wait-time-ns-total | The total time in nanoseconds that the Producer has spent waiting for metadata from the Kafka broker. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
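
The two buffer gauges above combine into a single utilization figure that is often easier to alert on than either value alone. The following is a minimal sketch. The function name is hypothetical and the sample values are made up for illustration.

```python
def buffer_utilization(buffer_total_bytes, buffer_available_bytes):
    """Fraction of the producer's buffer memory currently in use.

    Hypothetical helper built from the buffer-total-bytes and
    buffer-available-bytes gauges; 0.0 means empty, 1.0 means full.
    """
    if buffer_total_bytes <= 0:
        raise ValueError("buffer_total_bytes must be positive")
    used_bytes = buffer_total_bytes - buffer_available_bytes
    return used_bytes / buffer_total_bytes


# Illustrative values: a 32 MiB buffer with 8 MiB still available.
print(buffer_utilization(33_554_432, 8_388_608))
```

A utilization that stays close to 1.0 while waiting-threads is non-zero would suggest that senders are blocking on buffer memory.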

Producer Sender Metrics

Metric/Attribute name | Description | Mbean name
batch-size-avg | The average number of bytes sent per partition per-request. | kafka.producer:type=producer-metrics,client-id="{client-id}"
batch-size-max | The max number of bytes sent per partition per-request. | kafka.producer:type=producer-metrics,client-id="{client-id}"
batch-split-rate | The average number of batch splits per second | kafka.producer:type=producer-metrics,client-id="{client-id}"
batch-split-total | The total number of batch splits | kafka.producer:type=producer-metrics,client-id="{client-id}"
compression-rate-avg | The average compression rate of record batches, defined as the average ratio of the compressed batch size over the uncompressed size. | kafka.producer:type=producer-metrics,client-id="{client-id}"
metadata-age | The age in seconds of the current producer metadata being used. | kafka.producer:type=producer-metrics,client-id="{client-id}"
produce-throttle-time-avg | The average time in ms a request was throttled by a broker | kafka.producer:type=producer-metrics,client-id="{client-id}"
produce-throttle-time-max | The maximum time in ms a request was throttled by a broker | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-error-rate | The average per-second number of record sends that resulted in errors | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-error-total | The total number of record sends that resulted in errors | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-queue-time-avg | The average time in ms record batches spent in the send buffer. | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-queue-time-max | The maximum time in ms record batches spent in the send buffer. | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-retry-rate | The average per-second number of retried record sends | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-retry-total | The total number of retried record sends | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-send-rate | The average number of records sent per second. | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-send-total | The total number of records sent. | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-size-avg | The average record size | kafka.producer:type=producer-metrics,client-id="{client-id}"
record-size-max | The maximum record size | kafka.producer:type=producer-metrics,client-id="{client-id}"
records-per-request-avg | The average number of records per request. | kafka.producer:type=producer-metrics,client-id="{client-id}"
request-latency-avg | The average request latency in ms | kafka.producer:type=producer-metrics,client-id="{client-id}"
request-latency-max | The maximum request latency in ms | kafka.producer:type=producer-metrics,client-id="{client-id}"
requests-in-flight | The current number of in-flight requests awaiting a response. | kafka.producer:type=producer-metrics,client-id="{client-id}"
byte-rate | The average number of bytes sent per second for a topic. | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
byte-total | The total number of bytes sent for a topic. | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
compression-rate | The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size over the uncompressed size. | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
record-error-rate | The average per-second number of record sends that resulted in errors for a topic | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
record-error-total | The total number of record sends that resulted in errors for a topic | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
record-retry-rate | The average per-second number of retried record sends for a topic | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
record-retry-total | The total number of retried record sends for a topic | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
record-send-rate | The average number of records sent per second for a topic. | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
record-send-total | The total number of records sent for a topic. | kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
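
Since compression-rate-avg is defined as compressed size over uncompressed size, lower values mean better compression, which is easy to misread on a dashboard. A small sketch of converting the ratio into a space-savings fraction follows. The function name and sample value are assumptions for illustration.

```python
def compression_space_savings(compression_rate):
    """Convert a compressed/uncompressed size ratio into the fraction of bytes saved.

    Hypothetical helper: a ratio of 1.0 means no savings; smaller ratios
    mean the batches shrank more.
    """
    if compression_rate < 0:
        raise ValueError("compression_rate must be non-negative")
    return 1.0 - compression_rate


# A ratio of 0.25 means batches shrank to a quarter of their original size.
print(compression_space_savings(0.25))
```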

Consumer monitoring

The following metrics are available on consumer instances.

Metric/Attribute name | Description | Mbean name
time-between-poll-avg | The average delay between invocations of poll(). | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
time-between-poll-max | The max delay between invocations of poll(). | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
last-poll-seconds-ago | The number of seconds since the last poll() invocation. | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
poll-idle-ratio-avg | The average fraction of time the consumer’s poll() is idle as opposed to waiting for the user code to process records. | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
committed-time-ns-total | The total time the Consumer spent in committed in nanoseconds. | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
commit-sync-time-ns-total | The total time the Consumer spent committing offsets in nanoseconds (for AOS). | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)

Consumer Group Metrics

Metric/Attribute name | Description | Mbean name
commit-latency-avg | The average time taken for a commit request | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-latency-max | The max time taken for a commit request | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-rate | The number of commit calls per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-total | The total number of commit calls | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
assigned-partitions | The number of partitions currently assigned to this consumer | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-response-time-max | The max time taken to receive a response to a heartbeat request | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-rate | The average number of heartbeats per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-total | The total number of heartbeats | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-avg | The average time taken for a group rejoin | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-max | The max time taken for a group rejoin | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-rate | The number of group joins per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-total | The total number of group joins | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-avg | The average time taken for a group sync | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-max | The max time taken for a group sync | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-rate | The number of group syncs per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-total | The total number of group syncs | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-avg | The average time taken for a group rebalance | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-max | The max time taken for a group rebalance | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-latency-total | The total time taken for group rebalances so far | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-total | The total number of group rebalances participated in | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
rebalance-rate-per-hour | The number of group rebalances participated in per hour | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
failed-rebalance-total | The total number of failed group rebalances | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
failed-rebalance-rate-per-hour | The number of failed group rebalance events per hour | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-rebalance-seconds-ago | The number of seconds since the last rebalance event | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-heartbeat-seconds-ago | The number of seconds since the last coordinator heartbeat | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-avg | The average time taken by the on-partitions-revoked rebalance listener callback | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-max | The max time taken by the on-partitions-revoked rebalance listener callback | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-avg | The average time taken by the on-partitions-assigned rebalance listener callback | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-max | The max time taken by the on-partitions-assigned rebalance listener callback | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-avg | The average time taken by the on-partitions-lost rebalance listener callback | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-max | The max time taken by the on-partitions-lost rebalance listener callback | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
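
The cumulative rebalance-latency-total and rebalance-total metrics can be combined into a lifetime average, which complements the reported rebalance-latency-avg. A minimal sketch follows. The function name is hypothetical, and returning `None` before the first rebalance is a choice made for this example.

```python
def lifetime_avg_rebalance_latency(rebalance_latency_total, rebalance_total):
    """Average time per rebalance over the consumer's lifetime.

    Hypothetical helper: divides rebalance-latency-total by rebalance-total
    and returns None if no rebalance has been recorded yet. The result is in
    the same time unit as rebalance-latency-total.
    """
    if rebalance_total <= 0:
        return None
    return rebalance_latency_total / rebalance_total


# Illustrative values: 9000 time units spent across 3 rebalances.
print(lifetime_avg_rebalance_latency(9000, 3))
```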

Consumer Fetch Metrics

Metric/Attribute name | Description | Mbean name
bytes-consumed-rate | The average number of bytes consumed per second | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
bytes-consumed-total | The total number of bytes consumed | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-latency-avg | The average time taken for a fetch request. | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-latency-max | The max time taken for any fetch request. | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-rate | The number of fetch requests per second. | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-size-avg | The average number of bytes fetched per request | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-size-max | The maximum number of bytes fetched per request | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-throttle-time-avg | The average throttle time in ms | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-throttle-time-max | The maximum throttle time in ms | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
fetch-total | The total number of fetch requests. | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
records-consumed-rate | The average number of records consumed per second | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
records-consumed-total | The total number of records consumed | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
records-lag-max | The maximum lag in terms of number of records for any partition in this window. NOTE: This is based on current offset and not committed offset | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
records-lead-min | The minimum lead in terms of number of records for any partition in this window | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
records-per-request-avg | The average number of records in each request | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
bytes-consumed-rate | The average number of bytes consumed per second for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
bytes-consumed-total | The total number of bytes consumed for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
fetch-size-avg | The average number of bytes fetched per request for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
fetch-size-max | The maximum number of bytes fetched per request for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
records-consumed-rate | The average number of records consumed per second for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
records-consumed-total | The total number of records consumed for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
records-per-request-avg | The average number of records in each request for a topic | kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
preferred-read-replica | The current read replica for the partition, or -1 if reading from leader | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
records-lag | The latest lag of the partition | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
records-lag-avg | The average lag of the partition | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
records-lag-max | The max lag of the partition | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
records-lead | The latest lead of the partition | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
records-lead-avg | The average lead of the partition | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
records-lead-min | The min lead of the partition | kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
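
The per-partition records-lag values are often rolled up per topic before alerting. The sketch below parses Mbean names laid out exactly as printed in the table above (quoted values, in the order partition, topic, client-id) and keeps the worst lag per topic. The regular expression, the function name, and the sample data are assumptions for illustration. Names exposed by a real JMX agent may order or quote the properties differently, so the pattern would need adjusting.

```python
import re
from collections import defaultdict

# Assumption: per-partition Mbean names follow the layout shown in the table.
PARTITION_MBEAN = re.compile(
    r'kafka\.consumer:type=consumer-fetch-manager-metrics,'
    r'partition="(?P<partition>\d+)",'
    r'topic="(?P<topic>[-.\w]+)",'
    r'client-id="(?P<client_id>[-.\w]+)"'
)


def max_lag_per_topic(samples):
    """Map each topic to the largest records-lag seen across its partitions.

    `samples` maps an Mbean name to its records-lag value; names that are
    not per-partition Mbeans are ignored.
    """
    worst = defaultdict(float)
    for mbean, lag in samples.items():
        match = PARTITION_MBEAN.fullmatch(mbean)
        if match is None:
            continue
        topic = match.group("topic")
        worst[topic] = max(worst[topic], lag)
    return dict(worst)


PREFIX = "kafka.consumer:type=consumer-fetch-manager-metrics,"
samples = {
    PREFIX + 'partition="0",topic="orders",client-id="c1"': 12.0,
    PREFIX + 'partition="1",topic="orders",client-id="c1"': 40.0,
    PREFIX + 'partition="0",topic="audit.log",client-id="c1"': 3.0,
    PREFIX + 'client-id="c1"': 99.0,  # client-level Mbean, skipped
}
print(max_lag_per_topic(samples))
```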

Connect Monitoring

A Connect worker process contains all the producer and consumer metrics as well as metrics specific to Connect. The worker process itself has a number of metrics, while each connector and task have additional metrics.

Metric/Attribute name | Description | Mbean name
connector-count | The number of connectors run in this worker. | kafka.connect:type=connect-worker-metrics
connector-startup-attempts-total | The total number of connector startups that this worker has attempted. | kafka.connect:type=connect-worker-metrics
connector-startup-failure-percentage | The average percentage of this worker's connector starts that failed. | kafka.connect:type=connect-worker-metrics
connector-startup-failure-total | The total number of connector starts that failed. | kafka.connect:type=connect-worker-metrics
connector-startup-success-percentage | The average percentage of this worker's connector starts that succeeded. | kafka.connect:type=connect-worker-metrics
connector-startup-success-total | The total number of connector starts that succeeded. | kafka.connect:type=connect-worker-metrics
task-count | The number of tasks run in this worker. | kafka.connect:type=connect-worker-metrics
task-startup-attempts-total | The total number of task startups that this worker has attempted. | kafka.connect:type=connect-worker-metrics
task-startup-failure-percentage | The average percentage of this worker's task starts that failed. | kafka.connect:type=connect-worker-metrics
task-startup-failure-total | The total number of task starts that failed. | kafka.connect:type=connect-worker-metrics
task-startup-success-percentage | The average percentage of this worker's task starts that succeeded. | kafka.connect:type=connect-worker-metrics
task-startup-success-total | The total number of task starts that succeeded. | kafka.connect:type=connect-worker-metrics
connector-destroyed-task-count | The number of destroyed tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
connector-failed-task-count | The number of failed tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
connector-paused-task-count | The number of paused tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
connector-restarting-task-count | The number of restarting tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
connector-running-task-count | The number of running tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
connector-total-task-count | The number of tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
connector-unassigned-task-count | The number of unassigned tasks of the connector on the worker. | kafka.connect:type=connect-worker-metrics,connector="{connector}"
completed-rebalances-total | The total number of rebalances completed by this worker. | kafka.connect:type=connect-worker-rebalance-metrics
connect-protocol | The Connect protocol used by this cluster. | kafka.connect:type=connect-worker-rebalance-metrics
epoch | The epoch or generation number of this worker. | kafka.connect:type=connect-worker-rebalance-metrics
leader-name | The name of the group leader. | kafka.connect:type=connect-worker-rebalance-metrics
rebalance-avg-time-ms | The average time in milliseconds spent by this worker to rebalance. | kafka.connect:type=connect-worker-rebalance-metrics
rebalance-max-time-ms | The maximum time in milliseconds spent by this worker to rebalance. | kafka.connect:type=connect-worker-rebalance-metrics
rebalancing | Whether this worker is currently rebalancing. | kafka.connect:type=connect-worker-rebalance-metrics
time-since-last-rebalance-ms | The time in milliseconds since this worker completed the most recent rebalance. | kafka.connect:type=connect-worker-rebalance-metrics
connector-class | The name of the connector class. | kafka.connect:type=connector-metrics,connector="{connector}"
connector-type | The type of the connector. One of 'source' or 'sink'. | kafka.connect:type=connector-metrics,connector="{connector}"
connector-version | The version of the connector class, as reported by the connector. | kafka.connect:type=connector-metrics,connector="{connector}"
status | The status of the connector. One of 'unassigned', 'running', 'paused', 'stopped', 'failed', or 'restarting'. | kafka.connect:type=connector-metrics,connector="{connector}"
batch-size-avg | The average number of records in the batches the task has processed so far. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
batch-size-max | The number of records in the largest batch the task has processed so far. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
offset-commit-avg-time-ms | The average time in milliseconds taken by this task to commit offsets. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
offset-commit-failure-percentage | The average percentage of this task's offset commit attempts that failed. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
offset-commit-max-time-ms | The maximum time in milliseconds taken by this task to commit offsets. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
offset-commit-success-percentage | The average percentage of this task's offset commit attempts that succeeded. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
pause-ratio | The fraction of time this task has spent in the pause state. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
running-ratio | The fraction of time this task has spent in the running state. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
status | The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or 'restarting'. | kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
offset-commit-completion-rate | The average per-second number of offset commit completions that were completed successfully. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
offset-commit-completion-total | The total number of offset commit completions that were completed successfully. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
offset-commit-seq-no | The current sequence number for offset commits. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
offset-commit-skip-rate | The average per-second number of offset commit completions that were received too late and skipped/ignored. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
offset-commit-skip-total | The total number of offset commit completions that were received too late and skipped/ignored. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
partition-count | The number of topic partitions assigned to this task belonging to the named sink connector in this worker. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
put-batch-avg-time-ms | The average time taken by this task to put a batch of sink records. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
put-batch-max-time-ms | The maximum time taken by this task to put a batch of sink records. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-active-count | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-active-count-avg | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-active-count-max | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-lag-max | The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-read-rate | The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-read-total | The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-send-rate | The average per-second number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
sink-record-send-total | The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted. | kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
poll-batch-avg-time-msThe average time in milliseconds taken by this task to poll for a batch of source records.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
poll-batch-max-time-msThe maximum time in milliseconds taken by this task to poll for a batch of source records.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-active-countThe number of records that have been produced by this task but not yet completely written to Kafka.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-active-count-avgThe average number of records that have been produced by this task but not yet completely written to Kafka.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-active-count-maxThe maximum number of records that have been produced by this task but not yet completely written to Kafka.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-poll-rateThe average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-poll-totalThe total number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-write-rateThe average per-second number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
source-record-write-totalThe total number of records written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. This is after transformations are applied, and excludes any records filtered out by the transformations.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
transaction-size-avgThe average number of records in the transactions the task has committed so far.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
transaction-size-maxThe number of records in the largest transaction the task has committed so far.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
transaction-size-minThe number of records in the smallest transaction the task has committed so far.kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
deadletterqueue-produce-failuresThe number of failed writes to the dead letter queue.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
deadletterqueue-produce-requestsThe number of attempted writes to the dead letter queue.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
last-error-timestampThe epoch timestamp when this task last encountered an error.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
total-errors-loggedThe number of errors that were logged.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
total-record-errorsThe number of record processing errors in this task.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
total-record-failuresThe number of record processing failures in this task.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
total-records-skippedThe number of records skipped due to errors.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
total-retriesThe number of operations retried.kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
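
One way to use the task-error-metrics counters is to derive a dead letter queue failure ratio from the two deadletterqueue-* values. The following is a minimal sketch in Python, not part of Kafka Connect: the function name and the sample readings are hypothetical, and it assumes both counters were read from the same task at the same moment.

```python
def dlq_failure_ratio(produce_requests: float, produce_failures: float) -> float:
    """Fraction of attempted dead letter queue writes that failed.

    Returns 0.0 when no writes were attempted, to avoid dividing by zero.
    """
    if produce_requests <= 0:
        return 0.0
    return produce_failures / produce_requests


# Hypothetical readings of deadletterqueue-produce-requests and
# deadletterqueue-produce-failures for a single task.
ratio = dlq_failure_ratio(200, 5)
print(f"DLQ failure ratio: {ratio:.3f}")
```

A ratio that stays near zero suggests the dead letter queue itself is healthy; a rising ratio points at problems writing to the dead letter queue topic rather than at the records being processed.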

Streams Monitoring

A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to Streams. The metrics have three recording levels: info, debug, and trace.

Note that the metrics have a 4-layer hierarchy. At the top level there are client-level metrics for each started Kafka Streams client. Each client has stream threads, with their own metrics. Each stream thread has tasks, with their own metrics. Each task has a number of processor nodes, with their own metrics. Each task also has a number of state stores and record caches, all with their own metrics.

Use the following configuration option to specify which metrics you want collected:

metrics.recording.level="info"
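
The effect of this setting can be pictured as a threshold: a metric is collected only if its own recording level is at or below the configured level. The sketch below illustrates that rule in Python. It assumes the ordering info < debug < trace with the numeric values reported by the recording-level client metric (0, 1, 2); the function and dictionary names are hypothetical and not part of any Kafka API.

```python
# Numeric values as reported by the recording-level client metric.
RECORDING_LEVELS = {"info": 0, "debug": 1, "trace": 2}


def is_recorded(metric_level: str, configured_level: str) -> bool:
    """Return True if a metric at metric_level is collected when
    metrics.recording.level is set to configured_level."""
    return RECORDING_LEVELS[metric_level] <= RECORDING_LEVELS[configured_level]


# With the configuration set to "info", debug-level metrics are skipped.
for level in ("info", "debug", "trace"):
    print(level, is_recorded(level, "info"))
```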

Client Metrics

All the following metrics have a recording level of info: Metric/Attribute nameDescriptionMbean name
versionThe version of the Kafka Streams client.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
commit-idThe version control commit ID of the Kafka Streams client.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
application-idThe application ID of the Kafka Streams client.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
topology-descriptionThe description of the topology executed in the Kafka Streams client.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
stateThe state of the Kafka Streams client as a string.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
client-stateThe state of the Kafka Streams client as a number (ordinal() of the corresponding enum).kafka.streams:type=stream-metrics,client-id=([-.\w]+),process-id=([-.\w]+)
alive-stream-threadsThe current number of alive stream threads that are running or participating in rebalance.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
failed-stream-threadsThe number of failed stream threads since the start of the Kafka Streams client.kafka.streams:type=stream-metrics,client-id=([-.\w]+)
recording-levelThe metric recording level as a number (0 = INFO, 1 = DEBUG, 2 = TRACE).kafka.streams:type=stream-metrics,client-id=([-.\w]+),process-id=([-.\w]+)
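
The MBean name patterns in the table above can be used directly to pick the client ID out of a name returned by a JMX query. The following Python sketch shows one way to do that; the helper name and the sample client ID are hypothetical, and it assumes the MBean name is available as a plain, unquoted string.

```python
import re

# Pattern copied from the table above. Using match() anchors at the start only,
# so names with an extra ",process-id=..." suffix are accepted as well.
CLIENT_MBEAN = re.compile(r"kafka\.streams:type=stream-metrics,client-id=([-.\w]+)")


def client_id_of(mbean_name: str):
    """Return the client-id tag of a client-level MBean name, or None."""
    match = CLIENT_MBEAN.match(mbean_name)
    return match.group(1) if match else None


# Hypothetical MBean name for illustration.
print(client_id_of("kafka.streams:type=stream-metrics,client-id=my-app-1"))
```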

Thread Metrics

All the following metrics have a recording level of info: Metric/Attribute nameDescriptionMbean name
stateThe state of the thread as a string.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
thread-stateThe state of the thread as a number (ordinal() of the corresponding enum).kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+),process-id=([-.\w]+)
commit-latency-avgThe average execution time in ms, for committing, across all running tasks of this thread.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-latency-maxThe maximum execution time in ms, for committing, across all running tasks of this thread.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-latency-avgThe average execution time in ms, for consumer polling.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-latency-maxThe maximum execution time in ms, for consumer polling.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-latency-avgThe average execution time in ms, for processing.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-latency-maxThe maximum execution time in ms, for processing.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-latency-avgThe average execution time in ms, for punctuating.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-latency-maxThe maximum execution time in ms, for punctuating.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-rateThe average number of commits per sec.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
commit-totalThe total number of commit calls.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-rateThe average number of consumer poll calls per sec.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
poll-totalThe total number of consumer poll calls.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-rateThe average number of processed records per sec.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
process-totalThe total number of processed records.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-rateThe average number of punctuate calls per sec.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
punctuate-totalThe total number of punctuate calls.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-created-rateThe average number of tasks created per sec.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-created-totalThe total number of tasks created.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-closed-rateThe average number of tasks closed per sec.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
task-closed-totalThe total number of tasks closed.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
blocked-time-ns-totalThe total time in ns the thread spent blocked on Kafka brokers.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
thread-start-timeThe system timestamp in ms that the thread was started.kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
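
Because blocked-time-ns-total is a cumulative counter, it is most useful when sampled twice and compared with the wall-clock time that elapsed between the samples. The sketch below, in Python, computes the share of an interval that a thread spent blocked. The function name and sample values are hypothetical, and it assumes the caller records its own wall-clock timestamps in ms alongside each reading.

```python
def blocked_fraction(blocked_ns_start: int, blocked_ns_end: int,
                     wall_ms_start: int, wall_ms_end: int) -> float:
    """Share of a sampling interval that a thread spent blocked on brokers.

    blocked_ns_*: two readings of blocked-time-ns-total (nanoseconds).
    wall_ms_*:    wall-clock timestamps of the two readings (milliseconds).
    """
    elapsed_ns = (wall_ms_end - wall_ms_start) * 1_000_000
    if elapsed_ns <= 0:
        raise ValueError("sampling interval must be positive")
    return (blocked_ns_end - blocked_ns_start) / elapsed_ns


# Hypothetical samples taken 10 seconds apart: 3 seconds were spent blocked.
print(blocked_fraction(1_000_000_000, 4_000_000_000, 0, 10_000))
```

The same two-sample approach applies to the other -total metrics in the table when a rate over a custom interval is needed.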

Task Metrics

All the following metrics have a recording level of debug, except for the dropped-records-* and active-process-ratio metrics which have a recording level of info: Metric/Attribute nameDescriptionMbean name
process-latency-avgThe average execution time in ns, for processing.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-latency-maxThe maximum execution time in ns, for processing.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-rateThe average number of processed records per sec across all source processor nodes of this task.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
process-totalThe total number of processed records across all source processor nodes of this task.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-avgThe average observed lateness in ms of records (stream time - record timestamp).kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-maxThe max observed lateness in ms of records (stream time - record timestamp).kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
enforced-processing-rateThe average number of enforced processings per sec.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
enforced-processing-totalThe total number of enforced processings.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
dropped-records-rateThe average number of records dropped per sec within this task.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
dropped-records-totalThe total number of records dropped within this task.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
active-process-ratioThe fraction of time the stream thread spent on processing this task among all assigned active tasks.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
input-buffer-bytes-totalThe total number of bytes accumulated by this task.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
cache-size-bytes-totalThe cache size in bytes accumulated by this task.kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
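
The record-lateness-* metrics are defined above as stream time minus record timestamp. The Python sketch below works through that arithmetic for a short sequence of records. It assumes, for illustration only, that stream time is the highest record timestamp observed so far and that it is advanced before lateness is computed; the function name and the timestamps are hypothetical.

```python
def record_lateness(timestamps_ms):
    """Return the lateness in ms of each record, in processing order."""
    stream_time = None
    lateness = []
    for ts in timestamps_ms:
        # Assumption: stream time only moves forward, to the max timestamp seen.
        stream_time = ts if stream_time is None else max(stream_time, ts)
        lateness.append(stream_time - ts)
    return lateness


# Hypothetical record timestamps; the third record arrives out of order.
values = record_lateness([100, 250, 200, 300])
print("per record:", values)
print("avg:", sum(values) / len(values), "max:", max(values))
```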

Processor Node Metrics

The following metrics are only available on certain types of nodes, i.e., the process-* metrics are only available for source processor nodes, the suppression-emit-* metrics are only available for suppression operation nodes, emit-final-* metrics are only available for windowed aggregations nodes, and the record-e2e-latency-* metrics are only available for source processor nodes and terminal nodes (nodes without successor nodes). All the metrics have a recording level of debug, except for the record-e2e-latency-* metrics which have a recording level of info: Metric/Attribute nameDescriptionMbean name
bytes-consumed-totalThe total number of bytes consumed by a source processor node.kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
bytes-produced-totalThe total number of bytes produced by a sink processor node.kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
process-rateThe average number of records processed by a source processor node per sec.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
process-totalThe total number of records processed by a source processor node.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-rateThe average number of records emitted downstream from suppression operation nodes per sec.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-totalThe total number of records that have been emitted downstream from suppression operation nodes.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
emit-final-latency-maxThe max latency in ms to emit final records when a record could be emitted.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
emit-final-latency-avgThe avg latency in ms to emit final records when a record could be emitted.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
emit-final-records-rateThe rate of records emitted per sec when records could be emitted.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
emit-final-records-totalThe total number of records emitted.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
record-e2e-latency-avgThe average end-to-end latency in ms of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
record-e2e-latency-maxThe maximum end-to-end latency in ms of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
record-e2e-latency-minThe minimum end-to-end latency in ms of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
records-consumed-totalThe total number of records consumed by a source processor node.kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
records-produced-totalThe total number of records produced by a sink processor node.kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+)
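
The availability rules stated before the table above can be written down as a small lookup, which is convenient when deciding which MBean attributes to expect from a given node. This Python sketch covers only the four metric families named in that paragraph; the node kind labels and the function name are hypothetical.

```python
# Node kinds that expose each metric family, as listed in the paragraph above.
AVAILABILITY = {
    "process-": {"source"},
    "suppression-emit-": {"suppression"},
    "emit-final-": {"windowed-aggregation"},
    "record-e2e-latency-": {"source", "terminal"},
}


def is_available(metric_name: str, node_kind: str):
    """Return True/False for the listed families, or None for other metrics."""
    for prefix, kinds in AVAILABILITY.items():
        if metric_name.startswith(prefix):
            return node_kind in kinds
    return None  # family not covered by the availability paragraph


print(is_available("process-rate", "source"))
print(is_available("record-e2e-latency-max", "suppression"))
```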

State Store Metrics

All the following metrics have a recording level of debug, except for the record-e2e-latency-* metrics, which have a recording level of trace, and num-open-iterators, which has a recording level of info. Note that the store-scope value is specified in StoreSupplier#metricsScope() for user-customized state stores; the built-in state stores currently use the following values:

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression (for suppression buffers)
  • rocksdb-state (for RocksDB backed key-value store)
  • rocksdb-window-state (for RocksDB backed window store)
  • rocksdb-session-state (for RocksDB backed session store)
Metrics suppression-buffer-size-avg, suppression-buffer-size-max, suppression-buffer-count-avg, and suppression-buffer-count-max are only available for suppression buffers. All other metrics are not available for suppression buffers. Metric/Attribute nameDescriptionMbean name
put-latency-avgThe average put execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-latency-maxThe maximum put execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-avgThe average put-if-absent execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-maxThe maximum put-if-absent execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-avgThe average get execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-maxThe maximum get execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-avgThe average delete execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-maxThe maximum delete execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-avgThe average put-all execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-maxThe maximum put-all execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-avgThe average all operation execution time in ns, from iterator create to close time.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-maxThe maximum all operation execution time in ns, from iterator create to close time.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-avgThe average range execution time in ns, from iterator create to close time.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-maxThe maximum range execution time in ns, from iterator create to close time.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
prefix-scan-latency-avgThe average prefix-scan execution time in ns, from iterator create to close time.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
prefix-scan-latency-maxThe maximum prefix-scan execution time in ns, from iterator create to close time.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-avgThe average flush execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-maxThe maximum flush execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-avgThe average restore execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-maxThe maximum restore execution time in ns.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-rateThe average put rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-rateThe average put-if-absent rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-rateThe average get rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-rateThe average delete rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-rateThe average put-all rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-rateThe average all operation rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-rateThe average range rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
prefix-scan-rateThe average prefix-scan rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-rateThe average flush rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-rateThe average restore rate per sec for this store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
suppression-buffer-size-avgThe average total size in bytes of the buffered data over the sampling window.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-size-maxThe maximum total size, in bytes, of the buffered data over the sampling window.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-count-avgThe average number of records buffered over the sampling window.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
suppression-buffer-count-maxThe maximum number of records buffered over the sampling window.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
record-e2e-latency-avgThe average end-to-end latency in ms of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
record-e2e-latency-maxThe maximum end-to-end latency in ms of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
record-e2e-latency-minThe minimum end-to-end latency in ms of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
num-open-iteratorsThe current number of iterators on the store that have been created, but not yet closed.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
iterator-duration-avgThe average time in ns spent between creating an iterator and closing it.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
iterator-duration-maxThe maximum time in ns spent between creating an iterator and closing it.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
oldest-iterator-open-since-msThe system timestamp in ms the oldest still open iterator was created.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
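
The [store-scope]-id placeholder in the MBean names above is replaced by the store-scope value followed by -id, so a RocksDB backed key-value store is tagged with rocksdb-state-id. The Python sketch below assembles such a name from its parts. The function name and all tag values are hypothetical, and the sketch assumes the values need no JMX quoting.

```python
def state_store_mbean(thread_id: str, task_id: str,
                      store_scope: str, store_name: str) -> str:
    """Build a state store MBean name following the pattern in the table above."""
    return (
        "kafka.streams:type=stream-state-metrics,"
        f"thread-id={thread_id},task-id={task_id},"
        f"{store_scope}-id={store_name}"
    )


# Hypothetical thread, task, and store names for illustration.
print(state_store_mbean("app-StreamThread-1", "0_0", "rocksdb-state", "counts"))
```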

RocksDB Metrics

RocksDB metrics are grouped into statistics-based metrics and properties-based metrics. The former are recorded from statistics that a RocksDB state store collects, whereas the latter are recorded from properties that RocksDB exposes. Statistics collected by RocksDB provide cumulative measurements over time, e.g., bytes written to the state store. Properties exposed by RocksDB provide current measurements, e.g., the amount of memory currently used. Note that the store-scope values for built-in RocksDB state stores are currently the following:

  • rocksdb-state (for RocksDB backed key-value store)
  • rocksdb-window-state (for RocksDB backed window store)
  • rocksdb-session-state (for RocksDB backed session store)
RocksDB Statistics-based Metrics: All the following statistics-based metrics have a recording level of debug because collecting statistics in RocksDB may have an impact on performance. Statistics-based metrics are collected every minute from the RocksDB state stores. If a state store consists of multiple RocksDB instances, as is the case for WindowStores and SessionStores, each metric reports an aggregation over the RocksDB instances of the state store. Metric/Attribute nameDescriptionMbean name
bytes-written-rateThe average number of bytes written per sec to the RocksDB state store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-totalThe total number of bytes written to the RocksDB state store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-rateThe average number of bytes read per second from the RocksDB state store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-totalThe total number of bytes read from the RocksDB state store.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-rateThe average number of bytes flushed per sec from the memtable to disk.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-totalThe total number of bytes flushed from the memtable to disk.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-hit-ratioThe ratio of memtable hits relative to all lookups to the memtable.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-avgThe average duration in ms of memtable flushes to disk.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-minThe minimum duration in ms of memtable flushes to disk.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-maxThe maximum duration in ms of memtable flushes to disk.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-data-hit-ratioThe ratio of block cache hits for data blocks relative to all lookups for data blocks to the block cache.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-index-hit-ratioThe ratio of block cache hits for index blocks relative to all lookups for index blocks to the block cache.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-filter-hit-ratioThe ratio of block cache hits for filter blocks relative to all lookups for filter blocks to the block cache.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-avgThe average duration in ms of write stalls.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-totalThe total duration in ms of write stalls.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-compaction-rateThe average number of bytes read per sec during compaction.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-compaction-rateThe average number of bytes written per sec during compaction.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-avgThe average duration in ms of disk compactions.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-minThe minimum duration in ms of disk compactions.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-maxThe maximum duration in ms of disk compactions.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-open-filesThis metric always returns -1 because the RocksDB counter NO_FILE_CLOSES was removed in RocksDB 9.7.3.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-file-errors-totalThe total number of file errors that occurred.kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
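
The hit ratio metrics in the table above are described as hits relative to all lookups. The Python sketch below spells out that arithmetic. It assumes that every lookup is either a hit or a miss, so that lookups equal hits plus misses; the function name and the counts are hypothetical and are not RocksDB statistics names.

```python
def hit_ratio(hits: int, misses: int) -> float:
    """Ratio of hits to all lookups; 0.0 when there were no lookups."""
    lookups = hits + misses  # assumption: each lookup is a hit or a miss
    return hits / lookups if lookups else 0.0


# Hypothetical counts: 75 of 100 lookups were served from the memtable.
print(hit_ratio(75, 25))
```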
RocksDB Properties-based Metrics: All the following properties-based metrics have a recording level of info and are recorded when the metrics are accessed. If a state store consists of multiple RocksDB instances, as is the case for WindowStores and SessionStores, each metric reports the sum over all the RocksDB instances of the state store, except for the block cache metrics block-cache-*. The block cache metrics report the sum over all RocksDB instances if each instance uses its own block cache, and they report the recorded value from only one instance if a single block cache is shared among all instances.

| Metric/Attribute name | Description | Mbean name |
| --- | --- | --- |
| num-immutable-mem-table | The number of immutable memtables that have not yet been flushed. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| cur-size-active-mem-table | The approximate size in bytes of the active memtable. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| cur-size-all-mem-tables | The approximate size in bytes of active and unflushed immutable memtables. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| size-all-mem-tables | The approximate size in bytes of active, unflushed immutable, and pinned immutable memtables. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-entries-active-mem-table | The number of entries in the active memtable. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-entries-imm-mem-tables | The number of entries in the unflushed immutable memtables. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-deletes-active-mem-table | The number of delete entries in the active memtable. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-deletes-imm-mem-tables | The number of delete entries in the unflushed immutable memtables. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| mem-table-flush-pending | This metric reports 1 if a memtable flush is pending, otherwise it reports 0. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-running-flushes | The number of currently running flushes. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| compaction-pending | This metric reports 1 if at least one compaction is pending, otherwise it reports 0. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-running-compactions | The number of currently running compactions. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| estimate-pending-compaction-bytes | The estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size (only valid for level compaction). | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| total-sst-files-size | The total size in bytes of all SST files. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| live-sst-files-size | The total size in bytes of all SST files that belong to the latest LSM tree. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| num-live-versions | Number of live versions of the LSM tree. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| block-cache-capacity | The capacity in bytes of the block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| block-cache-usage | The memory size in bytes of the entries residing in block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| block-cache-pinned-usage | The memory size in bytes for the entries being pinned in the block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| estimate-num-keys | The estimated number of keys in the active and unflushed immutable memtables and storage. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| estimate-table-readers-mem | The estimated memory in bytes used for reading SST tables, excluding memory used in block cache. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
| background-errors | The total number of background errors. | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
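
Since these metrics are exposed over JMX, one way to read them from inside the application's JVM is to query the MBean server with a wildcard pattern. The following is a minimal sketch using only the JDK's javax.management API; the class name RocksDbMetricsReader and the helper sumAttribute are hypothetical names chosen for illustration, and the sketch assumes each metric appears as an attribute on the matching MBean.

```java
import javax.management.AttributeNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka: reads metrics through plain JMX.
final class RocksDbMetricsReader {

    // Wildcard pattern matching every state-store MBean, whatever its
    // thread-id, task-id, or store-scope id.
    static final String STATE_STORE_PATTERN =
        "kafka.streams:type=stream-state-metrics,*";

    /**
     * Sums one numeric attribute over all MBeans matching the pattern.
     * MBeans that lack the attribute, or expose a non-numeric value,
     * are skipped. Returns 0.0 when nothing matches.
     */
    static double sumAttribute(MBeanServer server, String pattern, String attribute) {
        ObjectName query;
        try {
            query = new ObjectName(pattern);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean pattern: " + pattern, e);
        }
        double total = 0.0;
        for (ObjectName name : server.queryNames(query, null)) {
            try {
                Object value = server.getAttribute(name, attribute);
                if (value instanceof Number) {
                    total += ((Number) value).doubleValue();
                }
            } catch (AttributeNotFoundException e) {
                // This MBean does not expose the attribute (for example,
                // a store that is not backed by RocksDB); skip it.
            } catch (JMException e) {
                throw new IllegalStateException("Failed to read " + attribute + " from " + name, e);
            }
        }
        return total;
    }
}
```

For example, calling sumAttribute(ManagementFactory.getPlatformMBeanServer(), RocksDbMetricsReader.STATE_STORE_PATTERN, "background-errors") would total the background errors across all state stores in the JVM. Summing across stores is not appropriate for the block-cache-* metrics when a single block cache is shared, since the same cache would be counted more than once.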

Record Cache Metrics

All the following metrics have a recording level of debug:

| Metric/Attribute name | Description | Mbean name |
| --- | --- | --- |
| hit-ratio-avg | The average cache hit ratio defined as the ratio of cache read hits over the total cache read requests. | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
| hit-ratio-min | The minimum cache hit ratio. | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
| hit-ratio-max | The maximum cache hit ratio. | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
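
Metrics with a recording level of debug are not recorded under the default recording level of INFO, so the record cache metrics only show up once the level is raised. The sketch below sets the metrics.recording.level configuration on a copy of an application's properties using plain java.util.Properties; the class and method names are hypothetical, and any other settings in the properties object are placeholders supplied by the caller.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka.
final class StreamsMetricsConfig {

    /**
     * Returns a copy of the given configuration with the metrics
     * recording level raised to DEBUG. The input is left unmodified.
     */
    static Properties withDebugMetrics(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Enables debug-level metrics in addition to info-level ones.
        props.put("metrics.recording.level", "DEBUG");
        return props;
    }
}
```

The resulting properties would then be passed to the Kafka Streams application as usual. Recording more metrics adds some overhead, so this setting is often left at the default outside of troubleshooting.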

Others

We recommend monitoring GC time and other stats, along with server stats such as CPU utilization and I/O service time. On the client side, we recommend monitoring the message/byte rate (global and per topic) and the request rate/size/time. On the consumer side, we also recommend monitoring the max lag in messages among all partitions and the min fetch request rate. For a consumer to keep up, max lag needs to stay below a threshold and min fetch rate needs to be larger than 0.
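
As a rough illustration of these checks, the sketch below reads cumulative GC time from the JVM's standard GarbageCollectorMXBeans and encodes the consumer keep-up rule as a simple predicate. The class and method names are hypothetical, the lag threshold is a value you would choose for your own workload, and the lag and fetch-rate inputs are assumed to come from whatever consumer metrics you collect (for example records-lag-max and fetch-rate).

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of Kafka.
final class HealthChecks {

    /**
     * Returns the approximate accumulated GC time of this JVM in
     * milliseconds, summed over all collectors. Collectors that report
     * an undefined time (-1) are ignored.
     */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime();
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /**
     * A consumer is considered to be keeping up when its max lag is below
     * the threshold and its min fetch rate is larger than 0. A NaN input
     * (no data yet) makes both comparisons false, so the result is false.
     */
    static boolean consumerKeepsUp(double maxLag, double lagThreshold, double minFetchRate) {
        return maxLag < lagThreshold && minFetchRate > 0.0;
    }
}
```

An alerting job might sample totalGcTimeMillis() periodically and alert on the difference between samples rather than on the cumulative value.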


Last modified March 28, 2025: Updates from 4.0 (4222b044)