1 - Security Overview

Security Overview

Security Overview

In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster. The following security measures are currently supported:

  1. Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms:
    • SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0
    • SASL/PLAIN - starting at version 0.10.0.0
    • SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0
    • SASL/OAUTHBEARER - starting at version 2.0
  2. Authentication of connections from brokers to ZooKeeper
  3. Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
  4. Authorization of read / write operations by clients
  5. Authorization is pluggable and integration with external authorization services is supported

It’s worth noting that security is optional - non-secured clusters are supported, as well as a mix of authenticated, unauthenticated, encrypted and non-encrypted clients. The guides below explain how to configure and use the security features in both clients and brokers.

2 - Encryption and Authentication using SSL

Encryption and Authentication using SSL

Encryption and Authentication using SSL

Apache Kafka allows clients to connect over SSL. By default, SSL is disabled but can be turned on as needed.

  1. Generate SSL key and certificate for each Kafka broker

The first step of deploying one or more brokers with the SSL support is to generate the key and the certificate for each machine in the cluster. You can use Java’s keytool utility to accomplish this task. We will generate the key into a temporary keystore initially so that we can export and sign it later with CA.

                keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA

You need to specify two parameters in the above command: 1. keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely. 2. validity: the valid time of the certificate in days.

Configuring Host Name Verification

From Kafka version 2.0.0 onwards, host name verification of servers is enabled by default for client connections as well as inter-broker connections to prevent man-in-the-middle attacks. Server host name verification may be disabled by setting ssl.endpoint.identification.algorithm to an empty string. For example,

    	ssl.endpoint.identification.algorithm=

For dynamically configured broker listeners, hostname verification may be disabled using kafka-configs.sh. For example,

            bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

For older versions of Kafka, ssl.endpoint.identification.algorithm is not defined by default, so host name verification is not performed. The property should be set to HTTPS to enable host name verification.

    	ssl.endpoint.identification.algorithm=HTTPS 

Host name verification must be enabled to prevent man-in-the-middle attacks if server endpoints are not validated externally.

Configuring Host Name In Certificates

If host name verification is enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields: 1. Common Name (CN) 2. Subject Alternative Name (SAN)
Both fields are valid, RFC-2818 recommends the use of SAN however. SAN is also more flexible, allowing for multiple DNS entries to be declared. Another advantage is that the CN can be set to a more meaningful value for authorization purposes. To add a SAN field append the following argument -ext SAN=DNS:{FQDN} to the keytool command:

            keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

The following command can be run afterwards to verify the contents of the generated certificate:

            keytool -list -v -keystore server.keystore.jks
  1. Creating your own CA

After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.

Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. CA works likes a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.

                openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
The next step is to add the generated CA to the clients’ truststore so that the clients can trust this CA:

                keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

Note: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be “requested” or “required” on the Kafka brokers config then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients’ keys were signed by.

                keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

In contrast to the keystore in step 1 that stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines. 3. #### Signing the certificate

The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore:

                keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file

Then sign it with the CA:

                openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:

                keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
            keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

The definitions of the parameters are the following: 1. keystore: the location of the keystore 2. ca-cert: the certificate of the CA 3. ca-key: the private key of the CA 4. ca-password: the passphrase of the CA 5. cert-file: the exported, unsigned certificate of the server 6. cert-signed: the signed certificate of the server Here is an example of a bash script with all above steps. Note that one of the commands assumes a password of test1234, so either use that password or edit the command before running it.

                #!/bin/bash
            #Step 1
            keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
            #Step 2
            openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
            keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
            keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
            #Step 3
            keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
            openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
            keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
            keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
  1. Configuring Kafka Brokers

Kafka Brokers support listening for connections on multiple ports. We need to configure the following property in server.properties, which must have one or more comma-separated values:

    listeners

If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.

                listeners=PLAINTEXT://host.name:port,SSL://host.name:port

Following SSL configs are needed on the broker side

                ssl.keystore.location=/var/private/ssl/server.keystore.jks
            ssl.keystore.password=test1234
            ssl.key.password=test1234
            ssl.truststore.location=/var/private/ssl/server.truststore.jks
            ssl.truststore.password=test1234

Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set access to the truststore is still available, but integrity checking is disabled. Optional settings that are worth considering: 1. ssl.client.auth=none (“required” => client authentication is required, “requested” => client authentication is requested and client without certs can still connect. The usage of “requested” is discouraged as it provides a false sense of security and misconfigured clients will still connect successfully.) 2. ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. (Default is an empty list) 3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note that SSL is deprecated in favor of TLS and using SSL in production is not recommended) 4. ssl.keystore.type=JKS 5. ssl.truststore.type=JKS 6. ssl.secure.random.implementation=SHA1PRNG If you want to enable SSL for inter-broker communication, add the following to the server.properties file (it defaults to PLAINTEXT)

                security.inter.broker.protocol=SSL

Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.

