Tất tần tật về Change Data Capture và Debezium (Phần 2): Demo

Ở phần 1, chúng ta đã nắm được kiến thức cơ bản về kỹ thuật Change Data Capture và công cụ Debezium. Trong phần 2 này, chúng ta sẽ chạy thử demo một hệ thống có sử dụng Debezium.

Upload image

Tất tần tật về Change Data Capture và Debezium (Phần 1)

Sau khi đã nắm rõ về khái niệm, ưu - nhược điểm và các cách để tích hợp Debezium vào hệ thống, tiếp theo chúng ta sẽ đến với phần thú vị nhất, đó chính là chạy demo 1 hệ thống thật có sử dụng Debezium.

Demo thôi!

Trong demo này, ta sẽ thử vận hành 1 hệ thống theo kiến trúc sau:

Upload image

Như bạn có thể thấy, phía source database là MySQL DB và có một Debezium connector đang theo dõi source database này. Debezium connector này sẽ được vận hành và quản lý với Kafka Connect.

Nếu có thay đổi, Debezium sẽ tạo ra change-event, Kafka Connect sẽ truyền và lưu vào Kafka.

Ở phía sink, ta có một JDBC sink connector liên tục lấy change-event từ Kafka ra và update vào PostgreSQL DB.

Với kiến trúc như trên, ta sẽ cần 2 instance của Kafka Connect với hai mục đích, đó là

  • Quản lý source connector
  • Quản lý sink connector

Tuy nhiên, để phần demo được ngắn gọn, ta có thể giảm bớt một instance của Kafka Connect. Ta sẽ dùng chung một instace Kafka Connect để quản lý cả source và sink connector.

Từ đó, ta có được 1 kiến trúc rút gọn như sau:

Upload image

Configuration Management

Version của OS và từng service ta sẽ sử dụng ở phần demo như sau:

Step-by-step demo

Vì các service chúng ta sử dụng trong phần demo sẽ thông qua Docker, nên môi trường mà bạn sử dụng cho các bước tiếp theo cần phải cài đặt sẵn Docker rồi nhé.

Step 1: Khởi động Zookeeper

Step 2: Khởi động Kafka

Step 3: Khởi động server MySQL

Step 4: Khởi động server PostgreSQL

Step 5: Khởi động Kafka Connect

Với các bước từ 1 đến 5, ta sẽ sử dụng một file docker-compose có sẵn để deploy nhanh tất cả các service trên.

Download docker-compose.yaml tại đây

Mở 1 terminal, di chuyển đển nơi để file docker-compose.yaml và chạy câu lệnh sau để khởi tạo các service trên:

sudo DEBEZIUM_VERSION=1.9 docker-compose up

Upload image

Upload image

Step 6: Đăng ký MySQL source connector với Kafka Connect

Sau khi các service trên đã chạy ổn đinh, MySQL Debezium connector sẽ được đăng ký với Kafka Connect thông qua Kafka Connect API. Tất cả những configuration liên quan đến Debezium sẽ được khai báo ở file cấu hình sau.

Upload image

Download file cấu hình trên về nhé.

Mở một terminal mới, di chuyển đến nơi đặt file cấu hình và chạy câu lệnh sau:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Step 7: Đăng ký PostgreSQL sink connector với Kafka Connect

Tương tự như Debezium Connector, PostgreSQL Connector cũng được khai báo ở file cấu hình tại đây.

Upload image

Download file cấu hình trên. Mở một terminal khác di chuyển đến nơi đặt file cấu hình và chạy câu lệnh sau:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

Step 8: Kiểm tra dữ liệu ở source database và target database

Giờ ta sẽ kiểm tra xem dữ liệu ở source và target databases đã được đồng bộ với nhau hay chưa.

Step 8.1: Mở một terminal mới và chạy câu lệnh sau để sử dụng MySQL client và kiểm tra dữ liệu ở MySQL:

sudo docker exec -ti unwrap-smt_mysql_1 sh -c 'mysql -h localhost -u root -p'

(password: debezium)

Chạy 2 câu lệnh sau để lấy ra danh sách record của bảng customers

use inventory;

select * from customers;

Upload image

Step 8.2: Mở một terminal mới và chạy câu lệnh sau để sử dụng Psql và kiểm tra dữ liệu ở PostgreSQL:

sudo docker exec -ti unwrap-smt_postgres_1 psql -d inventory -U postgresuser

Chạy câu lệnh sau để lấy ra danh sách record của bảng customers

select * from customers;

Ta sẽ thấy dữ liệu ở 2 bảng customers đang giống y hệt nhau.

Upload image

