In this article, you will learn how to run Apache Kafka on Kubernetes using the open-source Strimzi project. You will set up Strimzi on Vultr Kubernetes Engine, create a Kafka cluster, send messages to a topic, and receive messages from that topic. You will also secure the Kafka cluster using encryption and user authentication.
Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate these data streams with other parts of your system in a secure, reliable, and scalable manner.
Key components of Apache Kafka:
Broker (Node): A Kafka broker runs the Kafka JVM process. A best practice is to run three or more brokers for scalability and high availability. A group of Kafka brokers forms a cluster.
Producers: These are client applications that send messages to Kafka. Each message is simply a key-value pair (see the illustration after this list).
Topics: Events (messages) are stored in topics, and each topic has one or more partitions. These partitions are distributed across the Kafka cluster for high availability and redundancy.
Consumers: Just like producers, consumers are client applications. They receive and process data/events from Kafka topics.
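To make the key-value model concrete, here is an illustrative session with the Kafka CLI tools (the broker address localhost:9092 and the topic name orders are placeholders, not part of this article's setup). The parse.key, key.separator, and print.key properties are standard console producer/consumer options:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders --property parse.key=true --property key.separator=:
Each line you type, such as order-42:shipped, is sent as a message with key order-42 and value shipped. The consumer can print the keys back alongside the values:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --from-beginning --property print.key=true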
Strimzi can be used to run an Apache Kafka cluster on Kubernetes. In addition to the cluster itself, Strimzi can also help you manage topics, users, MirrorMaker, and Kafka Connect deployments. With Strimzi, you can configure the cluster as per your needs. This includes advanced features such as rack awareness configuration to distribute Kafka nodes across availability zones, as well as Kubernetes taints and tolerations to pin Kafka to dedicated worker nodes in your Kubernetes cluster. You can also expose Kafka to external clients outside the Kubernetes cluster using Service types such as NodePort and LoadBalancer, and these can be secured using SSL.
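As an illustrative sketch (not used in this article's setup), an external listener entry in a Kafka resource's spec.kafka.listeners list might look like this; the listener name and port are arbitrary, and the type could also be nodeport or ingress:
- name: external
  port: 9094
  type: loadbalancer
  tls: true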
All this is made possible by a combination of custom resources, Operators, and their respective container images.
You can customize Strimzi Kafka components in a Kubernetes cluster using custom resources. These are created as instances of APIs added by Custom resource definitions (CRDs) that extend Kubernetes resources. Each Strimzi component has an associated CRD which is used to describe that component.
Thanks to CRDs, Strimzi resources benefit from Kubernetes features such as CLI accessibility and configuration validation.
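For example, once the Strimzi installation files have been applied (a step covered later in this article), you can list the Strimzi CRDs and use kubectl's built-in documentation against their schemas:
kubectl get crds | grep strimzi.io
kubectl explain kafka.spec.kafka --api-version=kafka.strimzi.io/v1beta2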
Once a Strimzi custom resource is created, it's managed using Operators. Operators are a method of packaging, deploying, and managing a Kubernetes-native application. Because Strimzi Operators automate common and complex tasks related to a Kafka deployment, Kafka administration tasks are simplified and require less manual intervention.
Let's look at the Strimzi operators and the custom resources they manage.
Cluster Operator
Strimzi Cluster Operator is used to deploy and manage Kafka components. Although a single Cluster Operator instance is deployed by default, you can add replicas with leader election to ensure operator high availability.
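As a sketch, assuming leader election is enabled in the operator's configuration (the installation files used later in this article include the leader-election RBAC resources), you could scale the operator Deployment to run a standby replica; a single replica remains the default:
kubectl scale deployment/strimzi-cluster-operator --replicas=2 -n kafka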
The Cluster Operator manages the following Kafka components:
Kafka - The Kafka resource is used to configure a Kafka deployment. Configuration options for the ZooKeeper cluster are also included within the Kafka resource.
KafkaConnector - This resource allows you to create and manage connector instances for Kafka Connect.
KafkaMirrorMaker2 - It can be used to run and manage a Kafka MirrorMaker 2.0 deployment. MirrorMaker 2.0 replicates data between two or more Kafka clusters, within or across data centers.
KafkaBridge - This resource manages Kafka Bridge, a component that provides an API for integrating HTTP-based clients with a Kafka cluster. A minimal example follows this list.
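As a sketch (not part of this article's deployment), a minimal KafkaBridge resource might look like the following; the bridge name is arbitrary, and bootstrapServers must point to your cluster's bootstrap Service:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-1-kafka-bootstrap:9092
  http:
    port: 8080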
Entity Operator
The Entity Operator comprises the Topic Operator and the User Operator.
You can declare a KafkaTopic resource as part of your application's deployment, and the Topic Operator will take care of creating the topic for you and keeping it in sync with the corresponding Kafka topic. Information about each topic is held in a topic store, which is continually synchronized with updates from Kafka topics or Kubernetes KafkaTopic custom resources. If a topic is reconfigured or reassigned to other brokers, the KafkaTopic will always be up to date.
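For example, a minimal KafkaTopic declaration might look like this (the topic name, partition count, and replica count are illustrative; the strimzi.io/cluster label must match the name of your Kafka resource):
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster-1
spec:
  partitions: 3
  replicas: 1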
Similarly, you can declare a KafkaUser resource as part of your application's deployment along with authentication and authorization mechanisms for the user. You can also configure user quotas that control usage of Kafka resources. In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user's access rights in the KafkaUser declaration.
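As an illustrative sketch, a KafkaUser combining SCRAM authentication, simple ACL-based authorization, and quotas might look like this (the user name, topic name, and byte rates are placeholders):
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: example-user
  labels:
    strimzi.io/cluster: my-cluster-1
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
        operation: Read
      - resource:
          type: topic
          name: my-topic
        operation: Describe
  quotas:
    producerByteRate: 1048576
    consumerByteRate: 2097152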
Install kubectl on your local workstation. It is a Kubernetes command-line tool that allows you to run commands against Kubernetes clusters.
Deploy a Vultr Kubernetes Engine (VKE) cluster using the Reference Guide. Once it's deployed, from the Overview tab, click the Download Configuration button in the upper-right corner to download your kubeconfig file and save it to a local directory.
Point kubectl to the Vultr Kubernetes Engine cluster by setting the KUBECONFIG environment variable to the path where you downloaded the cluster kubeconfig file in the previous step.
export KUBECONFIG=<path to VKE kubeconfig file>
Verify the current context using the following command:
kubectl config current-context
Create a namespace called kafka.
kubectl create namespace kafka
You should see this output:
namespace/kafka created
Apply the Strimzi installation files, including ClusterRoles, ClusterRoleBindings, and Custom Resource Definitions (CRDs).
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
You should see this output:
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-leader-election created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-watched created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
deployment.apps/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
customresourcedefinition.apiextensions.k8s.io/kafkarebalances.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-leader-election created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-client-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-watched created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-client created
configmap/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
serviceaccount/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/strimzipodsets.core.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
Follow the deployment of the Strimzi cluster operator and wait for the Pod to transition to Running status.
kubectl get pod -n kafka --watch
You should see this output (the Pod name might differ in your case):
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-56d64c8584-7k6sr 1/1 Running 0 43s
To check the operator's log:
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
Create a directory and switch to it:
mkdir vultr-vke-kafka
cd vultr-vke-kafka
Create a new file kafka-cluster-1.yml:
touch kafka-cluster-1.yml
Add the below contents to the kafka-cluster-1.yml file and save it.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster-1
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
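Note that ephemeral storage is convenient for this tutorial, but all data is lost whenever the Pods restart. For anything beyond experimentation, you could switch the storage sections to persistent volumes, for example (the size shown is illustrative):
storage:
  type: persistent-claim
  size: 10Gi
  deleteClaim: false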
Install the Kafka cluster:
kubectl apply -f kafka-cluster-1.yml -n kafka
You should see this output:
kafka.kafka.strimzi.io/my-cluster-1 created
Wait for the cluster to be created.
kubectl wait kafka/my-cluster-1 --for=condition=Ready --timeout=300s -n kafka
Once completed, you will see this output:
kafka.kafka.strimzi.io/my-cluster-1 condition met
Verify the Kafka cluster:
kubectl get kafka -n kafka
You should see this output:
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
my-cluster-1 1 1 True True
Verify the Kafka Pod:
kubectl get pod/my-cluster-1-kafka-0 -n kafka
You should see this output:
NAME READY STATUS RESTARTS AGE
my-cluster-1-kafka-0 1/1 Running 0 9m23s
Verify the ZooKeeper Pod:
kubectl get pod/my-cluster-1-zookeeper-0 -n kafka
You should see this output:
NAME READY STATUS RESTARTS AGE
my-cluster-1-zookeeper-0 1/1 Running 0 10m
Check the ConfigMaps associated with the cluster:
kubectl get configmap -n kafka
You should see this output:
NAME DATA AGE
kube-root-ca.crt 1 57m
my-cluster-1-entity-topic-operator-config 1 9m20s
my-cluster-1-entity-user-operator-config 1 9m20s
my-cluster-1-kafka-0 3 9m45s
my-cluster-1-zookeeper-config 2 10m
strimzi-cluster-operator 1 57m
Check the Services associated with the cluster:
kubectl get svc -n kafka
You should see this output (the ClusterIPs might differ in your case):
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-1-kafka-bootstrap ClusterIP 10.96.23.73 <none> 9091/TCP,9092/TCP,9093/TCP 10m
my-cluster-1-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 10m
my-cluster-1-zookeeper-client ClusterIP 10.108.246.28 <none> 2181/TCP 10m
my-cluster-1-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 10m
Check the Secrets associated with the cluster:
kubectl get secret -n kafka
You should see this output:
NAME TYPE DATA AGE
my-cluster-1-clients-ca Opaque 1 10m
my-cluster-1-clients-ca-cert Opaque 3 10m
my-cluster-1-cluster-ca Opaque 1 10m
my-cluster-1-cluster-ca-cert Opaque 3 10m
my-cluster-1-cluster-operator-certs Opaque 4 10m
my-cluster-1-entity-topic-operator-certs Opaque 4 9m44s
my-cluster-1-entity-user-operator-certs Opaque 4 9m44s
my-cluster-1-kafka-brokers Opaque 4 10m
my-cluster-1-zookeeper-nodes Opaque 4 10m
You can test the Kafka cluster using the Kafka CLI-based producer and consumer: you will verify cluster functionality by producing data with the CLI producer and consuming it with the CLI consumer.
Run a Pod to execute the Kafka CLI producer and send data to a topic:
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-1-kafka-bootstrap:9092 --topic my-topic
You should see the following output with a prompt:
If you don't see a command prompt, try pressing enter.
>
Enter messages in the prompt. These will be sent to the specified Kafka topic.
Open a new terminal. Point kubectl to the Vultr Kubernetes Engine cluster by setting the KUBECONFIG environment variable to the path where you downloaded the cluster kubeconfig file.
export KUBECONFIG=<path to VKE kubeconfig file>
Run a Pod to execute the Kafka CLI consumer and consume data from the topic:
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-1-kafka-bootstrap:9092 --topic my-topic --from-beginning
You should receive messages you sent from the producer terminal.
Press ctrl+c on each terminal to close them. This will delete both Pods.
Delete the Kafka cluster:
kubectl delete -f kafka-cluster-1.yml -n kafka
Verify that the associated Pods were deleted. Wait for the my-cluster-1-kafka-0 and my-cluster-1-zookeeper-0 Pods to terminate.
kubectl get pods -n kafka
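Alternatively, you can block until the Pods are gone (an optional convenience):
kubectl wait --for=delete pod/my-cluster-1-kafka-0 -n kafka --timeout=120s
kubectl wait --for=delete pod/my-cluster-1-zookeeper-0 -n kafka --timeout=120s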
So far, you have set up a simple Kafka cluster. In the next section, you will learn how to secure the setup using the following:
Encryption via TLS.
Authentication via SASL SCRAM.
Create a new file kafka-cluster-2.yml:
touch kafka-cluster-2.yml
Add the below contents to the kafka-cluster-2.yml file and save it. Note that, compared to the first cluster, the listener on port 9092 now has TLS enabled along with SCRAM-SHA-512 authentication.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster-2
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Install the Kafka cluster:
kubectl apply -f kafka-cluster-2.yml -n kafka
You should see this output:
kafka.kafka.strimzi.io/my-cluster-2 created
Wait for the cluster to be created.
kubectl wait kafka/my-cluster-2 --for=condition=Ready --timeout=300s -n kafka
Once completed, you will see this output:
kafka.kafka.strimzi.io/my-cluster-2 condition met
Verify the Kafka cluster:
kubectl get kafka -n kafka
You should see this output:
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
my-cluster-2 1 1 True True
Create a new file kafka-user.yml:
touch kafka-user.yml
Add the below contents to the kafka-user.yml file and save it.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: test-kafka-user
  labels:
    strimzi.io/cluster: my-cluster-2
spec:
  authentication:
    type: scram-sha-512
Create the KafkaUser resource:
kubectl apply -f kafka-user.yml -n kafka
You should see this output:
kafkauser.kafka.strimzi.io/test-kafka-user created
Verify user creation:
kubectl get kafkauser -n kafka
You should see this output:
NAME CLUSTER AUTHENTICATION AUTHORIZATION READY
test-kafka-user my-cluster-2 scram-sha-512 True
When the user is created, the User Operator creates a Kubernetes Secret and seeds it with the user credentials required to authenticate to the Kafka cluster.
Verify the Secret:
kubectl get secret/test-kafka-user -n kafka -o yaml
You will again verify cluster functionality by producing data with the Kafka CLI producer and consuming it with the Kafka CLI consumer. This time, the CLI clients will connect to the Kafka broker over TLS and will need to authenticate with a username and password.
Fetch the password for the Kafka user you created and save it to your local workstation.
kubectl get secret test-kafka-user -n kafka -o jsonpath='{.data.password}' | base64 --decode > user.password
Fetch the Kafka cluster CA certificate and save it to your local workstation.
kubectl get secret my-cluster-2-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' -n kafka | base64 --decode > ca.crt
Fetch the CA certificate password and save it to your local workstation.
kubectl get secret my-cluster-2-cluster-ca-cert -o jsonpath='{.data.ca\.password}' -n kafka | base64 --decode > ca.password
Open a new terminal. Point kubectl to the Vultr Kubernetes Engine cluster by setting the KUBECONFIG environment variable to the path where you downloaded the cluster kubeconfig file.
export KUBECONFIG=<path to VKE kubeconfig file>
Start a new Pod named kafka-producer:
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never
You should see a shell prompt after the Pod starts:
[kafka@kafka-producer kafka]$
From the previous terminal, copy the local certificate into the kafka-producer Pod that you just started:
kubectl cp ca.crt kafka-producer:/tmp -n kafka
Go back to the terminal where the kafka-producer Pod is running and execute the below commands to create a writable copy of the default Java truststore:
cp $JAVA_HOME/lib/security/cacerts /tmp/cacerts
chmod 777 /tmp/cacerts
Import the cluster CA certificate into the keystore. For keypass, use the password you saved to your local ca.password file:
keytool -importcert -alias strimzi-kafka-cert -file /tmp/ca.crt -keystore /tmp/cacerts -keypass <password from ca.password file> -storepass changeit -noprompt
You should see this output:
Certificate was added to keystore
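Optionally, confirm that the certificate entry exists in the truststore:
keytool -list -alias strimzi-kafka-cert -keystore /tmp/cacerts -storepass changeit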
Create the configuration file which will be used by the Kafka CLI producer. For password, use the password you saved to your local user.password file:
cat > /tmp/producer.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test-kafka-user" password="<password from user.password file>";
ssl.truststore.location=/tmp/cacerts
ssl.truststore.password=changeit
EOF
Send data to a topic:
bin/kafka-console-producer.sh --bootstrap-server my-cluster-2-kafka-bootstrap:9092 --topic my-topic --producer.config /tmp/producer.properties
You should see the following output with a prompt:
If you don't see a command prompt, try pressing enter.
>
Enter messages in the prompt. These will be sent to the specified Kafka topic.
Open a new terminal. Point kubectl to the Vultr Kubernetes Engine cluster by setting the KUBECONFIG environment variable to the path where you downloaded the cluster kubeconfig file.
export KUBECONFIG=<path to VKE kubeconfig file>
Start a new Pod named kafka-consumer:
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never
You should see a shell prompt after the Pod starts:
[kafka@kafka-consumer kafka]$
From the previous terminal, copy the local certificate into the kafka-consumer Pod that you just started:
kubectl cp ca.crt kafka-consumer:/tmp -n kafka
Go back to the terminal where the kafka-consumer Pod is running and execute the below commands to create a writable copy of the default Java truststore:
cp $JAVA_HOME/lib/security/cacerts /tmp/cacerts
chmod 777 /tmp/cacerts
Import the cluster CA certificate into the keystore. For keypass, use the password you saved to your local ca.password file:
keytool -importcert -alias strimzi-kafka-cert -file /tmp/ca.crt -keystore /tmp/cacerts -keypass <password from ca.password file> -storepass changeit -noprompt
You should see this output:
Certificate was added to keystore
Create the configuration file which will be used by the Kafka CLI consumer. For password, use the password you saved to your local user.password file:
cat > /tmp/consumer.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test-kafka-user" password="<password from user.password file>";
ssl.truststore.location=/tmp/cacerts
ssl.truststore.password=changeit
EOF
Use the Kafka CLI consumer to consume data from the topic:
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-2-kafka-bootstrap:9092 --topic my-topic --consumer.config /tmp/consumer.properties --from-beginning
You should receive messages you sent from the producer terminal.
From a new terminal, delete the Kafka cluster:
kubectl delete -f kafka-cluster-2.yml -n kafka
Verify that the associated Pods were deleted. Wait for the my-cluster-2-kafka-0 and my-cluster-2-zookeeper-0 Pods to terminate.
kubectl get pods -n kafka
After you have completed the tutorial in this article, you can delete the Vultr Kubernetes Engine cluster.
In this article, you learned how to use Strimzi to run Kafka and its related components on Kubernetes. You installed Strimzi on Vultr Kubernetes Engine, set up a Kafka cluster, sent messages to a topic, and received messages from that topic. Next, you secured the Kafka cluster by enforcing TLS encryption as well as SASL authentication. TLS encryption ensured that clients could only connect via SSL, and with SASL authentication, clients had to provide a username and password to interact with the cluster (send or receive data).
You can also learn more in the following documentation: