Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate data streams with other parts of your system in a secure, reliable, and scalable manner.
To build integration solutions, you can use the Kafka Connect framework to integrate with external systems. There are two types of Kafka connectors:
Source connector: Used to move data from source systems to Kafka topics.
Sink connector: Used to send data from Kafka topics into target (sink) systems.
When integrating with relational databases, the JDBC connector (source and sink) and the Debezium (source) connector are widely used solutions. The JDBC source connector works by polling database tables to retrieve data, while Debezium relies on change data capture.
In this article, you will learn how to run a data pipeline using Kafka Connect on a Vultr Kubernetes Engine (VKE) cluster with the open-source Strimzi project. Then, you will integrate a Vultr Managed Database for PostgreSQL with Apache Kafka by creating source and sink connectors to synchronize data from one PostgreSQL table to another.
Before you begin:
Deploy a Vultr Kubernetes Engine (VKE) cluster with at least 3 nodes.
Create a Vultr Managed Database for PostgreSQL.
Install the kubectl CLI tool to access the cluster.
Install the PostgreSQL psql CLI tool on your computer.
Install Docker on your computer.
Connect to your Vultr Managed Database for PostgreSQL.
$ psql "host=mydb.vultrdb.com port=5432 dbname=defaultdb user=example-user password=database-password sslmode=require"
Replace mydb.vultrdb.com with your actual Vultr Managed Database for PostgreSQL host, 5432 with your database port, example-user with your actual database user, and database-password with your actual database password.
When connected, your prompt should change to:
defaultdb=>
Create the orders table.
CREATE TABLE orders (
    order_id serial PRIMARY KEY,
    customer_id integer,
    order_date date
);
Create the orders_sink table that will act as a target for records sent to the Kafka topic.
CREATE TABLE orders_sink (
    order_id serial PRIMARY KEY,
    customer_id integer,
    order_date integer
);
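Optionally, confirm that both tables exist before moving on. The psql \dt meta-command lists the tables in the current database.
\dt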
Create a new Kafka namespace.
$ kubectl create namespace kafka
Apply the Strimzi installation files, including ClusterRoles, ClusterRoleBindings, and Custom Resource Definitions (CRDs).
$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
Verify that the Strimzi cluster operator pod is available and running.
$ kubectl get pod -n kafka -w
Your output should look like the one below:
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-56d64c8584-7k6sr 1/1 Running 0 43s
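If the operator pod does not appear, you can also confirm that the Strimzi Custom Resource Definitions were installed. The following optional sanity check filters the cluster's CRDs for Strimzi entries.
$ kubectl get crds | grep strimzi.io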
Create a directory to store cluster files.
$ mkdir vultr-vke-kafka-connect
Switch to the directory.
$ cd vultr-vke-kafka-connect
Using a text editor of your choice, create a new kafka-cluster.yml manifest.
$ nano kafka-cluster.yml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Save and close the file.
Apply the manifest to create the Kafka cluster.
$ kubectl apply -f kafka-cluster.yml -n kafka
Verify that the Apache Kafka cluster is created.
$ kubectl get kafka -n kafka
Output:
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
kafka-cluster 1 1 True True
Verify that the Kafka pod is available.
$ kubectl get pod/kafka-cluster-kafka-0 -n kafka
Output:
NAME READY STATUS RESTARTS AGE
kafka-cluster-kafka-0 1/1 Running 0 1m23s
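Optionally, instead of re-running the previous commands, you can block until the Kafka custom resource reports a Ready condition. The following convenience check assumes a five-minute timeout is sufficient.
$ kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n kafka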
Create a new Dockerfile.
$ nano Dockerfile
Add the following contents to the file.
# Base image: Strimzi Kafka 3.3.1
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root

# Create the connector plugin directory
RUN mkdir /opt/kafka/plugins

# Download and extract the Debezium PostgreSQL source connector plugin
RUN curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.0.Final/debezium-connector-postgres-1.9.0.Final-plugin.tar.gz \
    && tar -xf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz -C plugins

# Download the Kafka Connect JDBC sink connector and copy the PostgreSQL JDBC driver next to it
RUN mkdir /opt/kafka/plugins/kafka-connect-jdbc
RUN curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.6.0/kafka-connect-jdbc-10.6.0.jar \
    && mv kafka-connect-jdbc-10.6.0.jar plugins/kafka-connect-jdbc \
    && cp plugins/debezium-connector-postgres/postgresql-42.3.3.jar plugins/kafka-connect-jdbc

# Drop back to the non-root Kafka user
USER 1001
Save and close the file.
The above file uses quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 as the base image, creates the /opt/kafka/plugins directory, then downloads the Debezium PostgreSQL source and Kafka Connect JDBC sink connector plugins into that directory.
To publish the image, log in to Docker.
$ sudo docker login
Log in to your Docker account when prompted, as shown in the following output:
Login with your Docker ID to push and pull images from Docker Hub.
Username:
Password:
Build the image.
$ sudo docker build -t vke-kafka-connect-demo .
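Before tagging and pushing the image, you can optionally confirm that the connector plugins were baked into it. The following sanity check overrides the image entrypoint to list the plugins directory created in the Dockerfile.
$ sudo docker run --rm --entrypoint ls vke-kafka-connect-demo /opt/kafka/plugins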
Tag the image.
$ sudo docker tag vke-kafka-connect-demo example-user/vke-kafka-connect-demo
Replace example-user with your actual Docker account username.
Push the image to Docker Hub.
$ sudo docker push example-user/vke-kafka-connect-demo
Output:
The push refers to repository [docker.io/example-user/vke-kafka-connect-demo]
5985a7b633b1: Pushed
a1cac272dd78: Pushed
afd5d00d6422: Pushed
Create a new file kafka-connect.yaml.
$ nano kafka-connect.yaml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: example-user/vke-kafka-connect-demo
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: my-kafka-connect-cluster
    offset.storage.topic: kafka-connect-cluster-offsets
    config.storage.topic: kafka-connect-cluster-configs
    status.storage.topic: kafka-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  jvmOptions:
    "-Xmx": "1g"
    "-Xms": "1g"
Replace example-user/vke-kafka-connect-demo with your actual Docker image path.
Save and close the file.
Create the Kafka Connect cluster.
$ kubectl apply -f kafka-connect.yaml -n kafka
Verify that the Kafka Connect cluster is available and running.
$ kubectl get pod -l=strimzi.io/cluster=kafka-connect-cluster -n kafka
Your output should look like the one below.
NAME READY STATUS RESTARTS AGE
kafka-connect-cluster-connect-f9849ccdb-5lnjc 1/1 Running 0 60s
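Optionally, confirm that the Debezium and JDBC connector plugins are visible to the Kafka Connect workers by querying the Kafka Connect REST API from inside the deployment. This assumes the default REST port 8083 and relies on the curl binary already present in the image.
$ kubectl exec -n kafka deployment/kafka-connect-cluster-connect -- curl -s http://localhost:8083/connector-plugins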
Create a new file kafka-connector-source.yaml.
$ nano kafka-connector-source.yaml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgres-source-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 2
  config:
    database.hostname: mydb.vultrdb.com
    database.port: 16751
    database.user: example-admin
    database.password: "database-password"
    database.dbname: defaultdb
    database.server.name: myserver
    plugin.name: wal2json
    table.include.list: public.orders
    value.converter: org.apache.kafka.connect.json.JsonConverter
    database.sslmode: require
    publication.autocreate.mode: filtered
Replace the connector configuration properties as follows.
database.hostname: Your Vultr Managed Database for PostgreSQL host.
database.port: The PostgreSQL database port.
database.user: A valid database user.
database.password: Your Vultr Managed Database for PostgreSQL password.
database.dbname: The name of the database to stream changes from. As created earlier, this is defaultdb.
database.sslmode: Controls whether the connection to your database is encrypted. The value require enforces a secure connection to your Vultr Managed Database for PostgreSQL.
table.include.list: The tables to capture changes from, in schema.table format. public represents the database schema name and orders is the table created earlier.
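Because Debezium's PostgreSQL connector relies on logical decoding, it can also help to confirm that logical replication is enabled on the database before applying the connector. As an optional check, run the following statement in your psql session; if the value returned is not logical, the source connector cannot capture changes.
SHOW wal_level;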
Apply the connector to the Kafka namespace.
$ kubectl apply -f kafka-connector-source.yaml -n kafka
Verify that the connector is added.
$ kubectl get kafkaconnector -n kafka
Output:
NAME CLUSTER CONNECTOR CLASS MAX TASKS
debezium-postgres-source-connector kafka-connect-cluster io.debezium.connector.postgresql.PostgresConnector 2
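Optionally, inspect the connector resource to confirm that Strimzi reports it as Ready and to review any error conditions before producing data.
$ kubectl describe kafkaconnector debezium-postgres-source-connector -n kafka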
In a new terminal session, access your PostgreSQL database and insert data into the orders table as shown below.
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 300);
In your Kubernetes session, start a Kafka console consumer and confirm that the data appears in the Kafka topic.
$ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
You should see records in the Kafka topic in the form of JSON payloads. The change event JSON is large because it contains information about the changed record, including its schema, as shown below.
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"order_date"}],"optional":true,"name":"myserver.public.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"}}
Every change event generated by the connector contains two schema fields and two payload fields:
The first schema field corresponds to the event key and describes the structure of the table's primary key.
The first payload field is part of the event key and contains the key of the row that was changed.
The second schema field corresponds to the event value and describes the structure of the row that was changed.
The second payload field is part of the event value. It has the structure described by the previous schema field and contains the actual data of the row that was changed.
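To study a single change event without scrolling through the whole topic, you can limit the consumer to one message. This reuses the consumer command from the previous step together with the standard --max-messages flag.
$ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning --max-messages 1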
Insert more data and verify the captured events in the Kafka topic.
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
Create a new file kafka-connector-sink.yaml.
$ nano kafka-connector-sink.yaml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: jdbc-sink-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 2
  config:
    topics: myserver.public.orders
    connection.url: "jdbc:postgresql://mydb.vultrdb.com:16751/defaultdb?user=example-user&password=database-password&sslmode=require"
    dialect.name: PostgreSqlDatabaseDialect
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    auto.create: false
    insert.mode: upsert
    delete.enabled: true
    pk.fields: order_id
    pk.mode: record_key
    table.name.format: public.orders_sink
Update the connection.url value to match your actual Vultr Managed Database for PostgreSQL details.
Apply the connector.
$ kubectl apply -f kafka-connector-sink.yaml -n kafka
Verify that the connector is created.
$ kubectl get kafkaconnector -n kafka
Output:
NAME CLUSTER CONNECTOR CLASS MAX TASKS
debezium-postgres-source-connector kafka-connect-cluster io.debezium.connector.postgresql.PostgresConnector 2
jdbc-sink-connector kafka-connect-cluster io.confluent.connect.jdbc.JdbcSinkConnector 2
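Optionally, check the running state of the sink connector and its tasks through the Kafka Connect REST API. This assumes the default REST port 8083 inside the Connect deployment.
$ kubectl exec -n kafka deployment/kafka-connect-cluster-connect -- curl -s http://localhost:8083/connectors/jdbc-sink-connector/status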
In your database session, insert more data into the orders table as shown below.
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 400);
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 500);
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
Verify that you can view all JSON event changes.
$ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
View the orders_sink table and verify that the new data is persisted.
SELECT * FROM public.orders_sink;
Your output should look like the one below.
order_id | customer_id | order_date
----------+-------------+------------
1 | 300 | 19511
2 | 42 | 19511
3 | 400 | 19511
4 | 500 | 19511
The order_date values appear as integers because Debezium serializes PostgreSQL DATE columns as io.debezium.time.Date, the number of days since the Unix epoch, which is why the orders_sink table defines order_date as an integer. To synchronize more data into the orders_sink table, continue adding records.
Unable to View Kafka Connections to the PostgreSQL Database.
Verify that you added the correct PostgreSQL database connection details in files such as kafka-connector-source.yaml. Check that the correct port number is used and that the host details match your Vultr Managed Database for PostgreSQL hostname.
View the Kafka Connect logs to investigate the error further.
Fetch logs.
$ kubectl logs -f $(kubectl get pod -n kafka -l=strimzi.io/cluster=kafka-connect-cluster -o jsonpath='{.items[0].metadata.name}') -n kafka
The source connector returns a myserver.orders=LEADER_NOT_AVAILABLE warning.
Verify that your database connection details are correct and match your Vultr Managed Database for PostgreSQL credentials.
Verify that all cluster pods are available and running.
Fetch the list of pods.
$ kubectl get pods -n kafka
If your output shows a Pending pod like the one below:
NAME READY STATUS RESTARTS AGE
kafka-cluster-entity-operator-75d9dcc9c4-4lzch 3/3 Running 0 92m
kafka-cluster-kafka-0 1/1 Running 0 85m
kafka-cluster-zookeeper-0 1/1 Running 0 93m
kafka-connect-cluster-connect-68586f8c97-zcdwz 0/1 Pending 0 58m
strimzi-cluster-operator-64d7d46fc-dbh26 1/1 Running 0 98m
Describe the Pending pod, review its events, and verify that your Kubernetes cluster has enough resources to run the Kafka Connect cluster.
$ kubectl describe pods kafka-connect-cluster-connect-68586f8c97-zcdwz -n kafka
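To see whether the cluster nodes have spare CPU and memory for the Pending pod, you can also review each node's allocated resources. The following is a general-purpose Kubernetes check rather than a Vultr-specific command.
$ kubectl describe nodes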
Check the operator logs.
$ kubectl logs deployment/strimzi-cluster-operator -n kafka -f
You have set up a data pipeline on a Vultr Kubernetes Engine (VKE) cluster by integrating Apache Kafka with a Vultr Managed Database for PostgreSQL. To achieve this, you configured a Debezium source connector and a JDBC sink connector to synchronize data from one PostgreSQL table to another.