The JRE/JDK will have a default pseudo-random number generator (PRNG) that is used for cryptography operations, so it is not required to configure the implementation used with the

    ssl.secure.random.implementation

. However, there are performance issues with some implementations (notably, the default chosen on Linux systems,

    NativePRNG

, utilizes a global lock). In cases where performance of SSL connections becomes an issue, consider explicitly setting the implementation to be used. The

    SHA1PRNG

implementation is non-blocking, and has shown very good performance characteristics under heavy load (50 MB/sec of produced messages, plus replication traffic, per-broker).

Once you start the broker you should be able to see in the server.log

                with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

To check quickly if the server keystore and truststore are setup properly you can run the following command

    openssl s_client -debug -connect localhost:9093 -tls1

(Note: TLSv1 should be listed under ssl.enabled.protocols)
In the output of this command you should see server’s certificate:

                -----BEGIN CERTIFICATE-----
            {variable sized random bytes}
            -----END CERTIFICATE-----
            subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
            issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

If the certificate does not show up or if there are any other error messages then your keystore is not setup properly. 5. #### Configuring Kafka Clients

SSL is supported only for the new Kafka Producer and Consumer, the older API is not supported. The configs for SSL will be the same for both producer and consumer.
If client authentication is not required in the broker, then the following is a minimal configuration example:

                security.protocol=SSL
            ssl.truststore.location=/var/private/ssl/client.truststore.jks
            ssl.truststore.password=test1234

Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set access to the truststore is still available, but integrity checking is disabled. If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:

                ssl.keystore.location=/var/private/ssl/client.keystore.jks
            ssl.keystore.password=test1234
            ssl.key.password=test1234

Other configuration settings that may also be needed depending on our requirements and the broker configuration: 1. ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. 2. ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. 3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should list at least one of the protocols configured on the broker side 4. ssl.truststore.type=JKS 5. ssl.keystore.type=JKS

Examples using console-producer and console-consumer:

                kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
            kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

3 - Authentication using SASL

Authentication using SASL

Authentication using SASL

  1. JAAS configuration

Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration.

1. ##### JAAS configuration for Kafka brokers

KafkaServer is the section name in the JAAS file used by each KafkaServer/Broker. This section provides SASL configuration options for the broker including any SASL client connections made by the broker for inter-broker communication. If multiple listeners are configured to use SASL, the section name may be prefixed with the listener name in lower-case followed by a period, e.g. sasl_ssl.KafkaServer.

Client section is used to authenticate a SASL connection with zookeeper. It also allows the brokers to set SASL ACL on zookeeper nodes which locks these nodes down so that only the brokers can modify it. It is necessary to have the same principal name across all brokers. If you want to use a section name other than Client, set the system property zookeeper.sasl.clientconfig to the appropriate name (e.g. , -Dzookeeper.sasl.clientconfig=ZkClient).

ZooKeeper uses “zookeeper” as the service name by default. If you want to change this, set the system property zookeeper.sasl.client.username to the appropriate name (e.g. , -Dzookeeper.sasl.client.username=zk).

Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. The property name must be prefixed with the listener prefix including the SASL mechanism, i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. Only one login module may be specified in the config value. If multiple mechanisms are configured on a listener, configs must be provided for each mechanism using the listener and mechanism prefix. For example,

                    listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
                username="admin" \
                password="admin-secret";
            listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
                username="admin" \
                password="admin-secret" \
                user_admin="admin-secret" \
                user_alice="alice-secret";

If JAAS configuration is defined at different levels, the order of precedence used is: * Broker configuration property listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config * {listenerName}.KafkaServer section of static JAAS configuration*KafkaServer section of static JAAS configuration Note that ZooKeeper JAAS config may only be configured using static JAAS configuration.

See GSSAPI (Kerberos), PLAIN, SCRAM or OAUTHBEARER for example broker configurations.

2. ##### JAAS configuration for Kafka clients

Clients may configure JAAS using the client configuration property sasl.jaas.config or using the static JAAS config file similar to brokers.

  1. ###### JAAS configuration using client configuration property

Clients may specify JAAS configuration as a producer or consumer property without creating a physical configuration file. This mode also enables different producers and consumers within the same JVM to use different credentials by specifying different properties for each client. If both static JAAS configuration system property java.security.auth.login.config and client property sasl.jaas.config are specified, the client property will be used.

See GSSAPI (Kerberos), PLAIN, SCRAM or OAUTHBEARER for example configurations.

  2. ###### JAAS configuration using static config file

