How to Run a Data Pipeline on Vultr Kubernetes Engine (VKE) Using Kafka Connect

Last Updated: Wed, Jun 28, 2023

Introduction

Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate data streams with other parts of your system in a secure, reliable, and scalable manner.

To build integration solutions, you can use the Kafka Connect framework to integrate with external systems. There are two types of Kafka connectors:

  • Source connector: Used to move data from source systems to Kafka topics.

  • Sink connector: Used to send data from Kafka topics into target (sink) systems.

When integrating with relational databases, the JDBC connector (source and sink) and the Debezium (source) connector are widely used solutions. The JDBC source connector works by periodically polling database tables to retrieve data, while Debezium relies on change data capture (CDC) by reading changes from the database's transaction log.

In this article, you will learn how to run a data pipeline using Kafka Connect on a Vultr Kubernetes Engine (VKE) cluster with the open-source Strimzi project. Then, you will integrate a Vultr Managed Database for PostgreSQL with Apache Kafka by creating source and sink connectors to synchronize data from one PostgreSQL table to another.

Prerequisites

Before you begin:

  1. Deploy a Vultr Kubernetes Engine (VKE) cluster with at least 3 nodes.

  2. Create a Vultr Managed Database for PostgreSQL.

  3. Install the kubectl CLI tool to access the cluster.

  4. Install the PostgreSQL psql CLI tool on your computer.

  5. Install Docker on your computer.

Set up the PostgreSQL database

  1. Connect to your Vultr Managed Database for PostgreSQL.

    $ psql "host=mydb.vultrdb.com port=16751 dbname=defaultdb user=example-user password=database-password sslmode=require"
    

    Replace mydb.vultrdb.com, 16751, example-user, and database-password with the host, port, database user, and password of your Vultr Managed Database for PostgreSQL.

    When connected, your prompt should change to:

    defaultdb=>
    
  2. Create the orders table.

    CREATE TABLE orders (
        order_id serial PRIMARY KEY,
        customer_id integer,
        order_date date
    );
    
  3. Create the orders_sink table that acts as the target for records delivered from the Kafka topic. The order_date column is defined as integer because Debezium emits DATE values as the number of days since 1970-01-01, and the sink writes that number as-is.

    CREATE TABLE orders_sink (
        order_id serial PRIMARY KEY,
        customer_id integer,
        order_date integer
    );
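
    Optionally, confirm that both tables exist by listing them from the same psql session (the \dt meta-command is standard psql):

    defaultdb=> \dt

    The output should include the orders and orders_sink tables.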
    

Install Strimzi on Vultr Kubernetes Engine

  1. Create a new Kafka namespace.

    $ kubectl create namespace kafka
    
  2. Apply the Strimzi installation files, including ClusterRoles, ClusterRoleBindings, and Custom Resource Definitions (CRDs).

    $ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
    
  3. Verify that the Strimzi cluster operator pod is available and running.

    $ kubectl get pod -n kafka -w
    

    Your output should look like the one below:

    NAME                                        READY   STATUS    RESTARTS   AGE
    
    strimzi-cluster-operator-56d64c8584-7k6sr   1/1     Running   0          43s
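
    Optionally, confirm that the Strimzi Custom Resource Definitions were registered. This is a quick check that only assumes the standard strimzi.io API group:

    $ kubectl get crd | grep strimzi.io

    Entries such as kafkas.kafka.strimzi.io, kafkaconnects.kafka.strimzi.io, and kafkaconnectors.kafka.strimzi.io indicate that the operator can manage the resources used in the next sections.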
    

Set up a single-node Apache Kafka cluster

  1. Create a directory to store cluster files.

    $ mkdir vultr-vke-kafka-connect
    
  2. Switch to the directory.

    $ cd vultr-vke-kafka-connect
    
  3. Using a text editor of your choice, create a new kafka-cluster.yml manifest.

    $ nano kafka-cluster.yml
    
  4. Add the following contents to the file.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-cluster
    spec:
      kafka:
        version: 3.3.1
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.3"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}

    Save and close the file.

  5. Apply the manifest to deploy the cluster.

    $ kubectl apply -f kafka-cluster.yml -n kafka
    
  6. Verify that the Apache Kafka cluster is created.

    $ kubectl get kafka -n kafka
    

    Output:

    NAME            DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
    
    kafka-cluster   1                        1                     True    True
    
  7. Verify that the Kafka pod is available.

    $ kubectl get pod/kafka-cluster-kafka-0 -n kafka
    

    Output:

    NAME                    READY   STATUS    RESTARTS   AGE
    kafka-cluster-kafka-0   1/1     Running   0          1m23s
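
    Strimzi also creates Kubernetes Services for the cluster. Optionally, list them and confirm that the kafka-cluster-kafka-bootstrap Service exists, because kafka-cluster-kafka-bootstrap:9092 is the bootstrap address used by the Kafka Connect cluster later in this guide:

    $ kubectl get svc -n kafka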
    

Create a Docker image for Kafka Connect

  1. Create a new Dockerfile.

    $ nano Dockerfile
    
  2. Add the following contents to the file.

    FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
    USER root:root

    RUN mkdir /opt/kafka/plugins

    RUN curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.0.Final/debezium-connector-postgres-1.9.0.Final-plugin.tar.gz \
     && tar -xf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz -C plugins

    RUN mkdir /opt/kafka/plugins/kafka-connect-jdbc

    RUN curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.6.0/kafka-connect-jdbc-10.6.0.jar \
     && mv kafka-connect-jdbc-10.6.0.jar plugins/kafka-connect-jdbc \
     && cp plugins/debezium-connector-postgres/postgresql-42.3.3.jar plugins/kafka-connect-jdbc

    USER 1001
    

    Save and close the file.

    The above file uses quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 as the base image, creates the /opt/kafka/plugins directory, downloads the Debezium PostgreSQL connector and the Kafka Connect JDBC connector into it, and copies the PostgreSQL JDBC driver bundled with the Debezium plugin next to the JDBC connector so that the sink connector can reach the database.

  3. To publish the image, log in to Docker Hub.

    $ sudo docker login
    

    Enter your Docker Hub username and password when prompted, as shown below:

    Login with your Docker ID to push and pull images from Docker Hub.
    
    Username: 
    
    Password:
    
  4. Build the image.

    $ sudo docker build -t vke-kafka-connect-demo .
    
  5. Tag the image.

    $ sudo docker tag vke-kafka-connect-demo example-user/vke-kafka-connect-demo
    

    Replace example-user with your actual Docker account username.

  6. Push the image to Docker Hub.

    $ sudo docker push example-user/vke-kafka-connect-demo
    

    Output:

    The push refers to repository [docker.io/example-user/vke-kafka-connect-demo]
    
    5985a7b633b1: Pushed
    
    a1cac272dd78: Pushed
    
    afd5d00d6422: Pushed
    

Deploy a Kafka Connect cluster

  1. Create a new file kafka-connect.yaml.

    $ nano kafka-connect.yaml
    
  2. Add the following contents to the file.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      image: example-user/vke-kafka-connect-demo
      replicas: 1
      bootstrapServers: kafka-cluster-kafka-bootstrap:9092
      config:
        group.id: my-kafka-connect-cluster
        offset.storage.topic: kafka-connect-cluster-offsets
        config.storage.topic: kafka-connect-cluster-configs
        status.storage.topic: kafka-connect-cluster-status
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      resources:
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      jvmOptions:
        "-Xmx": "1g"
        "-Xms": "1g"
    

    Replace example-user/vke-kafka-connect-demo with your actual Docker image path.

    Save and close the file.

  3. Create the cluster.

    $ kubectl apply -f kafka-connect.yaml -n kafka
    
  4. Verify that the Kafka Connect cluster is available and running.

    $ kubectl get pod -l=strimzi.io/cluster=kafka-connect-cluster -n kafka
    

    Your output should look like the one below.

    NAME                                            READY   STATUS    RESTARTS   AGE
    
    kafka-connect-cluster-connect-f9849ccdb-5lnjc   1/1     Running   0          60s
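
    As an optional check, confirm that the Kafka Connect runtime loaded the Debezium and JDBC plugins from the custom image. The command below assumes Strimzi runs Kafka Connect as a Deployment named kafka-connect-cluster-connect (as the pod name above suggests) and queries the Kafka Connect REST API on its default port 8083; curl is available in the image because the Dockerfile uses it.

    $ kubectl exec -n kafka deploy/kafka-connect-cluster-connect -- curl -s http://localhost:8083/connector-plugins

    The JSON response should list io.debezium.connector.postgresql.PostgresConnector and io.confluent.connect.jdbc.JdbcSinkConnector.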
    

Create the Kafka source connector

  1. Create a new file kafka-connector-source.yaml.

    $ nano kafka-connector-source.yaml
    
  2. Add the following contents to the file.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: debezium-postgres-source-connector
      labels:
        strimzi.io/cluster: kafka-connect-cluster
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector
      tasksMax: 2
      config:
        database.hostname: mydb.vultrdb.com
        database.port: 16751
        database.user: example-admin
        database.password: "database-password"
        database.dbname: defaultdb
        database.server.name: myserver
        plugin.name: wal2json
        table.include.list: public.orders
        value.converter: org.apache.kafka.connect.json.JsonConverter
        database.sslmode: require
        publication.autocreate.mode: filtered
    

    Replace the connector configuration properties as follows.

    • database.hostname: Your Vultr Managed Database for PostgreSQL host.

    • database.port: Your Vultr Managed Database for PostgreSQL port.

    • database.user: A valid database user.

    • database.password: Your Vultr Managed Database for PostgreSQL password.

    • database.dbname: The name of the database to stream changes from. In this guide, defaultdb.

    • database.server.name: A logical name for the database server. Debezium uses it as the prefix for topic names, which is why change events appear on the myserver.public.orders topic.

    • database.sslmode: Controls encryption of the database connection. The value require enforces a secure connection to your Vultr Managed Database for PostgreSQL.

    • table.include.list: The tables to capture changes from. As created earlier, public.orders, where public is the database schema name.

  3. Apply the connector to the Kafka namespace.

    $ kubectl apply -f kafka-connector-source.yaml -n kafka
    
  4. Verify that the connector is added.

    $ kubectl get kafkaconnector -n kafka
    

    Output:

    NAME                                 CLUSTER                 CONNECTOR CLASS                                      MAX TASKS
    
    debezium-postgres-source-connector   kafka-connect-cluster   io.debezium.connector.postgresql.PostgresConnector   2
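
    The Strimzi operator also reports the connector state in the resource status. As an optional check (the field path below is taken from the Strimzi KafkaConnector custom resource and may vary slightly between Strimzi versions), inspect the status conditions:

    $ kubectl get kafkaconnector debezium-postgres-source-connector -n kafka -o jsonpath='{.status.conditions}'

    A condition of type Ready with status True indicates that the connector was accepted and started.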
    

Verify the source connector functionality

  1. In a new terminal session, connect to your PostgreSQL database and insert data into the orders table.

    INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 300);
    
  2. In your Kubernetes session, start a Kafka consumer and confirm that the data arrives in the Kafka topic.

    $ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
    

    You should see records in the Kafka topic as JSON payloads. Each change event is large because it carries information about the changed record, including its schema, as the truncated sample below shows.

    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"order_date"}],"optional":true,"name":"myserver.public.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"}}
    

    Every event generated by the connector contains two components, schema and payload, each of which appears twice:

    • The first schema field corresponds to the event key and describes the structure of the table's primary key.

    • The first payload field is part of the event key and it contains the key for the row that was changed.

    • The second schema field corresponds to the event value and describes the structure of the row that was changed.

    • The second payload field is part of the event value. It has the structure described by the previous schema field and contains the actual data for the row that was changed.

  3. Insert more data and verify that the connector captures the events in the Kafka topic. To view only the changed rows, see the optional command after this step.

    INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
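
    To view only the new row state instead of the full change event envelope, you can filter the consumer output with jq on your workstation. This is an optional convenience that assumes jq is installed locally; the terminal flags are dropped so the output pipes cleanly, --max-messages makes the consumer exit after two records, and .payload.after holds the row after the change, as described above.

    $ kubectl exec -n kafka kafka-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning --max-messages 2 | jq '.payload.after'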
    

Create the sink connector

  1. Create a new file kafka-connector-sink.yaml.

    $ nano kafka-connector-sink.yaml
    
  2. Add the following contents to the file.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: jdbc-sink-connector
      labels:
        strimzi.io/cluster: kafka-connect-cluster
    spec:
      class: io.confluent.connect.jdbc.JdbcSinkConnector
      tasksMax: 2
      config:
        topics: myserver.public.orders
        connection.url: "jdbc:postgresql://mydb.vultrdb.com:16751/defaultdb?user=example-user&password=database-password&sslmode=require"
        dialect.name: PostgreSqlDatabaseDialect
        transforms: unwrap
        transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
        transforms.unwrap.drop.tombstones: false
        auto.create: false
        insert.mode: upsert
        delete.enabled: true
        pk.fields: order_id
        pk.mode: record_key
        table.name.format: public.orders_sink
    

    Update the connection.url value to match your actual Vultr Managed Database for PostgreSQL host, port, user, and password. The URL follows the jdbc:postgresql://host:port/database format, and sslmode=require enforces a TLS connection, matching the source connector.

  3. Apply the connector.

    $ kubectl apply -f kafka-connector-sink.yaml -n kafka
    
  4. Verify that the connector is created.

    $ kubectl get kafkaconnector -n kafka
    

    Output:

    NAME                                 CLUSTER                 CONNECTOR CLASS                                      MAX TASKS
    
    debezium-postgres-source-connector   kafka-connect-cluster   io.debezium.connector.postgresql.PostgresConnector   2
    
    jdbc-sink-connector                  kafka-connect-cluster   io.confluent.connect.jdbc.JdbcSinkConnector          2
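
    You can also query the Kafka Connect REST API to confirm that both connectors and their tasks are in the RUNNING state. This optional check uses the same assumptions as before (a Deployment named kafka-connect-cluster-connect and the REST API on port 8083); the expand=status parameter is part of the standard Kafka Connect REST API.

    $ kubectl exec -n kafka deploy/kafka-connect-cluster-connect -- curl -s "http://localhost:8083/connectors?expand=status"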
    

Verify Kafka integration with the database

  1. In your database session, insert more data into the orders table.

    INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 400);
    
    INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 500);
    
    INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
    
  2. Verify that you can view all the JSON change events in the Kafka topic.

    $ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
    
  3. View the orders_sink table and verify that the new data is persisted.

    SELECT * FROM public.orders_sink;
    

    Your output should look like the one below.

     order_id | customer_id | order_date 
    ----------+-------------+------------
            1 |         300 |      19511
            2 |          42 |      19511
            3 |         400 |      19511
            4 |         500 |      19511
    

    To synchronize more rows into the orders_sink table, continue adding records to the orders table.
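
    As noted when you created the orders_sink table, the order_date values are the number of days since 1970-01-01, which is how Debezium represents DATE columns. To read them as calendar dates, you can convert the integers in the query (an optional convenience query):

    SELECT order_id, customer_id, DATE '1970-01-01' + order_date AS order_date FROM public.orders_sink;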

Troubleshooting

  1. Unable to view Kafka connections to the PostgreSQL database.

    • Verify that you added the correct PostgreSQL database connection details in your files such as kafka-connector-source.yaml. Check that the correct port number is used, and the host details match your Vultr Database hostname.

    • View the Kafka Connect logs to investigate the error further.

    Fetch logs.

    $ kubectl logs -f $(kubectl get pod -n kafka -l=strimzi.io/cluster=kafka-connect-cluster -o jsonpath='{.items[0].metadata.name}') -n kafka
    
  2. The source connector returns a myserver.orders=LEADER_NOT_AVAILABLE warning.

    • Verify that your database connection details are correct and match your Vultr Managed Database for PostgreSQL credentials.

    • Verify that all cluster pods are available and running.

    Fetch the list of pods.

    $ kubectl get pods -n kafka
    

    If your output looks like the one below:

    NAME                                                READY   STATUS    RESTARTS   AGE
    kafka-cluster-entity-operator-75d9dcc9c4-4lzch      3/3     Running   0          92m
    kafka-cluster-kafka-0                               1/1     Running   0          85m
    kafka-cluster-zookeeper-0                           1/1     Running   0          93m
    kafka-connect-cluster-connect-68586f8c97-zcdwz      0/1     Pending   0          58m
    strimzi-cluster-operator-64d7d46fc-dbh26            1/1     Running   0          98m
    

    Describe the Pending pod, check its events, and verify that your Kubernetes cluster has enough resources to run the Kafka Connect cluster.

    $ kubectl describe pods kafka-connect-cluster-connect-68586f8c97-zcdwz -n kafka
    

    Check the operator logs.

    $ kubectl logs deployment/strimzi-cluster-operator -n kafka -f
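
    After you correct a connector configuration and re-apply its YAML file, a connector may still show a failed task. In that case, you can trigger a restart through the Strimzi restart annotation (a documented Strimzi feature; the example below targets the source connector):

    $ kubectl annotate kafkaconnector debezium-postgres-source-connector -n kafka strimzi.io/restart=true

    The operator restarts the connector and removes the annotation once the restart completes.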
    

Conclusion

You have set up a data pipeline on a Vultr Kubernetes Engine (VKE) cluster by integrating Apache Kafka with a Vultr Managed Database for PostgreSQL. To achieve this, you configured a Debezium source connector and a JDBC sink connector to synchronize data from one PostgreSQL table to another.
