Toggle navigation
直到世界的尽头
主页
搜索
flink实时读取mongodb方案调研-实现mongodb cdc
计算机相关
2022-01-27 16:31:56.0
# 背景介绍 mongodb目前是很多企业业务上常用的nosql数据库。我们需要对这些业务mongodb数据库进行 数据同步到 数据仓库中进行 数据分析处理。 # 技术选型 ## CDC介绍 CDC (Change Data Capture) 是一种用于捕捉数据库变更数据的技术 CDC 技术的应用场景非常广泛: 数据同步:用于备份,容灾; 数据分发:一个数据源分发给多个下游系统; 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。 CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种: ### 基于查询的 CDC 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的延迟。 ### 基于日志的 CDC 实时消费日志,流处理,例如 Mongodb 的 oplog 日志完整记录了数据库中的变更,可以把 oplog 文件当作流的数据源; 保障数据一致性,因为 oplog 文件包含了所有历史变更明细; 保障实时性,因为类似 oplog 的日志文件是可以流式消费的,提供的是实时数据。 对比常见的开源 CDC 方案,我们可以发现:  对比增量同步能力, 基于日志的方式,可以很好的做到增量同步; 而基于查询的方式是很难做到增量同步的。 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合? 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据; 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。 ## 传统的cdc etl方案  传统的基于 CDC 的 ETL 分析中,数据采集工具是必须的,国外用户常用 Debezium,国内用户常用阿里开源的 Canal,采集工具负责采集数据库的增量数据,一些采集工具也支持同步全量数据。采集到的数据一般输出到消息中间件如 Kafka,然后 Flink 计算引擎再去消费这一部分数据写入到目的端,目的端可以是各种 DB,数据湖,实时数仓和离线数仓。 注意,Flink 提供了 changelog-json format,可以将 changelog 数据写入离线数仓如 Hive / HDFS;对于实时数仓,Flink 支持将 changelog 通过 upsert-kafka connector 直接写入 Kafka debezium官网 https://debezium.io/ debezium github地址 https://github.com/debezium 使用debezium实时清洗mongodb参考: https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-streaming-changes [debezium连接mysql示例](https://blog.csdn.net/weixin_40898246/article/details/108327247 "debezium连接mysql示例") 传统方式[Flink+Debezium 实现 CDC 原理及代码实战](https://zhuanlan.zhihu.com/p/338945255 "Flink+Debezium 实现 CDC 原理及代码实战") [debezium mongodb 集成测试](https://www.cnblogs.com/rongfengliang/p/10176261.html "debezium mongodb 集成测试") ## flink cdc etl-- Flink CDC Connectors Flink CDC 替换了传统cdc etl中虚线框内的采集组件和消息队列,从而简化分析链路,降低维护成本。同时更少的组件也意味着数据时效性能够进一步提高。 Flink 从 1.11 版本开始原生支持 CDC 数据(changelog)的处理,目前已经是非常成熟的变更数据处理方案。 flink cdc 2.1版本 已经正式支持mongodb cdc, 全量和增量都支持 原理就是伪装为集群副本 通过oplog。  Flink CDC Connectors 是 Flink 的一组 Source 连接器,是 Flink CDC 的核心组件,这些连接器负责从 MySQL、PostgreSQL、Oracle、MongoDB 等数据库读取存量历史数据和增量变更数据。Flink CDC Connectors 是一个独立的开源项目,从2020年 7 月份开源以来,社区保持了相当高速的发展,平均两个月一个版本,在开源社区的关注度持续走高,也逐渐有越来越多的用户使用 Flink CDC 来快速构建实时数仓和数据湖。 Flink CDC Connectors github地址 https://github.com/ververica/flink-cdc-connectors/wiki Flink CDC 可以同步数据库数据并写入到 数据仓库比如TiDB中,用户直接使用 Flink SQL 创建了产品和订单的 MySQL-CDC 表,然后对数据流进行 JOIN 加工,加工后直接写入到下游数据库。通过一个 Flink SQL 作业就完成了 CDC 的数据分析,加工和同步。 可以使用纯 SQL 作业,这意味着只要会 SQL 的 BI,业务线同学都可以完成此类工作。与此同时,用户也可以利用 Flink SQL 提供的丰富语法进行数据清洗、分析、聚合。  此外,利用 Flink SQL 双流 JOIN、维表 JOIN、UDTF 语法可以非常容易地完成数据打宽,以及各种业务逻辑加工。  ## Flink CDC 项目发展 2020 年 7 月由云邪提交了第一个 commit,这是基于个人兴趣孵化的项目; 2020 年 7 中旬支持了 MySQL-CDC; 2020 年 7 月末支持了 Postgres-CDC; 2021年 11月中旬2.1版本支持 Mongodb-CDC; 一年的时间,该项目在 GitHub 上的 star 数已经超过 800。 更多信息 https://github.com/ververica/flink-cdc-connectors/releases ## flink cdc1.0存在的问题 MySQL CDC 是 Flink CDC 中使用最多也是最重要的 Connector,本文下述章节描述 Flink CDC Connector 均为 MySQL CDC Connector。 随着 Flink CDC 项目的发展,得到了很多用户在社区的反馈,主要归纳为三个:  1、全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。 2、不支持水平扩展,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。 3、全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。 ## flink cdc2.0针对痛点问题的优化 2.0 的设计方案,核心要解决上述的三个问题,即支持无锁、水平扩展、checkpoint。  整体流程可以概括为,首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader,每个 Snapshot Chunk 读取时通过算法实现无锁条件下的一致性读,SourceReader 读取时支持 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 读取完成后,下发一个 binlog chunk 进行增量部分的 binlog 读取,这便是 Flink CDC 2.0 的整体流程 Flink CDC 是一个完全开源的项目,项目所有设计和源码目前都已贡献到开源社区,Flink CDC 2.0 也已经正式发布,此次的核心改进和提升包括: 提供 MySQL CDC 2.0,核心feature 包括 并发读取,全量数据的读取性能可以水平扩展; 全程无锁,不对线上业务产生锁的风险; 断点续传,支持全量阶段的 checkpoint。 搭建文档网站,提供多版本文档支持,文档支持关键词搜索 用 TPC-DS 数据集中的 customer 表进行测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段: MySQL CDC 2.0 用时 13 分钟; MySQL CDC 1.4 用时 89 分钟; 读取性能提升 6.8 倍。 设计原理参考: http://www.dawuzhe.cn/152938.html 2.1 版本包含 23 位贡献者贡献的 100+ PR,重点提升了 MySQL CDC 连接器的性能和生产稳定性,重磅推出 Oracle CDC 连接器和 MongoDB CDC 连接器。 MySQL CDC 支持百亿级数据的超大表,支持 MySQL 全部数据类型,通过连接池复用等优化大幅提升稳定性。同时提供支持无锁算法,并发读取的 DataStream API,用户可以借此搭建整库同步链路; 新增了 Oracle CDC 连接器, 支持从 Oracle 数据库获取全量历史数据和增量变更数据; 新增了 MongoDB CDC 连接器,支持从 MongoDB 数据库获取全量历史数据和增量变更数据;  所有连接器均支持了 metadata column 功能, 用户通过 SQL 就可以访问库名,表名,数据变更时间等 meta 信息,这对分库分表场景的数据集成非常实用; 丰富 Flink CDC 入门文档,增加多种场景的端到端实践教程。 从 ChangeStreams API 获取的更新事件中,对于 update 事件,没有 update 事件的前镜像值,即 MongoDB CDC 数据源只能作为一个 upsert source。不过 Flink 框架会自动为 MongoDB CDC 附加一个 Changelog Normalize 节点,补齐 update 事件的前镜像值(即 UPDATE_BEFORE 事件),从而确保 CDC 数据的语义正确性。 使用 MongoDB CDC 连接器,用户只需要声明如下 Flink SQL 就能实时捕获 MongoDB 数据库中的全量和增量变更数据,借助 Flink 强大的集成能力,用户可以非常方便地将 MongoDB 中的数据实时同步到 Flink 支持的所有下游存储。  整个数据捕获过程,用户不需要学习 MongoDB 的副本机制和原理,极大地简化了流程,降低了使用门槛。MongoDB CDC 也支持两种启动模式: 默认的initial 模式是先同步表中的存量的数据,然后同步表中的增量数据; latest-offset 模式则是从当前时间点开始只同步表中增量数据。 此外,MongoDB CDC 还提供了丰富的配置和优化参数,对于生产环境来说,这些配置和参数能够极大地提升实时链路的性能和稳定性。 # flink-mongodb-cdc 实践 (后续文章详细说明具体步骤) master文档 https://ververica.github.io/flink-cdc-connectors/master/content/about.html 使用文档 https://ververica.github.io/flink-cdc-connectors/release-2.1/ 下载地址 https://github.com/ververica/flink-cdc-connectors/releases