To configure SASL authentication on the clients using static JAAS config file: 1. Add a JAAS config file with a client login section named KafkaClient. Configure a login module in KafkaClient for the selected mechanism as described in the examples for setting up GSSAPI (Kerberos), PLAIN, SCRAM or OAUTHBEARER. For example, GSSAPI credentials may be configured as:

                                    KafkaClient {
                    com.sun.security.auth.module.Krb5LoginModule required
                    useKeyTab=true
                    storeKey=true
                    keyTab="/etc/security/keytabs/kafka_client.keytab"
                    principal="kafka-client-1@EXAMPLE.COM";
                };

    2. Pass the JAAS config file location as JVM parameter to each client JVM. For example: 
            
                                -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
  1. SASL configuration

SASL may be used with PLAINTEXT or SSL as the transport layer using the security protocol SASL_PLAINTEXT or SASL_SSL respectively. If SASL_SSL is used, then SSL must also be configured.

1. ##### SASL mechanisms

Kafka supports the following SASL mechanisms: * GSSAPI (Kerberos) * PLAIN * SCRAM-SHA-256 * SCRAM-SHA-512 * OAUTHBEARER 2. ##### SASL configuration for Kafka brokers

  1. Configure a SASL port in server.properties, by adding at least one of SASL_PLAINTEXT or SASL_SSL to the _listeners_ parameter, which contains one or more comma-separated values: 
        
                        listeners=SASL_PLAINTEXT://host.name:port

If you are only configuring a SASL port (or if you want the Kafka brokers to authenticate each other using SASL) then make sure you set the same SASL protocol for inter-broker communication:

                        security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)

  2. Select one or more supported mechanisms to enable in the broker and follow the steps to configure SASL for the mechanism. To enable multiple mechanisms in the broker, follow the steps here.
3. ##### SASL configuration for Kafka clients

SASL authentication is only supported for the new Java Kafka producer and consumer, the older API is not supported.

To configure SASL authentication on the clients, select a SASL mechanism that is enabled in the broker for client authentication and follow the steps to configure SASL for the selected mechanism.

  1. Authentication using SASL/Kerberos

1. ##### Prerequisites

  1. **Kerberos**  

If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one, your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (Ubuntu, Redhat). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security. 2. Create Kerberos Principals
If you are using the organization’s Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools). If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:

                            sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
                sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"

  3. **Make sure all hosts can be reachable using hostnames** \- it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.
2. ##### Configuring Kafka Brokers

  1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab): 
        
                            KafkaServer {
                    com.sun.security.auth.module.Krb5LoginModule required
                    useKeyTab=true
                    storeKey=true
                    keyTab="/etc/security/keytabs/kafka_server.keytab"
                    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
                };
        
                // Zookeeper client authentication
                Client {
                com.sun.security.auth.module.Krb5LoginModule required
                useKeyTab=true
                storeKey=true
                keyTab="/etc/security/keytabs/kafka_server.keytab"
                principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
                };

KafkaServer section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored. It allows the broker to login using the keytab specified in this section. See notes for more details on Zookeeper SASL configuration. 2. Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see here for more details):

                        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
                -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

  3. Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting kafka broker.
  4. Configure SASL port and SASL mechanisms in server.properties as described here. For example: 
        
                        listeners=SASL_PLAINTEXT://host.name:port
                security.inter.broker.protocol=SASL_PLAINTEXT
                sasl.mechanism.inter.broker.protocol=GSSAPI
                sasl.enabled.mechanisms=GSSAPI

We must also configure the service name in server.properties, which should match the principal name of the kafka brokers. In the above example, principal is “kafka/kafka1.hostname.com@EXAMPLE.com”, so:

                        sasl.kerberos.service.name=kafka

3. ##### Configuring Kafka Clients

To configure SASL authentication on the clients: 1. Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user running the client), so obtain or create these principals as needed. Then configure the JAAS configuration property for each client. Different clients within a JVM may run as different users by specifiying different principals. The property sasl.jaas.config in producer.properties or consumer.properties describes how clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client using a keytab (recommended for long-running processes):

                        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
                useKeyTab=true \
                storeKey=true  \
                keyTab="/etc/security/keytabs/kafka_client.keytab" \
                principal="kafka-client-1@EXAMPLE.COM";

For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with “useTicketCache=true” as in:

                        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
                useTicketCache=true;

JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM. 2. Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting kafka client. 3. Optionally pass the krb5 file locations as JVM parameters to each client JVM (see here for more details):

                        -Djava.security.krb5.conf=/etc/kafka/krb5.conf

  4. Configure the following properties in producer.properties or consumer.properties: 
        
                        security.protocol=SASL_PLAINTEXT (or SASL_SSL)
            sasl.mechanism=GSSAPI
            sasl.kerberos.service.name=kafka
  1. Authentication using SASL/PLAIN

SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described here.