Step 9: Insert thêm một record mới ở MySQL

Ta sẽ quay lại MySQL Client để chạy một câu query thêm record:

insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');

Upload image

Step 10: Kiểm tra change-event ở Kafka

Ta sẽ dùng một phần mềm open-source để đọc được message bên trong Kafka tại đây.

Mở một terminal mới và chạy câu lệnh sau:

sudo docker run -it --rm --name kafka_ui -p 8090:8080 --link unwrap-smt_kafka_1:kafka -e KAFKA_CLUSTERS_0_NAME=local -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 --net unwrap-smt_default -d provectuslabs/kafka-ui:latest

Nếu bạn đang run docker ở máy tính localhost thì truy cập vào địa chỉ sau để truy cập giao diện web của Kafka UI: http://localhost:8090

Nếu bạn run docker trên máy ảo thì hãy thay localhost bằng IP của máy ảo đó, trong trường hợp của mình mình đang dùng Azure VM với public IP là 20.29.249.138 nên mình sẽ truy cập vào link: http://20.29.249.138:8090

Giao diện của Kafka UI:

Upload image

Ta vào mục “Topics” ở thanh nav bar bên trái, sau đấy chọn “customers”

Upload image

Sau khi chọn topic “customers”, chọn “messages” để xem được tất cả message của topic.

Message ở offset-4 chính là change-event của record mới được insert vào MySQL

Upload image

Step 11: Kiểm tra record mới ở PostgreDB

Chạy câu lệnh sau ở PSQL Client để xem tất cả record của bảng customers:

select * from customers;

Upload image

Kafka Connect ngất giữa chừng, đừng có lo!

Với phần demo step-by-step trên, chúng ta đã thấy cách Debezium MySQL Connector phát hiện những thay đổi ở phía source database, giờ chúng ta hãy cùng tìm hiểu xem cách Debezium phát hiện và lưu lại những thay đổi ngay cả khi nó không hoạt động nhé.

Kafka Connect tự động quản lý những connector đã được đăng ký (registered), cấu hình của những connector này được Kafka Connect lưu trên Kafka. Vì vậy, nếu Kafka Connect bị tắt và không hoạt động, ngay sau khi được khởi động lại, nó sẽ vẫn khởi động lại được những connector cũ. Điều này có nghĩa là kể cả Debezium không hoạt động, khi chạy lại thì thay đổi ở source database vẫn sẽ được theo dõi và không bỏ sót.

Bản thân Debezium cũng liên tục lưu lại offset về vị trí trong transaction log mà nó đã đọc lần cuối cùng vào Kafka. Vậy nên khi được khởi động lại, Debezium sẽ tiếp tục đọc từ vị trí trước khi tắt. Tính năng này đảm bảo không bỏ lỡ record (avoid missing record) và hạn chế trùng lặp change-event (avoid duplicate change-event).

Ta sẽ thử tính năng này qua các bước sau: Tắt Kafka Connect, insert record mới vào MySQL, restart lại Kafka Connect để thấy được change-events mới và kiểm tra PostgreSQL để thấy sự cập nhật.

Step 1: Mở một terminal mới và chạy câu lệnh sau để tắt Kafka Connect:

sudo docker stop unwrap-smt_connect_1

Step 2: Quay lại terminal đang chạy MySQL Client, insert 1 record mới vào MySQL:

INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");

Upload image

Step 3: Di chuyển đến vị trí đặt file docker-compose.yaml.

Mở lại terminal và khởi động lại Kafka Connect:

sudo DEBEZIUM_VERSION=1.9 docker-compose up connect

Step 4: Gửi request đến Kafka Connect API để kiểm tra các connector xem đã được khởi động lại chưa: Mở terminal mới và chạy câu lệnh sau:

curl -H "Accept:application/json" localhost:8083/connectors/

Upload image

Step 5: Quay lại terminal đang run Psql và kiểm tra dữ liệu của bảng customers:

select * from customers;

Vậy là ta đã hoàn thành phần demo sử dụng Debezium rồi đó.

Công cụ này rất hữu ích và vẫn còn rất nhiều khả năng tuyệt vời để bạn khám phá, mình đã để một số đường link tài liệu tham khảo thêm phía dưới, nếu bạn quan tâm hãy đào sâu hơn tìm hiểu thêm nhé.

Tài liệu tham khảo:

Debezium original document

Embedded Debezium demo tutorial

Debezium with Elastic Search

Other CDC Tools

Debezium Embedded with Amazon Kinesis

Debezium Server with Amazon Kinesis

Debezium Duplicate Events Handling

Atekco - Home for Authentic Technical Consultants