🗒️基于Binlog、FlinkCDC、Doris实现实时数据同步
00 分钟
2023-11-29
2023-11-29
type
status
date
slug
summary
tags
category
icon
password
Language

Docker部署MySQL

  • 通过挂载的方式开启一个mysql镜像
备注:需提前在宿主机目录下创建一个文件用于保存mysql的数据集,我这里创建的目录是 /usr/docker/mysql/data
  • 使用客户端连接工具连接上mysql,观察一下mysql_binlog的开启情况
notion image
  • 开启bin_log
  • 重启mysql镜像
  • 再次查看 show VARIABLES like '%log_bin%'
      • notion image
  • 准备数据
    • 这时在宿主机下已经产生了日志文件
        • notion image

    Doris 单机版部署

    • 解压到指定目录
    • 配置FE和BE
    • 启动 FE
    • 验证FE
    • 启动BE
    • MySQL命令行连接FE,这里新安装的Doris集群默认用户是root和admin,密码是空
    • 将BE节点加入到集群中
    • 查看BE

    Docker 部署 Doris

    创建挂载的文件夹
    运行Fe
    运行Be
    查看容器IP
    测试服务是否正常
    添加BE至FE中
    查看Be状态
    修改数据库root密码
    notion image

    Flink安装配置

    准备Flink安装包 flink-1.14.4-bin-scala_2.12.tgz
    准备两个Jar放到Flink/lib目录下
    启动Flink
    notion image

    同步数据到Doris

    创建Doris数据库及表
    进入Flink SQL Client
    开启 checkpoint,每隔10秒做一次 checkpoint , Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。Source在启动时会扫描全表,将表按照主键分成多个chunk。并使用增量快照算法逐个读取每个chunk的数据。作业会周期性执行Checkpoint,记录下已经完成的chunk。当发生Failover时,只需要继续读取未完成的chunk。当chunk全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义
    注意: 这里是演示,生产环境建议checkpoint间隔60秒
    SET execution.checkpointing.interval = 10s;
    创建MySQL CDC表
    在Flink SQL Client 下执行下面的 SQL
    ‘database-name’ = ‘emp_[0-9]+’: 这里是使用了正则表达式,同时连接多个库‘table-name’ = ‘employees_[0-9]+’:这里是使用了正则表达式,同时连接多个表
    查询CDC表
    notion image
    image-20220902142456374
    创建 Doris Sink 表
    参数说明:
    1. connector : 指定连接器是doris
    1. fenodes:doris FE节点IP地址及http port
    1. table.identifier : Doris对应的数据库及表名
    1. username:doris用户名
    1. password:doris用户密码
    1. sink.properties.two_phase_commit:指定使用两阶段提交,这样在stream load的时候,会在http header里加上 two_phase_commit:true ,不然会失败
    1. sink.label-prefix : 这个是在两阶段提交的时候必须要加的一个参数,才能保证两端数据一致性,否则会失败
    1. 其他参数参考官方文档 https://doris.apache.org/zh-CN/
    查询Doris sink表 这时候还没有数据
    将数据插入到Doris表里
    notion image
    image-20220902143356445
    可以看到Flink WEB UI上的任务运行信息
    notion image
    查看Doris数据
    notion image
    测试MySQL删除
    notion image
    Doris同步更新
    notion image
    验证Doris数据删除 , mysql更新数据, doris中也会更新

    踩坑

    • Flink 连接 Doris Be 超时
        • notion image
         
      • 原因 : flink和be的网不通 , 因为我这里是WSL
          • notion image
            notion image
             

    评论