The username is used as the authenticated Principal for configuration of ACLs etc. 1. ##### Configuring Kafka Brokers

  1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example: 
        
                            KafkaServer {
                    org.apache.kafka.common.security.plain.PlainLoginModule required
                    username="admin"
                    password="admin-secret"
                    user_admin="admin-secret"
                    user_alice="alice-secret";
                };

This configuration defines two users (admin and alice). The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. The set of properties user__userName_ defines the passwords for all users that connect to the broker and the broker validates all client connections including those from other brokers using these properties. 2. Pass the JAAS config file location as JVM parameter to each Kafka broker:

                        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

  3. Configure SASL port and SASL mechanisms in server.properties as described here. For example: 
        
                        listeners=SASL_SSL://host.name:port
                security.inter.broker.protocol=SASL_SSL
                sasl.mechanism.inter.broker.protocol=PLAIN
                sasl.enabled.mechanisms=PLAIN

2. ##### Configuring Kafka Clients

To configure SASL authentication on the clients: 1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the PLAIN mechanism:

                        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
                username="alice" \
                password="alice-secret";

The options username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different user names and passwords in sasl.jaas.config.

JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

  2. Configure the following properties in producer.properties or consumer.properties: 
        
                        security.protocol=SASL_SSL
            sasl.mechanism=PLAIN

3. ##### Use of SASL/PLAIN in production

   * SASL/PLAIN should be used only with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.
   * The default implementation of SASL/PLAIN in Kafka specifies usernames and passwords in the JAAS configuration file as shown here. From Kafka version 2.0 onwards, you can avoid storing clear passwords on disk by configuring your own callback handlers that obtain username and password from an external source using the configuration options `sasl.server.callback.handler.class` and `sasl.client.callback.handler.class`.
   * In production systems, external authentication servers may implement password authentication. From Kafka version 2.0 onwards, you can plug in your own callback handlers that use external authentication servers for password verification by configuring `sasl.server.callback.handler.class`.
  1. Authentication using SASL/SCRAM

Salted Challenge Response Authentication Mechanism (SCRAM) is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN and DIGEST-MD5. The mechanism is defined in RFC 5802. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512 which can be used with TLS to perform secure authentication. The username is used as the authenticated Principal for configuration of ACLs etc. The default SCRAM implementation in Kafka stores SCRAM credentials in Zookeeper and is suitable for use in Kafka installations where Zookeeper is on a private network. Refer to Security Considerations for more details.

1. ##### Creating SCRAM Credentials

The SCRAM implementation in Kafka uses Zookeeper as credential store. Credentials can be created in Zookeeper using kafka-configs.sh. For each SCRAM mechanism enabled, credentials must be created by adding a config with the mechanism name. Credentials for inter-broker communication must be created before Kafka brokers are started. Client credentials may be created and updated dynamically and updated credentials will be used to authenticate new connections.

Create SCRAM credentials for user alice with password alice-secret :

                > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

The default iteration count of 4096 is used if iterations are not specified. A random salt is created and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey are stored in Zookeeper. See RFC 5802 for details on SCRAM identity and the individual fields.

The following examples also require a user admin for inter-broker communication which can be created using:

                > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

Existing credentials may be listed using the --describe option:

                > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

Credentials may be deleted for one or more SCRAM mechanisms using the --delete option:

                > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
            

2. ##### Configuring Kafka Brokers

  1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example: 
        
                        KafkaServer {
                org.apache.kafka.common.security.scram.ScramLoginModule required
                username="admin"
                password="admin-secret";
            };

The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. 2. Pass the JAAS config file location as JVM parameter to each Kafka broker:

                        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

  3. Configure SASL port and SASL mechanisms in server.properties as described here.`

For example:

                        listeners=SASL_SSL://host.name:port
            security.inter.broker.protocol=SASL_SSL
            sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
            sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)

3. ##### Configuring Kafka Clients

To configure SASL authentication on the clients: 1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the SCRAM mechanisms:

                       sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
                username="alice" \
                password="alice-secret";

The options username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different user names and passwords in sasl.jaas.config.

JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

  2. Configure the following properties in producer.properties or consumer.properties: 
        
                        security.protocol=SASL_SSL
            sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)

4. ##### Security Considerations for SASL/SCRAM

   * The default implementation of SASL/SCRAM in Kafka stores SCRAM credentials in Zookeeper. This is suitable for production use in installations where Zookeeper is secure and on a private network.
   * Kafka supports only the strong hash functions SHA-256 and SHA-512 with a minimum iteration count of 4096. Strong hash functions combined with strong passwords and high iteration counts protect against brute force attacks if Zookeeper security is compromised.
   * SCRAM should be used only with TLS-encryption to prevent interception of SCRAM exchanges. This protects against dictionary or brute force attacks and against impersonation if Zookeeper is compromised.
   * From Kafka version 2.0 onwards, the default SASL/SCRAM credential store may be overridden using custom callback handlers by configuring `sasl.server.callback.handler.class` in installations where Zookeeper is not secure.
   * For more details on security considerations, refer to [RFC 5802](https://tools.ietf.org/html/rfc5802#section-9). 
  1. Authentication using SASL/OAUTHBEARER

The OAuth 2 Authorization Framework “enables a third-party application to obtain limited access to an HTTP service, either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP service, or by allowing the third-party application to obtain access on its own behalf.” The SASL OAUTHBEARER mechanism enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in RFC 7628. The default OAUTHBEARER implementation in Kafka creates and validates Unsecured JSON Web Tokens and is only suitable for use in non-production Kafka installations. Refer to Security Considerations for more details.

1. ##### Configuring Kafka Brokers

  1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example: 
        
                        KafkaServer {
                org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
                unsecuredLoginStringClaim_sub="admin";
            };

The property unsecuredLoginStringClaim_sub in the KafkaServer section is used by the broker when it initiates connections to other brokers. In this example, admin will appear in the subject (sub) claim and will be the user for inter-broker communication. 2. Pass the JAAS config file location as JVM parameter to each Kafka broker:

                        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

  3. Configure SASL port and SASL mechanisms in server.properties as described here.`

