Scale Apache Kafka Applications on Vultr Kubernetes Engine with KEDA

Last Updated: Sun, Aug 27, 2023


Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerance, and scalable data streaming and processing. It's based on a producer-consumer architecture where producers send data to Kafka topics and consumers retrieve messages from the Kafka topics. Consumer groups allow a set of consumers to work together and share data processing, this ensures that each message gets processed by only one consumer in the group.

Kubernetes Event-Driven Autoscaling (KEDA) is a cloud-native event-driven auto-scaler (CNCF project) for container workloads. In KEDA, scalers are extensible components that monitor external systems and produce metrics to drive the scaling process for Kubernetes workloads.

This guide explains how to use the KEDA scaler for Kafka on a Vultr Kubernetes Engine (VKE) cluster to drive auto-scaling and allow Kafka consumer application pods to scale up and down based on the consumer group lag.


Before you begin:

Install the KEDA Operator

  1. Create a directory

    $ mkdir kafka-keda-vultr
  2. Switch to the directory

    $ cd kafka-keda-vultr
  3. Set the KUBECONFIG environment variable with the path to your VKE YAML file to grant kubectl access to the cluster

    $ export KUBECONFIG=/path/to/vke/YAML

    The above command allows Kubectl to use your VKE YAML file as the default cluster file instead of localhost

  4. Deploy KEDA using its deployment YAML file

    $ kubectl apply --server-side -f
  5. Verify the KEDA deployment status

    $ kubectl get deployment -n keda

    Your output should look like the one below:

    NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
    keda-metrics-apiserver   1/1     1            1           57s
    keda-operator            1/1     1            1           57s

    Verify that the KEDA deployment status is READY

Set Up a Single Node Kafka cluster using the Strimzi Operator

  1. Install the Strimzi operator

    $ kubectl create -f ''
  2. Using a text editor such as Nano, create a new file kafka-cluster.yaml

    $ nano kafka-cluster.yaml
  3. Add the following contents to the file

    kind: Kafka
      name: my-cluster
        version: 3.4.0
        replicas: 1
          - name: plain
            port: 9092
            type: internal
            tls: false
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          type: ephemeral
        replicas: 1
          type: ephemeral
        topicOperator: {}
        userOperator: {}

    Save and close the file

  4. Deploy the Kafka cluster

    $ kubectl apply -f kafka-cluster.yaml
  5. Wait for a few minutes for the cluster creation to complete. Run the following command to wait for the cluster creation to complete

    $ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s

    When complete, the following output should display: condition met
  6. When the cluster creation is complete, create a topic

    $ kubectl run kafka-topics -ti --rm=true --restart=Never -- bin/ --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic test-topic --partitions 5 --replication-factor 1

    Your output should look like the one below:

    Created topic test-topic.
    pod "kafka-topics" deleted

