You are viewing documentation for an older version (2.3) of Kafka. For up-to-date documentation, see the latest version.

Monitoring

Kafka uses Yammer Metrics for metrics reporting in the server. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes transitive dependencies pulled into client applications. Both expose metrics via JMX and can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.

All Kafka rate metrics have a corresponding cumulative count metric with suffix -total. For example, records-consumed-rate has a corresponding metric named records-consumed-total.
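Because every rate has a cumulative -total counterpart, a monitoring system can derive its own average rate over any interval from two samples of the total. The following is a minimal sketch of that arithmetic; the function name and the sample values are hypothetical and not part of Kafka.

```python
def rate_from_totals(total_then, total_now, seconds_elapsed):
    """Average per-second rate between two samples of a cumulative -total metric."""
    if seconds_elapsed <= 0:
        raise ValueError("seconds_elapsed must be positive")
    delta = total_now - total_then
    if delta < 0:
        # The counter went backwards (for example, the client restarted),
        # so no meaningful rate can be derived from this pair of samples.
        return None
    return delta / seconds_elapsed


# Hypothetical samples of records-consumed-total taken 60 seconds apart.
print(rate_from_totals(120_000, 150_000, 60))
```

A rate derived this way is an average over the chosen interval, so it need not match the value reported by the corresponding -rate metric at any given moment.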

The easiest way to see the available metrics is to fire up jconsole and point it at a running Kafka client or server; this allows browsing all metrics with JMX.

Security Considerations for Remote Monitoring using JMX

Apache Kafka disables remote JMX by default. You can enable remote monitoring using JMX by setting the environment variable JMX_PORT for processes started using the CLI or standard Java system properties to enable remote JMX programmatically. You must enable security when enabling remote JMX in production scenarios to ensure that unauthorized users cannot monitor or control your broker or application as well as the platform on which these are running. Note that authentication is disabled for JMX by default in Kafka and security configs must be overridden for production deployments by setting the environment variable KAFKA_JMX_OPTS for processes started using the CLI or by setting appropriate Java system properties. See Monitoring and Management Using JMX Technology for details on securing JMX.
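As an illustration of overriding the JMX security settings, the sketch below assembles a value that could be exported as KAFKA_JMX_OPTS. The com.sun.management.jmxremote.* names are standard Java system properties; the helper name and the file paths are hypothetical, and enabling SSL additionally requires keystore settings that are not shown here.

```python
def build_jmx_opts(password_file, access_file, use_ssl=True):
    """Build a JVM option string that turns on JMX authentication.

    password_file and access_file are paths chosen by the operator
    (hypothetical values are used in the example below).
    """
    opts = [
        "-Dcom.sun.management.jmxremote",
        "-Dcom.sun.management.jmxremote.authenticate=true",
        "-Dcom.sun.management.jmxremote.ssl=" + str(use_ssl).lower(),
        "-Dcom.sun.management.jmxremote.password.file=" + password_file,
        "-Dcom.sun.management.jmxremote.access.file=" + access_file,
    ]
    return " ".join(opts)


# Hypothetical paths; print the value to export as KAFKA_JMX_OPTS.
print(build_jmx_opts("/etc/kafka/jmx.password", "/etc/kafka/jmx.access"))
```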

We do graphing and alerting on the following metrics:

Description | Mbean name | Normal value
Message in rate | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec |
Byte in rate from clients | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec |
Byte in rate from other brokers | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec |
Request rate | kafka.network:type=RequestMetrics,name=RequestsPerSec,request={ProduceFetchConsumer
Error rate | kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) | Number of errors in responses counted per-request-type, per-error-code. If a response contains multiple errors, all are counted. error=NONE indicates successful responses.
Request size in bytes | kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+) | Size of requests for each request type.
Temporary memory size in bytes | kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch} |
Message conversion time | kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch} |
Message conversion rate | kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) |
Byte out rate to clients | kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec |
Byte out rate to other brokers | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec |
Log flush rate and time | kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs |
Number of under replicated partitions (|ISR| < |all replicas|) | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 0
Number of under minIsr partitions (|ISR| < min.insync.replicas) | kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount | 0
Number of at minIsr partitions (|ISR| = min.insync.replicas) | kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount | 0
Number of offline log directories | kafka.log:type=LogManager,name=OfflineLogDirectoryCount | 0
Is controller active on broker | kafka.controller:type=KafkaController,name=ActiveControllerCount | only one broker in the cluster should have 1
Leader election rate | kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs | non-zero when there are broker failures
Unclean leader election rate | kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec | 0
Partition counts | kafka.server:type=ReplicaManager,name=PartitionCount | mostly even across brokers
Leader replica counts | kafka.server:type=ReplicaManager,name=LeaderCount | mostly even across brokers
ISR shrink rate | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec | If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.
ISR expansion rate | kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | See above
Max lag in messages between follower and leader replicas | kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica | lag should be proportional to the maximum batch size of a produce request.
Lag in messages per follower replica | kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) | lag should be proportional to the maximum batch size of a produce request.
Requests waiting in the producer purgatory | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce | non-zero if acks=-1 is used
Requests waiting in the fetch purgatory | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch | size depends on fetch.max.wait.ms in the consumer
Request total time | kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} | broken into queue, local, remote and response send time
Time the request waits in the request queue | kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Time the request is processed at the leader | kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Time the request waits for the follower | kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower} | non-zero for produce requests when acks=-1
Time the request waits in the response queue | kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Time to send the response | kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower} |
Number of messages the consumer lags behind the producer by. Published by the consumer, not broker. | kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max |
The average fraction of time the network processors are idle | kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent | between 0 and 1, ideally > 0.3
The number of connections disconnected on a processor due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication | kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count | ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this (listener, processor) combination
The total number of connections disconnected, across all processors, due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication | kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount | ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this broker
The average fraction of time the request handler threads are idle | kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent | between 0 and 1, ideally > 0.3
Bandwidth quota metrics per (user, client-id), user or client-id | kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+) | Two attributes. throttle-time indicates the amount of time in ms the client was throttled. Ideally = 0. byte-rate indicates the data produce/consume rate of the client in bytes/sec. For (user, client-id) quotas, both user and client-id are specified. If per-client-id quota is applied to the client, user is not specified. If per-user quota is applied, client-id is not specified.
Request quota metrics per (user, client-id), user or client-id | kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) | Two attributes. throttle-time indicates the amount of time in ms the client was throttled. Ideally = 0. request-time indicates the percentage of time spent in broker network and I/O threads to process requests from client group. For (user, client-id) quotas, both user and client-id are specified. If per-client-id quota is applied to the client, user is not specified. If per-user quota is applied, client-id is not specified.
Requests exempt from throttling | kafka.server:type=Request | exempt-throttle-time indicates the percentage of time spent in broker network and I/O threads to process requests that are exempt from throttling.
ZooKeeper client request latency | kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs | Latency in milliseconds for ZooKeeper requests from broker.
ZooKeeper connection status | kafka.server:type=SessionExpireListener,name=SessionState | Connection status of broker’s ZooKeeper session which may be one of Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired.
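The "Normal value" column above lends itself to simple alert rules. The sketch below checks a few of them against sampled values; it assumes the values have already been collected into a dictionary keyed by the MBean's name= attribute (how they are collected is out of scope), and the function names are hypothetical.

```python
# Metrics whose normal value in the table above is 0.
EXPECTED_ZERO = (
    "UnderReplicatedPartitions",
    "UnderMinIsrPartitionCount",
    "OfflineLogDirectoryCount",
    "UncleanLeaderElectionsPerSec",
)

# Idle ratios that lie between 0 and 1 and are ideally above 0.3.
IDLE_RATIOS = ("NetworkProcessorAvgIdlePercent", "RequestHandlerAvgIdlePercent")


def check_broker_health(sample):
    """Return a list of warnings for one broker's sampled metric values."""
    warnings = []
    for name in EXPECTED_ZERO:
        value = sample.get(name, 0)
        if value != 0:
            warnings.append(f"{name} is {value}, expected 0")
    for name in IDLE_RATIOS:
        value = sample.get(name)
        if value is not None and value <= 0.3:
            warnings.append(f"{name} is {value}, ideally > 0.3")
    return warnings


def check_single_controller(active_controller_counts):
    """Exactly one broker in the cluster should report ActiveControllerCount = 1."""
    return sum(active_controller_counts) == 1


# Hypothetical sample from one broker.
print(check_broker_health({"UnderReplicatedPartitions": 3,
                           "RequestHandlerAvgIdlePercent": 0.8}))
```

Metrics missing from the sample are skipped rather than flagged, so a collection failure should be alerted on separately.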

Common monitoring metrics for producer/consumer/connect/streams

The following metrics are available on producer/consumer/connector/streams instances. For specific metrics, see the following sections.

Metric/Attribute name | Description | Mbean name
connection-close-rate | Connections closed per second in the window. | kafka.[producer
connection-close-total | Total connections closed in the window. | kafka.[producer
connection-creation-rate | New connections established per second in the window. | kafka.[producer
connection-creation-total | Total new connections established in the window. | kafka.[producer
network-io-rate | The average number of network operations (reads or writes) on all connections per second. | kafka.[producer
network-io-total | The total number of network operations (reads or writes) on all connections. | kafka.[producer
outgoing-byte-rate | The average number of outgoing bytes sent per second to all servers. | kafka.[producer
outgoing-byte-total | The total number of outgoing bytes sent to all servers. | kafka.[producer
request-rate | The average number of requests sent per second. | kafka.[producer
request-total | The total number of requests sent. | kafka.[producer
request-size-avg | The average size of all requests in the window. | kafka.[producer
request-size-max | The maximum size of any request sent in the window. | kafka.[producer
incoming-byte-rate | Bytes/second read off all sockets. | kafka.[producer
incoming-byte-total | Total bytes read off all sockets. | kafka.[producer
response-rate | Responses received per second. | kafka.[producer
response-total | Total responses received. | kafka.[producer
select-rate | Number of times the I/O layer checked for new I/O to perform per second. | kafka.[producer
select-total | Total number of times the I/O layer checked for new I/O to perform. | kafka.[producer
io-wait-time-ns-avg | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | kafka.[producer
io-wait-ratio | The fraction of time the I/O thread spent waiting. | kafka.[producer
io-time-ns-avg | The average length of time for I/O per select call in nanoseconds. | kafka.[producer
io-ratio | The fraction of time the I/O thread spent doing I/O. | kafka.[producer
connection-count | The current number of active connections. | kafka.[producer
successful-authentication-rate | Connections per second that were successfully authenticated using SASL or SSL. | kafka.[producer
successful-authentication-total | Total connections that were successfully authenticated using SASL or SSL. | kafka.[producer
failed-authentication-rate | Connections per second that failed authentication. | kafka.[producer
failed-authentication-total | Total connections that failed authentication. | kafka.[producer
successful-reauthentication-rate | Connections per second that were successfully re-authenticated using SASL. | kafka.[producer
successful-reauthentication-total | Total connections that were successfully re-authenticated using SASL. | kafka.[producer
reauthentication-latency-max | The maximum latency in ms observed due to re-authentication. | kafka.[producer
reauthentication-latency-avg | The average latency in ms observed due to re-authentication. | kafka.[producer
failed-reauthentication-rate | Connections per second that failed re-authentication. | kafka.[producer
failed-reauthentication-total | Total connections that failed re-authentication. | kafka.[producer
successful-authentication-no-reauth-total | Total connections that were successfully authenticated by older, pre-2.2.0 SASL clients that do not support re-authentication. May only be non-zero | kafka.[producer

Common Per-broker metrics for producer/consumer/connect/streams

The following metrics are available on producer/consumer/connector/streams instances. For specific metrics, see the following sections.

Metric/Attribute name | Description | Mbean name
outgoing-byte-rate | The average number of outgoing bytes sent per second for a node. | kafka.[producer
outgoing-byte-total | The total number of outgoing bytes sent for a node. | kafka.[producer
request-rate | The average number of requests sent per second for a node. | kafka.[producer
request-total | The total number of requests sent for a node. | kafka.[producer
request-size-avg | The average size of all requests in the window for a node. | kafka.[producer
request-size-max | The maximum size of any request sent in the window for a node. | kafka.[producer
incoming-byte-rate | The average number of bytes received per second for a node. | kafka.[producer
incoming-byte-total | The total number of bytes received for a node. | kafka.[producer
request-latency-avg | The average request latency in ms for a node. | kafka.[producer
request-latency-max | The maximum request latency in ms for a node. | kafka.[producer
response-rate | Responses received per second for a node. | kafka.[producer
response-total | Total responses received for a node. | kafka.[producer

Producer monitoring

The following metrics are available on producer instances.

Metric/Attribute name | Description | Mbean name
waiting-threads | The number of user threads blocked waiting for buffer memory to enqueue their records. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-total-bytes | The maximum amount of buffer memory the client can use (whether or not it is currently used). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
buffer-available-bytes | The total amount of buffer memory that is not being used (either unallocated or in the free list). | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
bufferpool-wait-time | The fraction of time an appender waits for space allocation. | kafka.producer:type=producer-metrics,client-id=([-.\w]+)
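The Mbean name column uses regular-expression groups such as ([-.\w]+) to stand for variable parts like the client id. As a small sketch of how such a pattern can be applied, the following extracts the client id from a concrete name; the helper name and the example client id are hypothetical.

```python
import re

# Pattern taken from the Mbean name column, with the literal dot escaped.
PRODUCER_METRICS = re.compile(
    r"kafka\.producer:type=producer-metrics,client-id=([-.\w]+)"
)


def client_id_of(mbean_name):
    """Return the client id if the name is a producer-metrics MBean, else None."""
    match = PRODUCER_METRICS.fullmatch(mbean_name)
    return match.group(1) if match else None


# Hypothetical client id.
print(client_id_of("kafka.producer:type=producer-metrics,client-id=orders-producer-1"))
```

If a tool reports the client id in quotes, the pattern as written will not match and would need to allow for the quote characters.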

Producer Sender Metrics

kafka.producer:type=producer-metrics,client-id="{client-id}"
Attribute name | Description
batch-size-avg | The average number of bytes sent per partition per-request.
batch-size-max | The max number of bytes sent per partition per-request.
batch-split-rate | The average number of batch splits per second
batch-split-total | The total number of batch splits
compression-rate-avg | The average compression rate of record batches.
metadata-age | The age in seconds of the current producer metadata being used.
produce-throttle-time-avg | The average time in ms a request was throttled by a broker
produce-throttle-time-max | The maximum time in ms a request was throttled by a broker
record-error-rate | The average per-second number of record sends that resulted in errors
record-error-total | The total number of record sends that resulted in errors
record-queue-time-avg | The average time in ms record batches spent in the send buffer.
record-queue-time-max | The maximum time in ms record batches spent in the send buffer.
record-retry-rate | The average per-second number of retried record sends
record-retry-total | The total number of retried record sends
record-send-rate | The average number of records sent per second.
record-send-total | The total number of records sent.
record-size-avg | The average record size
record-size-max | The maximum record size
records-per-request-avg | The average number of records per request.
request-latency-avg | The average request latency in ms
request-latency-max | The maximum request latency in ms
requests-in-flight | The current number of in-flight requests awaiting a response.

kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
Attribute name | Description
byte-rate | The average number of bytes sent per second for a topic.
byte-total | The total number of bytes sent for a topic.
compression-rate | The average compression rate of record batches for a topic.
record-error-rate | The average per-second number of record sends that resulted in errors for a topic
record-error-total | The total number of record sends that resulted in errors for a topic
record-retry-rate | The average per-second number of retried record sends for a topic
record-retry-total | The total number of retried record sends for a topic
record-send-rate | The average number of records sent per second for a topic.
record-send-total | The total number of records sent for a topic.

Consumer monitoring

The following metrics are available on consumer instances.

Consumer Group Metrics

Metric/Attribute name | Description | Mbean name
commit-latency-avg | The average time taken for a commit request | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-latency-max | The max time taken for a commit request | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-rate | The number of commit calls per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
commit-total | The total number of commit calls | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
assigned-partitions | The number of partitions currently assigned to this consumer | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-response-time-max | The max time taken to receive a response to a heartbeat request | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-rate | The average number of heartbeats per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
heartbeat-total | The total number of heartbeats | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-avg | The average time taken for a group rejoin | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-time-max | The max time taken for a group rejoin | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-rate | The number of group joins per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
join-total | The total number of group joins | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-avg | The average time taken for a group sync | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-time-max | The max time taken for a group sync | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-rate | The number of group syncs per second | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
sync-total | The total number of group syncs | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
last-heartbeat-seconds-ago | The number of seconds since the last coordinator heartbeat | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)

Consumer Fetch Metrics

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
Attribute name | Description
bytes-consumed-rate | The average number of bytes consumed per second
bytes-consumed-total | The total number of bytes consumed
fetch-latency-avg | The average time taken for a fetch request.
fetch-latency-max | The max time taken for any fetch request.
fetch-rate | The number of fetch requests per second.
fetch-size-avg | The average number of bytes fetched per request
fetch-size-max | The maximum number of bytes fetched per request
fetch-throttle-time-avg | The average throttle time in ms
fetch-throttle-time-max | The maximum throttle time in ms
fetch-total | The total number of fetch requests.
records-consumed-rate | The average number of records consumed per second
records-consumed-total | The total number of records consumed
records-lag-max | The maximum lag in terms of number of records for any partition in this window
records-lead-min | The minimum lead in terms of number of records for any partition in this window
records-per-request-avg | The average number of records in each request

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
Attribute name | Description
bytes-consumed-rate | The average number of bytes consumed per second for a topic
bytes-consumed-total | The total number of bytes consumed for a topic
fetch-size-avg | The average number of bytes fetched per request for a topic
fetch-size-max | The maximum number of bytes fetched per request for a topic
records-consumed-rate | The average number of records consumed per second for a topic
records-consumed-total | The total number of records consumed for a topic
records-per-request-avg | The average number of records in each request for a topic

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
Attribute name | Description
records-lag | The latest lag of the partition
records-lag-avg | The average lag of the partition
records-lag-max | The max lag of the partition
records-lead | The latest lead of the partition
records-lead-avg | The average lead of the partition
records-lead-min | The min lead of the partition
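The lag and lead attributes are easier to interpret with the arithmetic spelled out. The sketch below assumes the usual definitions, which this section does not state: lag is the partition's log end offset minus the consumer's position, and lead is the consumer's position minus the partition's log start offset. The function names and offsets are hypothetical.

```python
def records_lag(log_end_offset, position):
    """Records the consumer still has to read (assumed definition)."""
    return log_end_offset - position


def records_lead(position, log_start_offset):
    """Records between the oldest retained record and the consumer (assumed definition)."""
    return position - log_start_offset


# Hypothetical offsets for one partition.
log_start, position, log_end = 1_000, 1_200, 1_500
print(records_lag(log_end, position), records_lead(position, log_start))
```

Under these assumptions a growing lag means the consumer is falling behind the producer, while a lead approaching zero means the consumer is close to the oldest retained data and may lose records to retention.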

Connect Monitoring

A Connect worker process contains all the producer and consumer metrics as well as metrics specific to Connect. The worker process itself has a number of metrics, while each connector and task have additional metrics.

kafka.connect:type=connect-worker-metrics
Attribute name | Description
connector-count | The number of connectors run in this worker.
connector-startup-attempts-total | The total number of connector startups that this worker has attempted.
connector-startup-failure-percentage | The average percentage of this worker's connector starts that failed.
connector-startup-failure-total | The total number of connector starts that failed.
connector-startup-success-percentage | The average percentage of this worker's connector starts that succeeded.
connector-startup-success-total | The total number of connector starts that succeeded.
task-count | The number of tasks run in this worker.
task-startup-attempts-total | The total number of task startups that this worker has attempted.
task-startup-failure-percentage | The average percentage of this worker's task starts that failed.
task-startup-failure-total | The total number of task starts that failed.
task-startup-success-percentage | The average percentage of this worker's task starts that succeeded.
task-startup-success-total | The total number of task starts that succeeded.

kafka.connect:type=connect-worker-rebalance-metrics
Attribute name | Description
completed-rebalances-total | The total number of rebalances completed by this worker.
epoch | The epoch or generation number of this worker.
leader-name | The name of the group leader.
rebalance-avg-time-ms | The average time in milliseconds spent by this worker to rebalance.
rebalance-max-time-ms | The maximum time in milliseconds spent by this worker to rebalance.
rebalancing | Whether this worker is currently rebalancing.
time-since-last-rebalance-ms | The time in milliseconds since this worker completed the most recent rebalance.

kafka.connect:type=connector-metrics,connector="{connector}"
Attribute name | Description
connector-class | The name of the connector class.
connector-type | The type of the connector. One of 'source' or 'sink'.
connector-version | The version of the connector class, as reported by the connector.
status | The status of the connector. One of 'unassigned', 'running', 'paused', 'failed', or 'destroyed'.

kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
Attribute name | Description
batch-size-avg | The average size of the batches processed by the connector.
batch-size-max | The maximum size of the batches processed by the connector.
offset-commit-avg-time-ms | The average time in milliseconds taken by this task to commit offsets.
offset-commit-failure-percentage | The average percentage of this task's offset commit attempts that failed.
offset-commit-max-time-ms | The maximum time in milliseconds taken by this task to commit offsets.
offset-commit-success-percentage | The average percentage of this task's offset commit attempts that succeeded.
pause-ratio | The fraction of time this task has spent in the pause state.
running-ratio | The fraction of time this task has spent in the running state.
status | The status of the connector task. One of 'unassigned', 'running', 'paused', 'failed', or 'destroyed'.

kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
Attribute name | Description
offset-commit-completion-rate | The average per-second number of offset commit completions that were completed successfully.
offset-commit-completion-total | The total number of offset commit completions that were completed successfully.
offset-commit-seq-no | The current sequence number for offset commits.
offset-commit-skip-rate | The average per-second number of offset commit completions that were received too late and skipped/ignored.
offset-commit-skip-total | The total number of offset commit completions that were received too late and skipped/ignored.
partition-count | The number of topic partitions assigned to this task belonging to the named sink connector in this worker.
put-batch-avg-time-ms | The average time taken by this task to put a batch of sink records.
put-batch-max-time-ms | The maximum time taken by this task to put a batch of sink records.
sink-record-active-count | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
sink-record-active-count-avg | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
sink-record-active-count-max | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task.
sink-record-lag-max | The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions.
sink-record-read-rate | The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied.
sink-record-read-total | The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted.
sink-record-send-rate | The average per-second number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.
sink-record-send-total | The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted.

kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
Attribute name | Description
poll-batch-avg-time-ms | The average time in milliseconds taken by this task to poll for a batch of source records.
poll-batch-max-time-ms | The maximum time in milliseconds taken by this task to poll for a batch of source records.
source-record-active-count | The number of records that have been produced by this task but not yet completely written to Kafka.
source-record-active-count-avg | The average number of records that have been produced by this task but not yet completely written to Kafka.
source-record-active-count-max | The maximum number of records that have been produced by this task but not yet completely written to Kafka.
source-record-poll-rate | The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
source-record-poll-total | The total number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
source-record-write-rate | The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.
source-record-write-total | The number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted.

kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
Attribute name | Description
deadletterqueue-produce-failures | The number of failed writes to the dead letter queue.
deadletterqueue-produce-requests | The number of attempted writes to the dead letter queue.
last-error-timestamp | The epoch timestamp when this task last encountered an error.
total-errors-logged | The number of errors that were logged.
total-record-errors | The number of record processing errors in this task.
total-record-failures | The number of record processing failures in this task.
total-records-skipped | The number of records skipped due to errors.
total-retries | The number of operations retried.

Streams Monitoring

A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to streams. By default Kafka Streams has metrics with two recording levels: debug and info. The debug level records all metrics, while the info level records only the thread-level metrics.

Note that the metrics have a 3-layer hierarchy. At the top level there are per-thread metrics. Each thread has tasks, with their own metrics. Each task has a number of processor nodes, with their own metrics. Each task also has a number of state stores and record caches, all with their own metrics.

Use the following configuration option to specify which metrics you want collected:

metrics.recording.level="info"
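To make the effect of this setting concrete, the sketch below models the rule described above: with the level set to info only info-level metrics are recorded, while debug records everything. It is an illustration of the rule only, not Kafka Streams code, and the names are hypothetical.

```python
# Higher number = more verbose level.
LEVELS = {"info": 0, "debug": 1}


def is_recorded(metric_level, configured_level="info"):
    """Whether a metric with the given recording level is collected
    under the configured metrics.recording.level."""
    return LEVELS[metric_level] <= LEVELS[configured_level]


# A debug-level metric is skipped when the level is info...
print(is_recorded("debug", "info"))
# ...and collected when the level is debug.
print(is_recorded("debug", "debug"))
```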

Thread Metrics

All the following metrics have a recording level of info:

Metric/Attribute name | Description | Mbean name
commit-latency-avg | The average execution time in ms for committing, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
commit-latency-max | The maximum execution time in ms for committing, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
poll-latency-avg | The average execution time in ms for polling, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
poll-latency-max | The maximum execution time in ms for polling, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
process-latency-avg | The average execution time in ms for processing, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
process-latency-max | The maximum execution time in ms for processing, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
punctuate-latency-avg | The average execution time in ms for punctuating, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
punctuate-latency-max | The maximum execution time in ms for punctuating, across all running tasks of this thread. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
commit-rate | The average number of commits per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
commit-total | The total number of commit calls across all tasks. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
poll-rate | The average number of polls per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
poll-total | The total number of poll calls across all tasks. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
process-rate | The average number of process calls per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
process-total | The total number of process calls across all tasks. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
punctuate-rate | The average number of punctuates per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
punctuate-total | The total number of punctuate calls across all tasks. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
task-created-rate | The average number of newly created tasks per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
task-created-total | The total number of tasks created. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
task-closed-rate | The average number of tasks closed per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
task-closed-total | The total number of tasks closed. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
skipped-records-rate | The average number of skipped records per second. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
skipped-records-total | The total number of skipped records. | kafka.streams:type=stream-metrics,client-id=([-.\w]+)
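
The Mbean name of the thread metrics is written as a regular expression whose capture group is the client-id. A small sketch of how a monitoring tool might use it to recognize thread-level names and extract the client-id follows. The class name ThreadMetricName, the helper clientId, and the sample client-id value are made up for illustration; the dot in kafka.streams is escaped here so that it matches literally.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ThreadMetricName {
    // Pattern based on the Mbean name of the thread metrics.
    static final Pattern THREAD_MBEAN =
        Pattern.compile("kafka\\.streams:type=stream-metrics,client-id=([-.\\w]+)");

    // Returns the client-id if the given name is a thread-level MBean name.
    static Optional<String> clientId(String mbeanName) {
        Matcher m = THREAD_MBEAN.matcher(mbeanName);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // The client-id below is an invented example value.
        String name = "kafka.streams:type=stream-metrics,client-id=my-app-StreamThread-1";
        System.out.println(clientId(name).orElse("not a thread metric"));
    }
}
```

Names of task, processor node, or state store MBeans do not match this pattern because their type attribute differs.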

Task Metrics

All the following metrics have a recording level of debug:

Metric/Attribute name | Description | Mbean name
commit-latency-avg | The average commit time in ns for this task. | kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
commit-latency-max | The maximum commit time in ns for this task. | kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
commit-rate | The average number of commit calls per second. | kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
commit-total | The total number of commit calls. | kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-avg | The average observed lateness of records. | kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
record-lateness-max | The max observed lateness of records. | kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
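
Because each task gets its own MBean, a JMX client usually selects them with a wildcard rather than by exact name. The sketch below builds such a wildcard with the standard javax.management.ObjectName class and checks a name against it; a pattern like this could also be passed to MBeanServerConnection.queryNames. The class TaskMetricPattern, its helpers, and the sample client-id and task-id values are hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class TaskMetricPattern {
    // Wildcard pattern covering the task-level MBeans of every client and task.
    static final String ALL_TASKS =
        "kafka.streams:type=stream-task-metrics,client-id=*,task-id=*";

    // Wraps the checked exception so that callers can stay simple.
    static ObjectName name(String s) {
        try {
            return new ObjectName(s);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(s, e);
        }
    }

    // True if the given MBean name belongs to the task metrics group.
    static boolean isTaskMetric(String mbeanName) {
        return name(ALL_TASKS).apply(name(mbeanName));
    }

    public static void main(String[] args) {
        // client-id and task-id are invented example values.
        System.out.println(isTaskMetric(
            "kafka.streams:type=stream-task-metrics,client-id=my-app-StreamThread-1,task-id=0_1"));
    }
}
```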

Processor Node Metrics

All the following metrics have a recording level of debug:

Metric/Attribute name | Description | Mbean name
process-latency-avg | The average process execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
process-latency-max | The maximum process execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
punctuate-latency-avg | The average punctuate execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
punctuate-latency-max | The maximum punctuate execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
create-latency-avg | The average create execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
create-latency-max | The maximum create execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
destroy-latency-avg | The average destroy execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
destroy-latency-max | The maximum destroy execution time in ns. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
process-rate | The average number of process operations per second. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
process-total | The total number of process operations called. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
punctuate-rate | The average number of punctuate operations per second. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
punctuate-total | The total number of punctuate operations called. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
create-rate | The average number of create operations per second. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
create-total | The total number of create operations called. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
destroy-rate | The average number of destroy operations per second. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
destroy-total | The total number of destroy operations called. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
forward-rate | The average rate of records being forwarded downstream, from source nodes only, per second. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
forward-total | The total number of records being forwarded downstream, from source nodes only. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-rate | The rate at which records have been emitted downstream from suppression operation nodes. Compare with the process-rate metric to determine how many updates are being suppressed. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
suppression-emit-total | The total number of records that have been emitted downstream from suppression operation nodes. Compare with the process-total metric to determine how many updates are being suppressed. | kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
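
The comparison between suppression-emit-total and process-total can be turned into a single number. The following sketch computes the fraction of processed updates that were not emitted downstream. It assumes both values were read from the same suppression node over the same interval; the class SuppressionRatio and its method are hypothetical, and returning 0 when nothing was processed is a choice made here for convenience.

```java
final class SuppressionRatio {
    // Fraction of processed updates that the suppression node did not emit downstream.
    static double suppressedFraction(double processTotal, double suppressionEmitTotal) {
        if (processTotal <= 0.0) {
            return 0.0; // nothing processed yet, so nothing was suppressed
        }
        return 1.0 - suppressionEmitTotal / processTotal;
    }

    public static void main(String[] args) {
        // Invented sample readings: 1000 updates processed, 250 emitted.
        System.out.println(suppressedFraction(1000.0, 250.0));
    }
}
```

The same calculation applies to the rate metrics when both rates are sampled at the same time.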

State Store Metrics

All the following metrics have a recording level of debug. Note that the store-scope value is specified in StoreSupplier#metricsScope() for user-customized state stores; for built-in state stores, the current values are:

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • rocksdb-state (for RocksDB backed key-value store)
  • rocksdb-window-state (for RocksDB backed window store)
  • rocksdb-session-state (for RocksDB backed session store)

Metric/Attribute name | Description | Mbean name
put-latency-avg | The average put execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-latency-max | The maximum put execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-avg | The average put-if-absent execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-latency-max | The maximum put-if-absent execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-avg | The average get execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-latency-max | The maximum get execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-avg | The average delete execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-latency-max | The maximum delete execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-avg | The average put-all execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-latency-max | The maximum put-all execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-avg | The average all operation execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-latency-max | The maximum all operation execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-avg | The average range execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-latency-max | The maximum range execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-avg | The average flush execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-latency-max | The maximum flush execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-avg | The average restore execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-latency-max | The maximum restore execution time in ns. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-rate | The average put rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-total | The total number of put calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-rate | The average put-if-absent rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-if-absent-total | The total number of put-if-absent calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-rate | The average get rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
get-total | The total number of get calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-rate | The average delete rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
delete-total | The total number of delete calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-rate | The average put-all rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
put-all-total | The total number of put-all calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-rate | The average all operation rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
all-total | The total number of all operation calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-rate | The average range rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
range-total | The total number of range calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-rate | The average flush rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
flush-total | The total number of flush calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-rate | The average restore rate for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
restore-total | The total number of restore calls for this store. | kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
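
The [store-scope] placeholder appears twice in each Mbean name: once in the type and once in the key that carries the store name. As an illustration, the sketch below fills in the template for a given scope. The class StoreMetricName, its method, and the sample client-id, task-id, and store name are hypothetical; only the template itself comes from the Mbean names listed for the state store metrics.

```java
final class StoreMetricName {
    // Fills the [store-scope] placeholder in both positions of the Mbean name template.
    static String mbeanName(String storeScope, String clientId, String taskId, String storeName) {
        return String.format(
            "kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
            storeScope, clientId, taskId, storeScope, storeName);
    }

    public static void main(String[] args) {
        // Invented example: a RocksDB backed key-value store named "counts".
        System.out.println(mbeanName("rocksdb-state", "my-app-StreamThread-1", "0_0", "counts"));
    }
}
```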

Record Cache Metrics

All the following metrics have a recording level of debug:

Metric/Attribute name | Description | Mbean name
hitRatio-avg | The average cache hit ratio, defined as the ratio of cache read hits over the total cache read requests. | kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
hitRatio-min | The minimum cache hit ratio. | kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
hitRatio-max | The maximum cache hit ratio. | kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
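
To make the hit ratio definition concrete, the following sketch computes it from two counters. It is an illustration of the definition only, not the way Kafka Streams records the metric; the class CacheHitRatio, its method, and the choice to return 0 when there were no reads are assumptions.

```java
final class CacheHitRatio {
    // Ratio of cache read hits over the total number of cache read requests.
    static double hitRatio(long readHits, long totalReads) {
        if (totalReads <= 0) {
            return 0.0; // no reads yet; treated as a ratio of 0 in this sketch
        }
        return (double) readHits / totalReads;
    }

    public static void main(String[] args) {
        // Invented sample counts: 30 hits out of 40 read requests.
        System.out.println(hitRatio(30, 40));
    }
}
```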

Suppression Buffer Metrics

All the following metrics have a recording level of debug:

Metric/Attribute name | Description | Mbean name
suppression-buffer-size-current | The current total size, in bytes, of the buffered data. | kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
suppression-buffer-size-avg | The average total size, in bytes, of the buffered data over the sampling window. | kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
suppression-buffer-size-max | The maximum total size, in bytes, of the buffered data over the sampling window. | kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
suppression-buffer-count-current | The current number of records buffered. | kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
suppression-buffer-count-avg | The average number of records buffered over the sampling window. | kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)
suppression-buffer-count-max | The maximum number of records buffered over the sampling window. | kafka.streams:type=stream-buffer-metrics,client-id=([-.\w]+),task-id=([-.\w]+),buffer-id=([-.\w]+)

Others

We recommend monitoring GC time and other stats, along with server stats such as CPU utilization and I/O service time. On the client side, we recommend monitoring the message/byte rate (global and per topic) and the request rate/size/time. On the consumer side, we recommend monitoring the max lag in messages among all partitions and the min fetch request rate. For a consumer to keep up, max lag needs to be less than a threshold and min fetch rate needs to be larger than 0.
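
The keep-up condition for a consumer can be expressed as a simple check suitable for an alerting rule. This is a sketch only: the class ConsumerHealth, its method, and the sample values are hypothetical, and the lag threshold is a number you would choose for your own workload.

```java
final class ConsumerHealth {
    // A consumer keeps up when max lag is below the threshold and min fetch rate is above 0.
    static boolean keepingUp(double maxLag, double minFetchRate, double lagThreshold) {
        return maxLag < lagThreshold && minFetchRate > 0.0;
    }

    public static void main(String[] args) {
        // Invented sample readings and threshold.
        System.out.println(keepingUp(120.0, 0.5, 1000.0));
    }
}
```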