For example:

                        listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
            security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
            sasl.mechanism.inter.broker.protocol=OAUTHBEARER
            sasl.enabled.mechanisms=OAUTHBEARER

2. ##### Configuring Kafka Clients

To configure SASL authentication on the clients: 1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the OAUTHBEARER mechanisms:

                       sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
                unsecuredLoginStringClaim_sub="alice";

The option unsecuredLoginStringClaim_sub is used by clients to configure the subject (sub) claim, which determines the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different subject (sub) claims in sasl.jaas.config.

JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

  2. Configure the following properties in producer.properties or consumer.properties: 
        
                        security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
            sasl.mechanism=OAUTHBEARER

  3. The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library. Since it's an optional dependency, users have to configure it as a dependency via their build tool.
3. ##### Unsecured Token Creation Options for SASL/OAUTHBEARER

   * The default implementation of SASL/OAUTHBEARER in Kafka creates and validates [Unsecured JSON Web Tokens](https://tools.ietf.org/html/rfc7515#appendix-A.5). While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.
   * Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol):  JAAS Module Option for Unsecured Token Creation | Documentation  

—|—
unsecuredLoginStringClaim_<claimname>="value" | Creates a String claim with the given name and value. Any valid claim name can be specified except ‘iat’ and ‘exp’ (these are automatically generated).
unsecuredLoginNumberClaim_<claimname>="value" | Creates a Number claim with the given name and value. Any valid claim name can be specified except ‘iat’ and ‘exp’ (these are automatically generated).
unsecuredLoginListClaim_<claimname>="value" | Creates a String List claim with the given name and values parsed from the given value where the first character is taken as the delimiter. For example: unsecuredLoginListClaim_fubar="|value1|value2". Any valid claim name can be specified except ‘iat’ and ‘exp’ (these are automatically generated).
unsecuredLoginExtension_<extensionname>="value" | Creates a String extension with the given name and value. For example: unsecuredLoginExtension_traceId="123". A valid extension name is any sequence of lowercase or uppercase alphabet characters. In addition, the “auth” extension name is reserved. A valid extension value is any combination of characters with ASCII codes 1-127.
unsecuredLoginPrincipalClaimName | Set to a custom claim name if you wish the name of the String claim holding the principal name to be something other than ‘sub’.
unsecuredLoginLifetimeSeconds | Set to an integer value if the token expiration is to be set to something other than the default value of 3600 seconds (which is 1 hour). The ‘exp’ claim will be set to reflect the expiration time.
unsecuredLoginScopeClaimName | Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than ‘scope’.
4. ##### Unsecured Token Validation Options for SASL/OAUTHBEARER

   * Here are the various supported JAAS module options on the broker side for [Unsecured JSON Web Token](https://tools.ietf.org/html/rfc7515#appendix-A.5) validation:  JAAS Module Option for Unsecured Token Validation | Documentation  

—|—
unsecuredValidatorPrincipalClaimName="value" | Set to a non-empty value if you wish a particular String claim holding a principal name to be checked for existence; the default is to check for the existence of the ‘sub’ claim.
unsecuredValidatorScopeClaimName="value" | Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than ‘scope’.
unsecuredValidatorRequiredScope="value" | Set to a space-delimited list of scope values if you wish the String/String List claim holding the token scope to be checked to make sure it contains certain values.
unsecuredValidatorAllowableClockSkewMs="value" | Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0).
* The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments) using custom login and SASL Server callback handlers. * For more details on security considerations, refer to RFC 6749, Section 10. 5. ##### Token Refresh for SASL/OAUTHBEARER

