Change Data Capture (CDC) with Debezium and Kafka

Change Data Capture (CDC) with Debezium and Kafka

What is Change Data Capture (CDC) ?

  • Change Data Capture is a methodology to capture and track the data change on a particular data source
  • The captured data change can be used for multiple purposes like Real Time Analytics, Synchronizing data across geographically distributed systems, Fraud Prevention etc.,

What is Debezium ?

  • Debezium is an open-source distributed platform for Change Data Capture
  • It is simple and easy to setup and can be used to record data change operations

Prerequisites

Overview of Kafka Architecture (Refer Deploy Kafka and Zookeeper on Docker for better understanding)

Kafka

Steps to Setup the Stack

The following are the components we will be using:

  • Zookeeper: It is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. We will be using two instances of Zookeeper in this case
  • Kafka: It is a distributed system consisting of servers and clients which can be deployed on VM’s, Containers or Cloud Environments. We will be using three instances of Kafka. Kafka and Zookeeper can be scaled according to the requirements. For more detailed insights into Kafka and Zookeeper refer Deploy Kafka and Zookeeper on Docker
  • MySQL: We will be using MySQL as the data source in our case. This can be any data source which provides integration support with debezium
  • Kafka Connect: It is used to Integrate Kafka with different types of data sources and ensure sync between streams and data produced. In our case we will be needing this in order to use the debezium connector for Kafka
  • Kafdrop: It is a simple web UI for viewing Kafka topics and browsing consumer groups

1. Create a docker-compose.yml file with the below configuration

# Open the docker-compose file with an editor of your choice and add in the below contents
nano docker-compose.yaml

# Paste the following into the docker-compose file
version: '3'
services:
# Zookeeper instance 1 
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    zookeeper_instance_1:
        container_name: zookeeper_instance_1
        image: quay.io/debezium/zookeeper
        ports:
            - 12181:2181
            - 12888:2888
            - 13888:3888
# Zookeeper instance 2
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    zookeeper_instance_2:
        container_name: zookeeper_instance_2
        image: quay.io/debezium/zookeeper
        ports:
            - 12182:2181
            - 12889:2888
            - 13889:3888
# Kafka instance 1
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# environment: Environment Variables for the container. In this case we will be supplying variables as shown below
# -- ZOOKEEPER_CONNECT: The link to Zookeeper Deployments (instance 1 and 2 in our case) in the format <container_name>:<port>
# -- BROKER_ID: ID for the Kafka Instance
# -- KAFKA_ADVERTISED_LISTENERS: Advertised INTERNAL AND EXTERNAL Listeners for the Kafka Instance
# -- KAFKA_LISTENERS: INTERNAL AND EXTERNAL Listeners for the Kafka Instance
# -- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Protocol definition for the above specified Listeners
# -- KAFKA_INTER_BROKER_LISTENER_NAME: Listener to use for the broker
# depends_on: Specifies dependency on other containers i.e, wait for the specified containers to come up before starting
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    kafka_instance_1:
        container_name: kafka_instance_1
        image: quay.io/debezium/kafka
        environment:
            ZOOKEEPER_CONNECT: zookeeper_instance_1:2181,zookeeper_instance_2:2181
            BROKER_ID: 1
            KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka_instance_1:9091,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19091
            KAFKA_LISTENERS: INTERNAL://kafka_instance_1:9091,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19091
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
        depends_on:
            - zookeeper_instance_1
            - zookeeper_instance_2
        ports:
            - 9091:9091
