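Overview

Goal: use Debezium to capture database change logs (INSERT/UPDATE/DELETE) and write them to Kafka topics. This gives real-time computation and data lake ingestion a reliable source.

Applies to: transaction-level change synchronization from databases such as PostgreSQL and MySQL.

Core Concepts and Practice

PostgreSQL connector configuration (JSON):

```json
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "repl",
    "database.password": "secret",
    "database.dbname": "app",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete": "false",
    "include.schema.changes": "false",
    "table.include.list": "public.orders,public.users",
    "topic.prefix": "appdb",
    "snapshot.mode": "initial"
  }
}
```

MySQL connector essentials: the binlog must be enabled with row-based logging (`binlog_format=ROW`). The Debezium connector needs a unique `database.server.id` to identify itself as a replication client. Its include lists (such as `database.include.list` and `table.include.list`) control which databases and tables are captured.

Kafka topic governance: Debezium names each table's topic `<topic.prefix>.<schema>.<table>`, so the `orders` table above is written to `appdb.public.orders`. Pre-creating the topic lets you set the partition count, replication factor, and `min.insync.replicas` explicitly instead of relying on broker defaults:

```shell
kafka-topics.sh --create --topic appdb.public.orders --partitions 6 --replication-factor 3 --bootstrap-server broker:9092
kafka-configs.sh --alter --entity-type topics --entity-name appdb.public.orders --add-config min.insync.replicas=2 --bootstrap-server broker:9092
```

Submitting the connector through the Kafka Connect REST API, with the configuration above saved as `postgres-connector.json`:

```shell
curl -X POST http://connect:8083/connectors -H 'Content-Type: application/json' -d @postgres-connector.json
```

Verifying messages:

```shell
kafka-console-consumer.sh --topic appdb.public.orders --bootstrap-server broker:9092 --from-beginning --property print.key=true
```

Change event format (simplified example). The `op` field is `c` for create, `u` for update, `d` for delete, and `r` for rows read during the initial snapshot:

```json
{"before":null,"after":{"id":1,"status":"PAID"},"op":"c","ts_ms":1732617600000}
```

Verification and Monitoring

Connector status:

```shell
curl -s http://connect:8083/connectors/postgres-connector/status | jq .
```

Replication slots and publications (PostgreSQL):

```sql
SELECT * FROM pg_replication_slots;
SELECT * FROM pg_publication_tables;
```

Consumer lag and backlog: monitor `consumer lag` and per-partition data volume to confirm that downstream processing keeps up with the rate of change.

Common Pitfalls

- Logical replication (PostgreSQL, `wal_level=logical`) or the binlog (MySQL) is not enabled, so no changes are captured. Configure the database's logging mode before starting the connector.
- A low replication factor or too few in-sync replicas (ISR) puts writes at risk. Set `min.insync.replicas` on the topic and `acks=all` on the producer.
- Deletes produce tombstone messages (a record with a null value that follows the delete event), which can break downstream consumers. Either set `tombstones.on.delete=false` or handle tombstones downstream. Note that tombstones are what let log compaction eventually remove a deleted key, so keep them if the topic is compacted.

Conclusion

Debezium provides a stable channel for database CDC. Combined with Kafka topic governance and monitoring, it can support the core pipeline for real-time lake ingestion and stream processing.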
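To make the change event format described earlier more concrete, here is a minimal Python sketch of a consumer-side handler that applies events to an in-memory view keyed by primary key. It makes several assumptions. The message value is assumed to be the bare JSON envelope shown in the example (no schema wrapper, as you would get with a JSON converter and schemas disabled). The caller is assumed to have already extracted the row key. Tombstones are assumed to arrive as a `None` value. The function name `apply_change_event` is hypothetical, and truncate (`t`) and other operation types are deliberately not handled.

```python
import json
from typing import Any, Dict, Optional


def apply_change_event(view: Dict[Any, dict], key: Any, value: Optional[str]) -> None:
    """Apply one simplified Debezium change event to an in-memory keyed view.

    Hypothetical helper: assumes `value` is the bare JSON envelope
    (before/after/op/ts_ms) and `key` is the already-extracted row key.
    """
    if value is None:
        # Tombstone (null value) following a delete: the delete event already removed the row.
        return
    event = json.loads(value)
    op = event["op"]
    if op in ("c", "r", "u"):
        # Create, snapshot read, or update: "after" holds the new row state.
        view[key] = event["after"]
    elif op == "d":
        # Delete: "after" is null, so drop the row if present.
        view.pop(key, None)
    else:
        # Truncate ("t") and any other op types are out of scope for this sketch.
        raise ValueError(f"unsupported op: {op!r}")


if __name__ == "__main__":
    orders: Dict[Any, dict] = {}
    apply_change_event(orders, 1, '{"before":null,"after":{"id":1,"status":"PAID"},"op":"c","ts_ms":1732617600000}')
    apply_change_event(orders, 1, '{"before":{"id":1,"status":"PAID"},"after":{"id":1,"status":"SHIPPED"},"op":"u","ts_ms":1732617600000}')
    print(orders)  # {1: {'id': 1, 'status': 'SHIPPED'}}
    apply_change_event(orders, 1, '{"before":{"id":1,"status":"SHIPPED"},"after":null,"op":"d","ts_ms":1732617600000}')
    apply_change_event(orders, 1, None)  # tombstone is a no-op
    print(orders)  # {}
```

Treating the tombstone as a no-op works because the delete event already removed the row. This is why such a consumer behaves the same whether or not `tombstones.on.delete` is enabled.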
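The connector status check used under Verification and Monitoring returns JSON containing the connector's state and one entry per task. Below is a minimal sketch that turns that response into a list of problems. It assumes the response body of Kafka Connect's `GET /connectors/{name}/status` (a `connector` object and a `tasks` list, each carrying a `state` field). The helper name `unhealthy_parts` is hypothetical, and the sample body is illustrative rather than captured output.

```python
import json
from typing import List


def unhealthy_parts(status_body: str) -> List[str]:
    """List non-RUNNING parts of a Kafka Connect connector status response (hypothetical helper)."""
    status = json.loads(status_body)
    problems: List[str] = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    tasks = status.get("tasks", [])
    if not tasks:
        # A connector with no tasks captures nothing; treat it as worth alerting on.
        problems.append("no tasks")
    for task in tasks:
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


if __name__ == "__main__":
    # Illustrative response body, not output from a real cluster.
    sample = json.dumps({
        "name": "postgres-connector",
        "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
        "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
        "type": "source",
    })
    print(unhealthy_parts(sample))  # ['task 0: FAILED']
```

Checking the tasks separately matters because the connector itself can report `RUNNING` while one of its tasks has failed.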
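For the consumer lag check mentioned under Verification and Monitoring: per-partition lag is the log-end offset minus the consumer group's committed offset, and the total backlog is the sum across partitions. This minimal sketch of that arithmetic assumes you already have both offset maps (for example, read from `kafka-consumer-groups.sh --describe` or an admin client). The helper `consumer_lag` is hypothetical. Treating a partition with no committed offset as starting from 0 is a simplifying assumption, since in practice the log start offset may be higher after retention.

```python
from typing import Dict, Tuple


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Tuple[Dict[int, int], int]:
    """Per-partition lag = log-end offset minus committed offset; also returns the total."""
    lag: Dict[int, int] = {}
    for partition, end in end_offsets.items():
        # Assumption: a partition with no committed offset counts from offset 0.
        # Clamp at 0 so a stale end-offset snapshot never yields negative lag.
        lag[partition] = max(end - committed.get(partition, 0), 0)
    return lag, sum(lag.values())


if __name__ == "__main__":
    # Illustrative offsets for a 3-partition topic, not real measurements.
    per_partition, total = consumer_lag({0: 120, 1: 80, 2: 40}, {0: 100, 1: 80})
    print(per_partition, total)  # {0: 20, 1: 0, 2: 40} 60
```

Looking at the per-partition values, not just the total, helps spot a single hot or stuck partition that a healthy-looking sum would hide.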