Kafka periodically refreshes any token before it expires so that the client can continue to make connections to brokers. The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration and are as follows. See the documentation for these properties elsewhere for details. The default values are usually reasonable, in which case these configuration parameters would not need to be explicitly set. Producer/Consumer/Broker Configuration Property

sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.login.refresh.min.period.seconds
sasl.login.refresh.min.buffer.seconds
6. ##### Secure/Production Use of SASL/OAUTHBEARER

Production use cases will require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol).

Production use cases will also require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option. 7. ##### Security Considerations for SASL/OAUTHBEARER

   * The default implementation of SASL/OAUTHBEARER in Kafka creates and validates [Unsecured JSON Web Tokens](https://tools.ietf.org/html/rfc7515#appendix-A.5). This is suitable only for non-production use.
   * OAUTHBEARER should be used in production enviromnments only with TLS-encryption to prevent interception of tokens.
   * The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments) using custom login and SASL Server callback handlers as described above.
   * For more details on OAuth 2 security considerations in general, refer to [RFC 6749, Section 10](https://tools.ietf.org/html/rfc6749#section-10).
  1. Enabling multiple SASL mechanisms in a broker

1. Specify configuration for the login modules of all enabled mechanisms in the `KafkaServer` section of the JAAS config file. For example: 
    
                    KafkaServer {
                com.sun.security.auth.module.Krb5LoginModule required
                useKeyTab=true
                storeKey=true
                keyTab="/etc/security/keytabs/kafka_server.keytab"
                principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
    
                org.apache.kafka.common.security.plain.PlainLoginModule required
                username="admin"
                password="admin-secret"
                user_admin="admin-secret"
                user_alice="alice-secret";
            };

2. Enable the SASL mechanisms in server.properties: 
    
                sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER

3. Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required: 
    
                security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
        sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)

4. Follow the mechanism-specific steps in GSSAPI (Kerberos), PLAIN, SCRAM and OAUTHBEARER to configure SASL for the enabled mechanisms.
  1. Modifying SASL mechanism in a Running Cluster

SASL mechanism can be modified in a running cluster using the following sequence:

1. Enable new SASL mechanism by adding the mechanism to `sasl.enabled.mechanisms` in server.properties for each broker. Update JAAS config file to include both mechanisms as described here. Incrementally bounce the cluster nodes.
2. Restart clients using the new mechanism.
3. To change the mechanism of inter-broker communication (if this is required), set `sasl.mechanism.inter.broker.protocol` in server.properties to the new mechanism and incrementally bounce the cluster again.
4. To remove old mechanism (if this is required), remove the old mechanism from `sasl.enabled.mechanisms` in server.properties and remove the entries for the old mechanism from JAAS config file. Incrementally bounce the cluster again.
  1. Authentication using Delegation Tokens

Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL methods. Delegation tokens are shared secrets between kafka brokers and clients. Delegation tokens will help processing frameworks to distribute the workload to available workers in a secure environment without the added cost of distributing Kerberos TGT/keytabs or keystores when 2-way SSL is used. See KIP-48 for more details.

Typical steps for delegation token usage are:

1. User authenticates with the Kafka cluster via SASL or SSL, and obtains a delegation token. This can be done using AdminClient APIs or using `kafka-delegation-token.sh` script.
2. User securely passes the delegation token to Kafka clients for authenticating with the Kafka cluster.
3. Token owner/renewer can renew/expire the delegation tokens.
1. ##### Token Management

A master key/secret is used to generate and verify delegation tokens. This is supplied using config option delegation.token.master.key. Same secret key must be configured across all the brokers. If the secret is not set or set to empty string, brokers will disable the delegation token authentication.

In current implementation, token details are stored in Zookeeper and is suitable for use in Kafka installations where Zookeeper is on a private network. Also currently, master key/secret is stored as plain text in server.properties config file. We intend to make these configurable in a future Kafka release.

A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.

Tokens can also be cancelled explicitly. If a token is not renewed by the token’s expiration time or if token is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper.

2. ##### Creating Delegation Tokens

Tokens can be created by using AdminClient APIs or using kafka-delegation-token.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels. Tokens can not be requests if the initial authentication is done through delegation token. kafka-delegation-token.sh script examples are given below.

Create a delegation token:

                > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

Renew a delegation token:

                > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

Expire a delegation token:

                > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

Existing tokens can be described using the –describe option:

                > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
            

3. ##### Token Authentication

Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable SASL/SCRAM mechanism on Kafka cluster as described in here.

Configuring Kafka Clients:

  1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the token authentication: 
        
                       sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
                username="tokenID123" \
                password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
                tokenauth="true";

The options username and password are used by clients to configure the token id and token HMAC. And the option tokenauth is used to indicate the server about token authentication. In this example, clients connect to the broker using token id: tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.

JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