# Kafka instance 2
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# environment: Environment Variables for the container. In this case we will be supplying variables as shown below
# -- ZOOKEEPER_CONNECT: The link to Zookeeper Deployments (instance 1 and 2 in our case) in the format <container_name>:<port>
# -- BROKER_ID: ID for the Kafka Instance
# -- KAFKA_ADVERTISED_LISTENERS: Advertised INTERNAL AND EXTERNAL Listeners for the Kafka Instance
# -- KAFKA_LISTENERS: INTERNAL AND EXTERNAL Listeners for the Kafka Instance
# -- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Protocol definition for the above specified Listeners
# -- KAFKA_INTER_BROKER_LISTENER_NAME: Listener to use for the broker
# depends_on: Specifies dependency on other containers i.e, wait for the specified containers to come up before starting
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    kafka_instance_2:
        container_name: kafka_instance_2
        image: quay.io/debezium/kafka
        environment:
            ZOOKEEPER_CONNECT: zookeeper_instance_1:2181,zookeeper_instance_2:2181
            BROKER_ID: 2
            KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka_instance_2:9092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19092
            KAFKA_LISTENERS: INTERNAL://kafka_instance_2:9092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
        depends_on:
            - zookeeper_instance_1
            - zookeeper_instance_2
        ports:
            - 9092:9092
# Kafka instance 3
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# environment: Environment Variables for the container. In this case we will be supplying variables as shown below
# -- ZOOKEEPER_CONNECT: The link to Zookeeper Deployments (instance 1 and 2 in our case) in the format <container_name>:<port>
# -- BROKER_ID: ID for the Kafka Instance
# -- KAFKA_ADVERTISED_LISTENERS: Advertised INTERNAL AND EXTERNAL Listeners for the Kafka Instance
# -- KAFKA_LISTENERS: INTERNAL AND EXTERNAL Listeners for the Kafka Instance
# -- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Protocol definition for the above specified Listeners
# -- KAFKA_INTER_BROKER_LISTENER_NAME: Listener to use for the broker
# depends_on: Specifies dependency on other containers i.e, wait for the specified containers to come up before starting
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    kafka_instance_3:
        container_name: kafka_instance_3
        image: quay.io/debezium/kafka
        environment:
            ZOOKEEPER_CONNECT: zookeeper_instance_1:2181,zookeeper_instance_2:2181
            BROKER_ID: 3
            KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka_instance_3:9093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19093
            KAFKA_LISTENERS: INTERNAL://kafka_instance_3:9093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:19093
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
        depends_on:
            - zookeeper_instance_1
            - zookeeper_instance_2
        ports:
            - 9093:9093
# MySQL Instance
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# environment: Environment Variables for the container. In this case we will be supplying variables as shown below
# -- MYSQL_PASSWORD: The password for the mysql instance
# -- MYSQL_ROOT_PASSWORD: The root password for the mysql instance
# -- MYSQL_USER: The user for the mysql instance
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    mysql:
        container_name: mysql
        environment:
            MYSQL_PASSWORD: mysqlpw
            MYSQL_ROOT_PASSWORD: debezium
            MYSQL_USER: mysqluser
        image: quay.io/debezium/example-mysql
        ports:
            - 3307:3306
# Kafka Connect
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# environment: Environment Variables for the container. In this case we will be supplying variables as shown below
# -- OFFSET_STORAGE_TOPIC: Custom Topic Name for the Offset Storage Topic
# -- STATUS_STORAGE_TOPIC: Custom Topic Name for the Status Storage Topic
# -- CONFIG_STORAGE_TOPIC: Custom Topic Name for the Config Storage Topic
# -- GROUP_ID: Group ID for the instance
# -- BOOTSTRAP_SERVERS: The link to Kafka Deployments (instance 1, 2 and 3 in our case) in the format <container_name>:<port>
# depends_on: Specifies dependency on other containers i.e, wait for the specified containers to come up before starting
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    connect:
        container_name: connect
        environment:
            OFFSET_STORAGE_TOPIC: my_connect_offsets
            STATUS_STORAGE_TOPIC: my_connect_statuses
            CONFIG_STORAGE_TOPIC: my_connect_configs
            GROUP_ID: '1'
            BOOTSTRAP_SERVERS: kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093
        image: quay.io/debezium/connect
        depends_on:
            - zookeeper_instance_1
            - zookeeper_instance_2
            - kafka_instance_1
            - kafka_instance_2
            - kafka_instance_3
            - mysql
        ports:
            - 8083:8083
