type
status
date
slug
summary
tags
category
icon
password
Language
Docker部署MySQL
- 通过挂载的方式开启一个mysql镜像
备注:需提前在宿主机目录下创建一个文件用于保存mysql的数据集,我这里创建的目录是 /usr/docker/mysql/data
- 使用客户端连接工具连接上mysql,观察一下mysql_binlog的开启情况
- 开启bin_log
- 重启mysql镜像
- 再次查看
show VARIABLES like '%log_bin%'
- 准备数据
- 这时在宿主机下已经产生了日志文件
Doris 单机版部署
- 解压到指定目录
- 配置FE和BE
- 启动 FE
- 验证FE
- 启动BE
- MySQL命令行连接FE,这里新安装的Doris集群默认用户是root和admin,密码是空
- 将BE节点加入到集群中
- 查看BE
Docker 部署 Doris
创建挂载的文件夹
运行Fe
运行Be
查看容器IP
测试服务是否正常
添加BE至FE中
查看Be状态
修改数据库root密码
Flink安装配置
准备Flink安装包
flink-1.14.4-bin-scala_2.12.tgz
准备两个Jar放到Flink/lib目录下
启动Flink
同步数据到Doris
创建Doris数据库及表
进入Flink SQL Client
开启 checkpoint,每隔10秒做一次 checkpoint , Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。Source在启动时会扫描全表,将表按照主键分成多个chunk。并使用增量快照算法逐个读取每个chunk的数据。作业会周期性执行Checkpoint,记录下已经完成的chunk。当发生Failover时,只需要继续读取未完成的chunk。当chunk全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义注意: 这里是演示,生产环境建议checkpoint间隔60秒
SET execution.checkpointing.interval = 10s;
创建MySQL CDC表
在Flink SQL Client 下执行下面的 SQL
‘database-name’ = ‘emp_[0-9]+’: 这里是使用了正则表达式,同时连接多个库‘table-name’ = ‘employees_[0-9]+’:这里是使用了正则表达式,同时连接多个表
查询CDC表
image-20220902142456374
创建 Doris Sink 表
参数说明:
- connector : 指定连接器是doris
- fenodes:doris FE节点IP地址及http port
- table.identifier : Doris对应的数据库及表名
- username:doris用户名
- password:doris用户密码
- sink.properties.two_phase_commit:指定使用两阶段提交,这样在stream load的时候,会在http header里加上
two_phase_commit:true
,不然会失败
- sink.label-prefix : 这个是在两阶段提交的时候必须要加的一个参数,才能保证两端数据一致性,否则会失败
- 其他参数参考官方文档 https://doris.apache.org/zh-CN/
查询Doris sink表 这时候还没有数据
将数据插入到Doris表里
image-20220902143356445
可以看到Flink WEB UI上的任务运行信息
查看Doris数据
测试MySQL删除
Doris同步更新
验证Doris数据删除 , mysql更新数据, doris中也会更新
踩坑
- Flink 连接 Doris Be 超时
- 原因 : flink和be的网不通 , 因为我这里是WSL
- 作者:何以问
- 链接:https://heyiwen.com/article/bigdata-1
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。