4. ##### Procedure to manually rotate the secret:

We require a re-deployment when the secret needs to be rotated. During this process, already connected clients will continue to work. But any new connection requests and renew/expire requests with old tokens can fail. Steps are given below.

  1. Expire all existing tokens.
  2. Rotate the secret by rolling upgrade, and
  3. Generate new tokens

We intend to automate this in a future Kafka release.

5. ##### Notes on Delegation Tokens

   * Currently, we only allow a user to create delegation token for that user only. Owner/Renewers can renew or expire tokens. Owner/renewers can always describe their own tokens. To describe others tokens, we need to add DESCRIBE permission on Token Resource.

4 - Authorization and ACLs

Authorization and ACLs

Authorization and ACLs

Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses zookeeper to store all the acls. The Authorizer is configured by setting authorizer.class.name in server.properties. To enable the out of the box implementation use:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Kafka acls are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP”. You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.

allow.everyone.if.no.acl.found=true

One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string “User” is case sensitive.

super.users=User:Bob;User:Alice

By default, the SSL user name will be of the form “CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”. One can change that by setting a customized PrincipalBuilder in server.properties like the following.

principal.builder.class=CustomizedPrincipalBuilderClass

By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as the auth_to_local in Kerberos configuration file (krb5.conf). This also support additional lowercase rule, to force the translated result to be all lower case. This is done by adding a “/L” to the end of the rule. check below formats for syntax. Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.

        RULE:[n:string](regexp)s/pattern/replacement/
        RULE:[n:string](regexp)s/pattern/replacement/g
        RULE:[n:string](regexp)s/pattern/replacement//L
        RULE:[n:string](regexp)s/pattern/replacement/g/L

An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

Command Line Interface

Kafka Authorization management CLI can be found under bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. Following lists all the options that the script supports:

OptionDescriptionDefaultOption type
--addIndicates to the script that user is trying to add an acl.Action
--removeIndicates to the script that user is trying to remove an acl.Action
--listIndicates to the script that user is trying to list acls.Action
--authorizerFully qualified class name of the authorizer.kafka.security.auth.SimpleAclAuthorizerConfiguration
--authorizer-propertieskey=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181Configuration
--bootstrap-serverA list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of –bootstrap-server or –authorizer option must be specified.Configuration
--command-configA property file containing configs to be passed to Admin Client. This option can only be used with –bootstrap-server option.Configuration
--clusterIndicates to the script that the user is trying to interact with acls on the singular cluster resource.ResourcePattern
--topic [topic-name]Indicates to the script that the user is trying to interact with acls on topic resource pattern(s).ResourcePattern
--group [group-name]Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s)ResourcePattern
--resource-pattern-type [pattern-type]Indicates to the script the type of resource pattern, (for –add), or resource pattern filter, (for –list and –remove), the user wishes to use.
When adding acls, this should be a specific pattern type, e.g. ’literal’ or ‘prefixed’.
When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of ‘any’ or ‘match’ can be used, where ‘any’ will match any pattern type, but will match the resource name exactly, and ‘match’ will perform pattern matching to list or remove all acls that affect the supplied resource(s).
WARNING: ‘match’, when used in combination with the ‘–remove’ switch, should be used with care.literalConfiguration
--allow-principalPrincipal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string “User” is case sensitive.
You can specify multiple –allow-principal in a single command.Principal
--deny-principalPrincipal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string “User” is case sensitive.
You can specify multiple –deny-principal in a single command.Principal
--principalPrincipal is in PrincipalType:name format that will be used along with –list option. Default PrincipalType string “User” is case sensitive. This will list the ACLs for the specified principal.
You can specify multiple –principal in a single command.Principal
--allow-hostIP address from which principals listed in –allow-principal will have access.if –allow-principal is specified defaults to * which translates to “all hosts”Host
--deny-hostIP address from which principals listed in –deny-principal will be denied access.if –deny-principal is specified defaults to * which translates to “all hosts”Host
--operationOperation that will be allowed or denied.
Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, AllAllOperation
--producerConvenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE and CREATE on topic.Convenience
--consumerConvenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group.Convenience
--forceConvenience option to assume yes to all queries and do not prompt.Convenience

Examples

  • Adding Acls
    Suppose you want to add an acl “Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0 and IP 198.51.100.1”. You can do that by executing the CLI with following options:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

By default, all principals that don’t have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal we will have to use the –deny-principal and –deny-host option. For example, if we want to allow all users to Read from Test-topic but only deny User:BadBob from IP 198.51.100.3 we can do so using following commands:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic

Note that --allow-host and deny-host only support IP addresses (hostnames are not supported). Above examples add acls to a topic by specifying –topic [topic-name] as the resource pattern option. Similarly user can add acls to cluster by specifying –cluster and to a consumer group by specifying –group [group-name]. You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl “Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.0” You can do that by using the wildcard resource ‘*’, e.g. by executing the CLI with following options:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *

