Change Data Capture (CDC) with Debezium and Kafka
What is Change Data Capture (CDC)?
- Change Data Capture is a technique for capturing and tracking data changes on a particular data source
- The captured changes can be used for multiple purposes, such as real-time analytics, synchronizing data across geographically distributed systems, and fraud prevention, as illustrated by the example event below
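Most CDC tools emit one event per row-level change, carrying the state of the row before and after the operation. A simplified, hypothetical event for an UPDATE might look like this, where op indicates the operation type (c = create, u = update, d = delete):
{
  "op": "u",
  "before": { "id": 1001, "first_name": "Sally" },
  "after": { "id": 1001, "first_name": "Sara" },
  "ts_ms": 1650000000000
}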
What is Debezium?
- Debezium is an open-source distributed platform for Change Data Capture
- It is simple to set up and can be used to record data change operations
Prerequisites
- Docker and Docker Compose installed on Ubuntu and ready to use
- Postman to perform API requests
Overview of Kafka Architecture (refer to Deploy Kafka and Zookeeper on Docker for a better understanding)
Steps to Setup the Stack
The following are the components we will be using:
- Zookeeper: A centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. We will be using two instances of Zookeeper in this case
- Kafka: A distributed system consisting of servers and clients, which can be deployed on VMs, containers, or cloud environments. We will be using three instances of Kafka. Both Kafka and Zookeeper can be scaled according to your requirements. For more detailed insights into Kafka and Zookeeper, refer to Deploy Kafka and Zookeeper on Docker
- MySQL: We will be using MySQL as the data source in our case. This can be any data source that Debezium provides integration support for
- Kafka Connect: Used to integrate Kafka with different types of data sources and keep the captured data and the produced streams in sync. In our case, we need it in order to run the Debezium connector for Kafka
- Kafdrop: A simple web UI for viewing Kafka topics and browsing consumer groups
1. Create a docker-compose.yaml file with the below configuration
# Open the docker-compose file with an editor of your choice
nano docker-compose.yaml
# Paste the following into the docker-compose file
version: '3'
services:
  # Zookeeper instance 1
  # container_name: Specifies the name of the container
  # image: Specifies the image to use
  # ports: List of ports to expose for the container in the format <HOST_PORT>:<CONTAINER_PORT>
  zookeeper_instance_1:
    container_name: zookeeper_instance_1
    image: quay.io/debezium/zookeeper
    ports:
      - 12181:2181
      - 12888:2888
      - 13888:3888
  # Zookeeper instance 2 (same settings as instance 1, exposed on different host ports)
  zookeeper_instance_2:
    container_name: zookeeper_instance_2
    image: quay.io/debezium/zookeeper
    ports:
      - 12182:2181
      - 12889:2888
      - 13889:3888
  # Kafka instance 1
  # container_name: Specifies the name of the container
  # image: Specifies the image to use
  # environment: Environment variables for the container. In this case we supply the variables below
  # -- ZOOKEEPER_CONNECT: The Zookeeper deployments (instances 1 and 2 in our case) in the format <container_name>:<port>
  # -- BROKER_ID: ID for this Kafka instance
  # -- KAFKA_ADVERTISED_LISTENERS: Advertised INTERNAL and EXTERNAL listeners for this Kafka instance
  # -- KAFKA_LISTENERS: INTERNAL and EXTERNAL listeners for this Kafka instance
  # -- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Protocol definition for the listeners specified above
  # -- KAFKA_INTER_BROKER_LISTENER_NAME: Listener to use for inter-broker communication
  # depends_on: Specifies dependencies on other containers, i.e. wait for the specified containers to come up before starting
  # ports: List of ports to expose for the container in the format <HOST_PORT>:<CONTAINER_PORT>
  kafka_instance_1:
    container_name: kafka_instance_1
    image: quay.io/debezium/kafka
    environment:
      ZOOKEEPER_CONNECT: zookeeper_instance_1:2181,zookeeper_instance_2:2181
      BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka_instance_1:9091,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19091
      KAFKA_LISTENERS: INTERNAL://kafka_instance_1:9091,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    depends_on:
      - zookeeper_instance_1
      - zookeeper_instance_2
    ports:
      - 9091:9091
  # Kafka instance 2 (same settings as instance 1, with its own broker ID, listeners and ports)
  kafka_instance_2:
    container_name: kafka_instance_2
    image: quay.io/debezium/kafka
    environment:
      ZOOKEEPER_CONNECT: zookeeper_instance_1:2181,zookeeper_instance_2:2181
      BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka_instance_2:9092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19092
      KAFKA_LISTENERS: INTERNAL://kafka_instance_2:9092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    depends_on:
      - zookeeper_instance_1
      - zookeeper_instance_2
    ports:
      - 9092:9092
  # Kafka instance 3 (same settings as instance 1, with its own broker ID, listeners and ports)
  kafka_instance_3:
    container_name: kafka_instance_3
    image: quay.io/debezium/kafka
    environment:
      ZOOKEEPER_CONNECT: zookeeper_instance_1:2181,zookeeper_instance_2:2181
      BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka_instance_3:9093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19093
      KAFKA_LISTENERS: INTERNAL://kafka_instance_3:9093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    depends_on:
      - zookeeper_instance_1
      - zookeeper_instance_2
    ports:
      - 9093:9093
  # MySQL instance
  # environment: Environment variables for the container
  # -- MYSQL_USER / MYSQL_PASSWORD: A non-root user and password for the MySQL instance
  # -- MYSQL_ROOT_PASSWORD: The root password for the MySQL instance
  mysql:
    container_name: mysql
    image: quay.io/debezium/example-mysql
    environment:
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
      MYSQL_ROOT_PASSWORD: debezium
    ports:
      - 3307:3306
  # Kafka Connect
  # environment: Environment variables for the container
  # -- OFFSET_STORAGE_TOPIC / STATUS_STORAGE_TOPIC / CONFIG_STORAGE_TOPIC: Custom names for Connect's internal offset, status and config storage topics
  # -- GROUP_ID: Group ID for the Connect instance
  # -- BOOTSTRAP_SERVERS: The Kafka deployments (instances 1, 2 and 3 in our case) in the format <container_name>:<port>
  connect:
    container_name: connect
    image: quay.io/debezium/connect
    environment:
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      CONFIG_STORAGE_TOPIC: my_connect_configs
      GROUP_ID: '1'
      BOOTSTRAP_SERVERS: kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093
    depends_on:
      - zookeeper_instance_1
      - zookeeper_instance_2
      - kafka_instance_1
      - kafka_instance_2
      - kafka_instance_3
      - mysql
    ports:
      - 8083:8083
  # Kafdrop
  # environment: Environment variables for the container
  # -- KAFKA_BROKERCONNECT: The Kafka deployments (instances 1, 2 and 3 in our case) in the format <container_name>:<port>
  kafdrop:
    container_name: kafdrop
    image: obsidiandynamics/kafdrop
    environment:
      KAFKA_BROKERCONNECT: kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093
    ports:
      - 9000:9000
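Before bringing the stack up, you can optionally ask Docker Compose to validate the file; this catches indentation mistakes in the YAML early:
# Validate the compose file and print the resolved configuration (errors out if the YAML is invalid)
docker-compose config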
2. Start the docker-compose services using the below command
# -d runs the containers in detached mode
docker-compose up -d
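To verify that all the containers came up, list the services and tail the Kafka Connect logs (the service name connect is the one defined in our docker-compose file):
# List the services and their current state
docker-compose ps
# Follow the Kafka Connect logs until the REST server reports that it has started
docker-compose logs -f connect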
3. View Kafdrop at http://<IP_ADDRESS>:<PORT>, in our case http://localhost:9000. All the deployed Kafka instances should be visible in the UI
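If you prefer the command line over the Kafdrop UI, the topics can also be listed directly from one of the broker containers. The /kafka path below assumes the layout of the Debezium Kafka image and may differ for other images:
# List all topics known to the cluster
docker exec -it kafka_instance_1 /kafka/bin/kafka-topics.sh --bootstrap-server kafka_instance_1:9091 --list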
4. Log in to the MySQL container to view the sample database that ships with the container by default
docker exec -it mysql /bin/sh
# Log in to mysql (use the root password from the environment variable provided in the docker-compose file)
mysql -uroot -p
# Switch to the sample database
use inventory;
# View the data in the customers table
select * from customers;
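With the quay.io/debezium/example-mysql image, the inventory database comes pre-populated, so the query above should return four rows along these lines (the exact values may differ across image versions):
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+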
5. Now let us activate the Debezium connector through Kafka Connect
# The Kafka Connect REST API can be called from curl or Postman
# The curl command is shown below
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "1",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}'
# Alternatively, for the sake of simplicity, the same request can be sent from Postman
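Note: the property names above are the Debezium 1.x names. If you pin the quay.io/debezium/connect image to a 2.x tag, a few of them were renamed, and the equivalent settings would look roughly like this:
"topic.prefix": "dbserver1",
"schema.history.internal.kafka.bootstrap.servers": "kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093",
"schema.history.internal.kafka.topic": "schema-changes.inventory"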
6. To confirm the connector was added, navigate to http://<IP_ADDRESS>:<PORT>/connectors, in this case http://localhost:8083/connectors. A connector named inventory-connector should be listed
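You can also query the connector's state through the Kafka Connect REST API; both the connector and its task should report RUNNING:
# Check the status of the inventory-connector
curl --location --request GET 'http://localhost:8083/connectors/inventory-connector/status'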
7. Now, in the Kafdrop UI we should be able to see the topics created by Debezium (prefixed with dbserver1 in this case)
8. Navigate to the “dbserver1.inventory.customers” topic. This topic is used to track the customers table from the inventory database
9. Click the “View Messages” button to view the messages in the topic
10. This should display four entries with ids 1001, 1002, 1003 and 1004, which correspond to the ids in the customers table as seen before
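As an alternative to the Kafdrop UI, the same change events can be consumed from the command line (again assuming the /kafka layout of the Debezium Kafka image):
# Read all change events for the customers table from the beginning of the topic
docker exec -it kafka_instance_1 /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka_instance_1:9091 --topic dbserver1.inventory.customers --from-beginning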
Steps to test Debezium
1. Log in to MySQL as shown before and update a record
docker exec -it mysql /bin/sh
# Log in to mysql (use the root password from the environment variable provided in the docker-compose file)
mysql -uroot -p
# Switch to the sample database
use inventory;
# View the data in the customers table
select * from customers;
# Update a record in the table
update customers set first_name='easycode' where id=1001;
# View the updated data in the customers table
select * from customers;
2. Refresh the Kafdrop web page; we should now be able to see a new entry for the id 1001
3. As seen above, we have successfully captured the data change through Debezium and published it to our message queue. This data can further be used for many purposes, such as data replication and real-time analytics
On inspecting the payload of the record, we can see that the update has been successfully captured by Debezium
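Stripped down to the essentials, the payload of that new record should look roughly like the following (source metadata and timestamps omitted); the before/after blocks carry the old and new row state, and op "u" marks the event as an update:
{
  "before": { "id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com" },
  "after": { "id": 1001, "first_name": "easycode", "last_name": "Thomas", "email": "sally.thomas@acme.com" },
  "op": "u"
}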
Congratulations! You have now successfully set up Debezium with Kafka to monitor and record DB operations on MySQL
For monitoring multiple tables, should we create multiple topics? Is it possible to monitor a particular column change in a table?
Also, are there any analytical tools for correlating data change events received across topics?
The topics corresponding to the tables in the database are created automatically by Debezium. It is possible to monitor only a particular table or column, which can be configured through the Debezium connector properties. The respective properties can be found on this page:
https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html#advanced-parameters
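For example (a hypothetical snippet using the include-list properties from the connector documentation linked above), restricting capture to a single table and a subset of its columns would be added to the connector config like this:
"table.include.list": "inventory.customers",
"column.include.list": "inventory.customers.id,inventory.customers.first_name"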
As of now, I haven’t explored any tool that can correlate the data change events, but I will share one if I come across it.
Thank you