Securing an Apache Kafka Cluster Using SSL, SASL and ACLs
Enabling authentication using SCRAM, authorization using SimpleAclAuthorizer, and encryption between clients and the server using SSL.
Prerequisite: basic familiarity with Apache Kafka, Kafka producers, and consumers. This blog will focus on SASL, SSL and ACLs on top of an Apache Kafka cluster.
In this blog, we will go over the configuration for enabling authentication using SCRAM, authorization using SimpleAclAuthorizer, and encryption between clients and the server using SSL.
Apache Kafka is widely used these days to exchange sensitive information between systems within a company, as well as between different companies, so secure connections (much like HTTPS for web traffic) are essential between client applications and the Kafka brokers. The Apache Kafka open source community has contributed multiple security options for authentication, authorization and encryption.
Authentication in Kafka:
- SSL
- SASL: PLAIN, SCRAM (SHA-256 and SHA-512), OAUTHBEARER, GSSAPI (Kerberos)
Authorization in Kafka: Kafka comes with a simple authorization class, kafka.security.auth.SimpleAclAuthorizer, for handling ACLs (create, read, write, describe, delete). We can also plug in our own custom Authorizer implementation.
Encryption: data is encrypted over the network between clients and the Kafka brokers, i.e. between producers and Kafka, and between consumers and Kafka.
Authentication using SCRAM:
Salted Challenge Response Authentication Mechanism (SCRAM) is a username/password style authentication mechanism. Apache Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512. With this mechanism, Kafka's default ScramLoginModule stores SCRAM credentials in ZooKeeper along with the salt, so ZooKeeper needs to be secured in a private network with very limited access. A default iteration count of 4096 is used if no iteration count is specified. A random salt is created, and the SCRAM identity consisting of the salt, iterations, StoredKey and ServerKey is stored in ZooKeeper. In the example below, we will use SCRAM-SHA-512 for authentication.
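Once a SCRAM user has been created (as we do further below for admin and demouser), you can inspect what Kafka actually stored for it with the describe option of kafka-configs.sh; the output contains the salt, iteration count, stored key and server key, but never the plain password. A minimal sketch against the local ZooKeeper used in this post:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin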
Authorization using SimpleAclAuthorizer:
Apache Kafka ships with a simple default authorization implementation, kafka.security.auth.SimpleAclAuthorizer, for all ACL operations. SimpleAclAuthorizer covers Create, Read, Write, ClusterAction, AlterConfigs, Describe and Delete permissions. In the examples below, we will see the role of each of these permissions and the errors applications receive when a permission has not been assigned before an operation.
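For example, once ACLs have been added you can ask SimpleAclAuthorizer what it currently holds for a resource with kafka-acls.sh. A quick sketch, using the demo-topic we create later in this post:
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic demo-topic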
Encryption using SSL:
SSL (Secure Sockets Layer) can be configured for encryption and can also serve as 2-way authentication between client and server, i.e. the broker authenticates the client using the client's certificate and the client authenticates the broker using the broker's certificate. In this article, we will configure SSL for encryption only. SSL uses private-key/certificate pairs during the SSL handshake process.
- Each Kafka broker needs its own private-key/certificate pair, and the client uses the broker's certificate to authenticate the broker.
You can refer to this GitHub repo for spinning up a secure cluster by adding the required configs locally on your machine.
Now, let's get to a working example:
Download the Apache Kafka binary from the open source Apache Kafka downloads page.
SSL certificate and key generation: create the Kafka broker SSL keystore and truststore using the confluent-platform-security-tools generate ssl script. Make a note of the keystore and truststore passwords you pass in while executing this script; we will need them when configuring Kafka's server.properties.
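If you would rather create the stores by hand, or just want to see roughly what such a script does, a minimal sketch along the lines of the standard Kafka SSL setup looks like this (the alias, CN, validity and 'password' values are placeholders, not taken from the script):
# 1. Generate the broker key pair inside the keystore
keytool -keystore kafka.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass password -keypass password -dname "CN=localhost"
# 2. Create a self-signed CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/CN=demo-ca" -passout pass:password
# 3. Import the CA certificate into the client truststore
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert -storepass password -noprompt
# 4. Sign the broker certificate with the CA, then import the CA and the signed certificate into the keystore
keytool -keystore kafka.keystore.jks -alias localhost -certreq -file cert-file -storepass password
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:password
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert -storepass password -noprompt
keytool -keystore kafka.keystore.jks -alias localhost -import -file cert-signed -storepass password -noprompt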

After generating the truststore and keystore folders with this script, copy them into the config folder of the downloaded Kafka binary directory.
We will execute all of the commands below from <kafka-binary-dir> in a terminal.
Start ZooKeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
Create Kafka Super User:
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='admin-secret']' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.
Create a kafka_server_jaas.conf file in the config folder:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
Create an ssl-user-config.properties file in the config folder:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
Append the security properties below to the existing Kafka server.properties file in the config folder:
########### SECURITY using SCRAM-SHA-512 and SSL ###################
listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094
advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094
security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# Broker security settings
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=<kafka-binary-dir>/config/keystore/kafka.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
# ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
#zookeeper SASL
zookeeper.set.acl=false
########### SECURITY using SCRAM-SHA-512 and SSL ###################
Now start Kafka with the JAAS conf file:
export KAFKA_OPTS=-Djava.security.auth.login.config=<kafka-binary-dir>/config/kafka_server_jaas.conf
./bin/kafka-server-start.sh ./config/server.properties
With the above server.properties configuration, clients can now connect to the Kafka cluster in three ways:
- localhost:9092: PLAINTEXT, with no authentication, authorization or encryption.
- localhost:9093: SASL_PLAINTEXT, with authentication and authorization but no encryption.
- localhost:9094: SASL_SSL, with authentication, authorization and encryption (a quick way to verify the TLS layer is shown right after this list).
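As an optional sanity check, you can confirm that the SASL_SSL listener on 9094 actually presents the broker certificate during the TLS handshake, for example with openssl s_client (the handshake output should show the certificate generated earlier, while the same command against the plaintext port 9092 will fail to negotiate TLS):
openssl s_client -connect localhost:9094 </dev/null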
Now let's go over the different workflows using the Kafka cluster.
Create User:
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='secret']' --entity-type users --entity-name demouser
Completed Updating config for entity: user-principal 'demouser'.
Create ssl-user-config.properties in the config folder:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-bin-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
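As a quick check, you can confirm that the new credentials authenticate against the SASL_SSL listener by listing topics; since demouser has no ACLs yet, the command should succeed but return an empty list rather than an authorization error:
./bin/kafka-topics.sh --list --bootstrap-server localhost:9094 --command-config ./config/ssl-user-config.properties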
Creating Topic without Create Permissions:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --command-config ./config/ssl-user-config.properties --replication-factor 1 --partitions 1 --topic demo-topic
Error while executing topic command : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2019-06-20 11:27:29,244] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
(kafka.admin.TopicCommand$)
Assign Create and Describe permissions to demouser and then create topic:
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:demouser --operation Create --operation Describe --topic demo-topic
Adding ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
Current ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
./bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --command-config ./config/ssl-user-config.properties --replication-factor 1 --partitions 1 --topic demo-topic
Topic demo-topic created
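Since demouser now has the Describe permission on demo-topic, you can also verify the topic it just created using the same client config:
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9094 --command-config ./config/ssl-user-config.properties --topic demo-topic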
Create ssl-producer.properties in the config folder:
bootstrap.servers=localhost:9094
compression.type=none
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
Produce with an incorrect password: modify the password to something else in ssl-producer.properties and then try to produce data on demo-topic:
./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic demo-topic --producer.config config/ssl-producer.properties
[2019-06-22 18:41:27,406] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9094) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
Produce with the correct password but with no Write (produce) permission on topic demo-topic:
./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic demo-topic --producer.config config/ssl-producer.properties
>ERROR Error when sending message to topic demo-topic with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [demo-topic]
Assign Producer permissions for demouser to demo-topic
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:demouser --producer --topic demo-topic
Adding ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Write from hosts: *
Produce messages on demo-topic with correct password and permissions:
./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic demo-topic --producer.config config/ssl-producer.properties
>message1
>message2
>message3
^C
We have now produced 3 messages to demo-topic. Let's try to consume those messages from the topic by creating a consumer.
Create ssl-consumer.properties in the config folder:
bootstrap.servers=localhost:9094
# consumer group id
group.id=demo-consumer-group
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
Consume data from demo-topic with incorrect password in ssl-consumer.properties
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic demo-topic --from-beginning --consumer.config config/ssl-consumer.properties
[2019-06-22 18:46:41,983] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
Processed a total of 0 messages
Consume data from demo-topic with correct password in ssl-consumer.properties
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic demo-topic --from-beginning --consumer.config config/ssl-consumer.properties
[2019-06-22 18:47:37,461] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: demo-consumer-group
Processed a total of 0 messages
Assign Consumer permissions for demouser on demo-topic
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:demouser --consumer --topic demo-topic --group demo-consumer-group
Adding ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Read from hosts: *
Adding ACLs for resource `Group:LITERAL:demo-consumer-group`:
User:demouser has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Write from hosts: *
User:demouser has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Group:LITERAL:demo-consumer-group`:
User:demouser has Allow permission for operations: Read from hosts: *
Consume messages from demo-topic with correct password and consume permissions:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic demo-topic --from-beginning --consumer.config config/ssl-consumer.properties
message1
message2
message3
^C
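When you are done experimenting, you can optionally clean up the test setup by removing the ACLs and deleting the SCRAM credential for demouser. A sketch of the standard commands (kafka-acls.sh asks for confirmation unless you pass --force):
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:demouser --producer --topic demo-topic
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:demouser --consumer --topic demo-topic --group demo-consumer-group
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name demouser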
Summary
To summarize, we have seen how to encrypt Kafka cluster connections using SSL, authenticate clients using SCRAM, and authorize operations using Kafka's built-in SimpleAclAuthorizer class. In upcoming blogs, I will show how to configure other options, such as authentication using the OAUTHBEARER token mechanism.