You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl “Principal User:Jane is allowed to produce to any Topic whose name starts with ‘Test-’ from any host”. You can do that by executing the CLI with following options:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed

Note, –resource-pattern-type defaults to ’literal’, which only affects resources with the exact same name or, in the case of the wildcard resource name ‘*’, a resource with any name.

  • Removing Acls
    Removing acls is pretty much the same. The only difference is instead of –add option users will have to specify –remove option. To remove the acls added by the first example above we can execute the CLI with following options:

     bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic 
    

If you wan to remove the acl added to the prefixed resource pattern above we can execute the CLI with following options:

     bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed
  • List Acls
    We can list acls for any resource by specifying the –list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with following options:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    

However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic, e.g. any acls on the topic wildcard ‘*’, or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *

However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic as the name of such patterns may not be known. We can list all acls affecting Test-topic by using ‘–resource-pattern-type match’, e.g.

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match

This will list acls on all matching literal, wildcard and prefixed resource patterns.

  • Adding or removing a principal as producer or consumer
    The most common use case for acl management are adding/removing a principal as producer or consumer so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic we can execute the following command:

     bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

Similarly to add Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass –consumer option:

     bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 

Note that for consumer option we must also specify the consumer group. In order to remove a principal from producer or consumer role we just need to pass –remove option.

  • AdminClient API based acl management
    Users having Alter permission on ClusterResource can use AdminClient API for ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly. All the above examples can be executed by using --bootstrap-server option. For example:

                bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
            bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
    

5 - Incorporating Security Features in a Running Cluster

Incorporating Security Features in a Running Cluster

Incorporating Security Features in a Running Cluster

You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:

  • Incrementally bounce the cluster nodes to open additional secured port(s).
  • Restart clients using the secured rather than PLAINTEXT port (assuming you are securing the client-broker connection).
  • Incrementally bounce the cluster again to enable broker-to-broker security (if this is required)
  • A final incremental bounce to close the PLAINTEXT port.

The specific steps for configuring SSL and SASL are described in sections 7.2 and 7.3. Follow these steps to enable security for your desired protocol(s).

The security implementation lets you configure different protocols for both broker-client and broker-broker communication. These must be enabled in separate bounces. A PLAINTEXT port must be left open throughout so brokers and/or clients can continue to communicate.

When performing an incremental bounce stop the brokers cleanly via a SIGTERM. It’s also good practice to wait for restarted replicas to return to the ISR list before moving onto the next node.

As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, a SSL port is opened on each node:

            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092

We then restart the clients, changing their config to point at the newly opened, secured port:

            bootstrap.servers = [broker1:9092,...]
            security.protocol = SSL
            ...etc

In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):

            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
            security.inter.broker.protocol=SSL

In the final bounce we secure the cluster by closing the PLAINTEXT port:

            listeners=SSL://broker1:9092
            security.inter.broker.protocol=SSL

Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we’d like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:

            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093

We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:

            bootstrap.servers = [broker1:9093,...]
            security.protocol = SASL_SSL
            ...etc

The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:

            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
            security.inter.broker.protocol=SSL

The final bounce secures the cluster by closing the PLAINTEXT port.

        listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
        security.inter.broker.protocol=SSL

ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.

6 - ZooKeeper Authentication

ZooKeeper Authentication

ZooKeeper Authentication

New clusters

To enable ZooKeeper authentication on brokers, there are two necessary steps:

  1. Create a JAAS login file and set the appropriate system property to point to it as described above
  2. Set the configuration property zookeeper.set.acl in each broker to true

The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper).

Migrating clusters

If you are running a version of Kafka that does not support security or simply with security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:

  1. Perform a rolling restart setting the JAAS login file, which enables brokers to authenticate. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs
  2. Perform a second rolling restart of brokers, this time setting the configuration parameter zookeeper.set.acl to true, which enables the use of secure ACLs when creating znodes
  3. Execute the ZkSecurityMigrator tool. To execute the tool, there is this script: ./bin/zookeeper-security-migration.sh with zookeeper.acl set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes

It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:

  1. Perform a rolling restart of brokers setting the JAAS login file, which enables brokers to authenticate, but setting zookeeper.set.acl to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes
  2. Execute the ZkSecurityMigrator tool. To execute the tool, run this script ./bin/zookeeper-security-migration.sh with zookeeper.acl set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes
  3. Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file

Here is an example of how to run the migration tool:

    ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

Run this to see the full list of parameters:

    ./bin/zookeeper-security-migration.sh --help

Migrating the ZooKeeper ensemble

It is also necessary to enable authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. Please refer to the ZooKeeper documentation for more detail:

  1. Apache ZooKeeper documentation
  2. Apache ZooKeeper wiki