Prepare the Kafka Consumer Application

  1. Create a new Go module

    $ go mod init kafka-consumer
  2. Create a new file main.go

    $ nano main.go
  3. Add the following contents to the file

    package main
    import (
    const consumerGroupName = "my-group"
    var kafkaBroker string
    var topic string
    var client *kgo.Client
    func init() {
        kafkaBroker = os.Getenv("KAFKA_BROKER")
        if kafkaBroker == "" {
            log.Fatal("missing env var KAFKA_BROKER")
        topic = os.Getenv("KAFKA_TOPIC")
        if topic == "" {
            log.Fatal("missing env var KAFKA_TOPIC")
        fmt.Println("KAFKA_BROKER", kafkaBroker)
        fmt.Println("KAFKA_TOPIC", topic)
        opts := []kgo.Opt{
            kgo.SeedBrokers(strings.Split(kafkaBroker, ",")...),
        var err error
        client, err = kgo.NewClient(opts...)
        if err != nil {
    func main() {
        go func() {
            fmt.Println("starting kafka consumer goroutine")
            for {
                err := client.Ping(context.Background())
                if err != nil {
                    log.Fatal("ping failed - ", err)
                fmt.Println("fetching records....")
                fetches := client.PollRecords(context.Background(), 0)
                if fetches.IsClientClosed() {
                    fmt.Println("kgo kafka client closed")
                fetches.EachError(func(t string, p int32, err error) {
                    fmt.Printf("fetch err - topic %s partition %d: %v\n", t, p, err)
                fetches.EachRecord(func(r *kgo.Record) {
                    fmt.Printf("got record from partition %v key=%s val=%s\n", r.Partition, string(r.Key), string(r.Value))
                    time.Sleep(3 * time.Second)
                    err = client.CommitRecords(context.Background(), r)
                    if err != nil {
                        fmt.Println("commit failed for record with offset", r.Offset, "in partition", r.Partition)
                    } else {
                        fmt.Println("committed record with offset", r.Offset, "in partition", r.Partition)
        end := make(chan os.Signal, 1)
        signal.Notify(end, syscall.SIGINT, syscall.SIGTERM)
        fmt.Println("kafka consumer exit")
    func partitionsAssigned(ctx context.Context, c *kgo.Client, m map[string][]int32) {
        fmt.Printf("partitions ASSIGNED for topic %s %v\n", topic, m[topic])
    func partitionsRevoked(ctx context.Context, c *kgo.Client, m map[string][]int32) {
        fmt.Printf("partitions REVOKED for topic %s %v\n", topic, m[topic])
    func partitionsLost(ctx context.Context, c *kgo.Client, m map[string][]int32) {
        fmt.Printf("partitions LOST for topic %s %v\n", topic, m[topic])

    Save and close the file.

    Below are what the above application parts do:

    • The init function:

      • Fetches the Kafka broker and topic information from the environment variables

      • If these environment variables aren't provided, the program exits with an error code

      • Initializes the Kafka client with various configuration options, Kafka broker addresses, topic to consume, consumer group name, and sends a callback when a partition gets assigned, revoked, or lost

    • The main function starts a goroutine to poll and process Kafka records, then:

      • The goroutine polls for Kafka records and logs any fetch errors

      • Processes each record (simulates some processing with a 3-second sleep), prints the record's details, and commits the record to Kafka

      • Listens for SIGINT or SIGTERM to ensure that the application can be gracefully shut down using CTRL + C, or when receiving a termination signal.

      • When the program detects a termination signal, the Kafka client closes, and the program exits

Deploy the Application to your VKE Cluster

  1. Create a new Dockerfile to store Docker variables

    $ nano Dockerfile
  2. Add the following contents to the file

    FROM golang:1.19-buster AS build
    WORKDIR /app
    COPY go.mod ./
    COPY go.sum ./
    RUN go mod download
    COPY main.go ./
    RUN go build -o /kafka-go-app
    COPY --from=build /kafka-go-app /kafka-go-app
    USER nonroot:nonroot
    ENTRYPOINT ["/kafka-go-app"]

    Save and close the file

    The above Dockerfile builds the Kafka consumer Go application using a multi-stage build

  3. Fetch the program Go module dependencies

    $ go mod tidy
  4. Log in to Docker using your active Docker Hub account

    $ sudo docker login
  5. Build the Docker image. Replace example-user with your actual Docker Hub ID

    $ sudo docker build -t example-user/myapp .
  6. Push the image to Docker hub

    $ sudo docker push example-user/myapp

    Verify that the command is successful and a new myapp repository is available on your DockerHub profile

  7. Create a new file consumer.yaml

    $ nano consumer.yaml
  8. Add the following contents to the file. Replace example-user/myapp with your actual Docker repository

    apiVersion: apps/v1
    kind: Deployment
      name: kafka-consumer-app
        app: kafka-consumer-app
      replicas: 1
          app: kafka-consumer-app
            app: kafka-consumer-app
          - name: kafka-consumer-app-container
            image: example-user/myapp
            imagePullPolicy: Always
              - name: KAFKA_BROKER
                value: my-cluster-kafka-bootstrap:9092
              - name: KAFKA_TOPIC
                value: test-topic

    Save and close the file

  9. Deploy the application to your cluster

    $ kubectl apply -f consumer.yaml
  10. Verify the application deployment status

    $ kubectl get pods -l=app=kafka-consumer-app

    Your output should look like the one below:

    NAME                                 READY   STATUS    RESTARTS   AGE
    kafka-consumer-app-c4b67d694-mptlw   1/1     Running   0          2m12s

    Verify that the Pod status changes to Running

  11. View the application logs

    $ kubectl logs -f -l=app=kafka-consumer-app


    KAFKA_BROKER my-cluster-kafka-bootstrap:9092
    KAFKA_TOPIC test-topic
    starting kafka consumer goroutine
    fetching records....
    partitions ASSIGNED for topic test-topic [0 1 2 3 4]

Enable Autoscaling

  1. Create a new file scaled-object.yaml

    $ nano scaled-object.yaml
  2. Add the following contents to the file

    kind: ScaledObject
      name: kafka-scaledobject
        name: kafka-consumer-app
      minReplicaCount: 1
      maxReplicaCount: 5
      pollingInterval: 30
      - type: kafka
          bootstrapServers: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
          consumerGroup: my-group
          topic: test-topic
          lagThreshold: "5"
          offsetResetPolicy: latest

    Save and close the file

  3. Deploy the KEDA scaled object

    $ kubectl apply -f scaled-object.yaml

Verify the Consumer Application Autoscaling

  1. Monitor the number of consumer application pods

    $ kubectl get pods -l=app=kafka-consumer-app -w
  2. In a new terminal window, export the KUBECONFIG variable to activate Kubectl in the session

    $ export KUBECONFIG=/path/to/vke/YAML
  3. Run the following command to send data to the Kafka topic

    $ kubectl run kafka-producer -ti --rm=true --restart=Never -- bin/ --throughput 200 --record-size 1000 --num-records 500 --topic test-topic --print-metrics --producer-props batch.size=16384 bootstrap.servers=my-cluster-kafka-bootstrap:9092

    The above native Kafka producer performance script generates the load that sends 500 records of 1000 bytes each to the test-topic at a rate of 200 records per second.

    When successful, your output should look like the one below:

    producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=test-topic} : 0.000
    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=test-topic}   : 15.471
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=test-topic}  : 500.000
    pod "kafka-producer" deleted
  4. Wait for one minute, and verify the Kafka consumer application Deployment status

    $ kubectl get deployment/kafka-consumer-app

    Your output should look like the one below:

    NAME                 READY   UP-TO-DATE   AVAILABLE   AGE
    kafka-consumer-app   5/5     5            5           2m48s

    Verify that the deployment has scaled up to five pods

  5. Navigate back to the monitoring terminal, and verify that your output looks like the one below:

    kafka-consumer-app-6bc79dd94f-jb867   0/1     ContainerCreating   0          0s
    kafka-consumer-app-6bc79dd94f-59bg4   1/1     Running             0          3s
    kafka-consumer-app-6bc79dd94f-f5ll4   1/1     Running             0          5s
    kafka-consumer-app-6bc79dd94f-jb867   1/1     Running             0          7s
    kafka-consumer-app-6bc79dd94f-wzrkp   0/1     Pending             0          0s
    kafka-consumer-app-6bc79dd94f-wzrkp   0/1     Pending             0          0s
    kafka-consumer-app-6bc79dd94f-wzrkp   0/1     ContainerCreating   0          0s
    kafka-consumer-app-6bc79dd94f-wzrkp   1/1     Running             0          3s

    Press CTRL + C to exit the watch session

KEDA Autoscaling Results

Auto-scaling happens due to the KEDA ScaledObject you created earlier. KEDA monitors the test-topic Kafka topic for the consumer message lag, and when the unread message count exceeds 5, KEDA scales out the kafka-consumer-app deployment with a maximum limit of 5 replicas. KEDA checks the Kafka metrics every 30 seconds to make these scaling decisions.

KEDA also scales down to 1 replica when the consumer applications complete processing the messages and the consumer lag decreases.


In this guide, you used KEDA to auto-scale a Kafka consumer application deployed on a Vultr Kubernetes Engine (VKE) cluster. You set up KEDA and Kafka, deployed the application, used KEDA ScaledObject, and verified the auto-scaling behavior. For more information about KEDA, visit the official documentation.

Next Steps

To implement more solutions on your VKE cluster, visit the following resources:

Want to contribute?

You could earn up to $600 by adding new articles.