# Kafdrop
# container_name: Specifies the Name of the container
# image: Specifies the image to use
# environment: Environment Variables for the container. In this case we will be supplying variables as shown below
# -- KAFKA_BROKERCONNECT: The link to Kafka Deployments (instance 1, 2 and 3 in our case) in the format <container_name>:<port>
# ports: List of ports to be exposed for the container in the format of <HOST_PORT>:<CONTAINER_PORT>
    kafdrop:
        container_name: kafdrop
        environment:
            KAFKA_BROKERCONNECT: kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093
        image: obsidiandynamics/kafdrop
        ports:
            - 9000:9000

2. Start the docker-compose services using the below command

# -d will run the containers in detached state
docker-compose up -d

3. View the Kafdrop deployed on http://<IP_ADDRESS>:<PORT>, in our case it is http://localhost:9000, All the deployed kafka instances should be visible on the UI

Kafdrop

4. Login to the mysql container to view the sample database provided in the container by default

docker exec -it mysql /bin/sh

# Login to mysql (Use the root password from the environment variable provided in the docker-compose file)
mysql -uroot -p

# Access the sample database
use inventory;

# View the data in customers table
select * from customers;
Mysql - Kafka

5. Now let us activate the debezium connector through kafka connect

# We will be able to hit the API from curl or postman
# The curl command is as shown below
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "inventory-connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "mysql",  
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "1",  
    "database.server.name": "dbserver1",  
    "database.include.list": "inventory",  
    "database.history.kafka.bootstrap.servers": "kafka_instance_1:9091,kafka_instance_2:9092,kafka_instance_3:9093",  
    "database.history.kafka.topic": "schema-changes.inventory"  
  }
}'

# For the sake of simplicity we will be using postman as shown below
Postman - Kafka

6. To confirm the connector was added, navigate to http://<IP_ADDRESS>:<PORT>/connectors, in this case http://localhost:8083/connectors. A connector named inventory-connector should be seen

Connector - Kafka

7. Now, in the Kafdrop UI we should be able to see topics created by debezium, (dbserver1 in this case)

Kafdrop-debezium

8. Navigate to the “dbserver1.inventory.customers” topic. This topic is used to track the customers table from the inventory db

Kafdrop-debezium_1

9. Click on the “View Messages” Button to view the messages in the topic.

Kafdrop-debezium_2

10. Click on the “View Messages” icon on the above screen. This should display four entries with id 1001, 1002, 1003, 1004 which correspond to the id’s in the customers table as seen before

Kafdrop-debezium_3

Steps to test Debezium

1. Login to mysql as shown before and update a record

docker exec -it mysql /bin/sh

# Login to mysql (Use the root password from the environment variable provided in the docker-compose file)
mysql -uroot -p

# Access the sample database
use inventory;

# View the data in customers table
select * from customers;

# Update a record in the table
update customers set first_name='easycode' where id=1001;

# View the data in customers table
select * from customers;
Kafdrop-debezium_4

2. Refresh the Kafdrop Webpage and we should now be able to see a new entry for the id 1001

Kafdrop-debezium_5

3. As seen above, we have successfully captured the data change through debezium and added it to our message queue. This data can further be used for many purposes like data replication, real time analytics etc.,

On inspection of the payload of the record, we can see the update has been successfully captured by debezium

Kafdrop-debezium_6

Congratulations!! You have now successfully setup Debezium with Kafka to monitor and record DB operations on MySQL

6550cookie-checkChange Data Capture (CDC) with Debezium and Kafka

2 Comments

  1. For monitoring multiple tables, should we create multiple topics ? Is it possible to monitor an particular column change in a table?

    Also are there any analytical tools for correlating data change events received across topics?

Leave a Comment

Your email address will not be published. Required fields are marked *