Flink-MySQL-CDC同步表数据

资料

Flink-CDC Github地址open in new window
文档地址open in new window

简介

CDC 即 Change Data Capture 变更数据捕获,为Flink 1.11中一个新增功能。我们可以通过 CDC 得知数据源表的更新内容(包含Insert、Update和Delete),并将这些更新内容作为数据流发送到下游系统。捕获到的数据操作具有一个标识符,分别对应数据的增加,修改和删除。

常见开源CDC比较

Flink CDCDebeziumCanal
增量同步VVV
断点续传VVV
全量同步VVX
全量+增量VVX
架构分布式单机单机
生态🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟
  • 全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。
  • 不支持水平扩展,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。
  • 全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。
  • 无锁
  • 水平扩展
  • 支持checkpoint

借鉴Netflix的DBlog paper 全程无锁
基于Flink Flip-27 Source实现

简单用法

本文描述监听MySQL某个表数据变动后,处理对应的逻辑,然后落入到新表等,首先引入以下依赖


<dependencies>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.17.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

模拟编写如下代码,配置数据源以及监听的表。

package org.example.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


/**
 * @author dingqianwen
 */
@Slf4j
public class MySqlSourceExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("数据库地址")
                .port(3306)
                .databaseList("监听的数据库")
                .tableList("监听的数据库.表")
                .username("数据库用户")
                .password("数据库密码")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // enable checkpoint
        env.enableCheckpointing(5000);
        env.setParallelism(1);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        log.info("对数据做处理转换======>" + value);
                        return value;
                    }
                })
                .addSink(new RichSinkFunction<String>() {
                    @Override
                    public void invoke(String value, Context context) {
                        log.info("后置数据处理======>" + value);
                    }
                });
        env.execute("Print MySQL Snapshot + Binlog");
    }

}

启动应用程序后,如果监听的表数据发生变动,则会打印如下信息:

...
16:36:18.613 [Map (11/12)#2] INFO org.example.flink.MySqlSourceExample - 对数据做处理转换======>{"before":{"id":2,"code":"bbb","name":"bbb-1","created":"2023-07-12T09:04:07Z"},"after":{"id":2,"code":"bbb","name":"bbb-2","created":"2023-07-12T09:04:07Z"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689323705000,"snapshot":"false","db":"test","sequence":null,"table":"user","server_id":3306,"gtid":"b01993bc-d9de-11ed-9f73-e4434bb57578:627577","file":"logsmysql-bin.000007","pos":78894988,"row":0,"thread":263022480,"query":null},"op":"u","ts_ms":1689323705736,"transaction":null}
...

项目打jar后提交到Flink即可运行。

更新日期:
作者: dingqw