合作机构:阿里云 / 腾讯云 / 亚马逊云 / DreamHost / NameSilo / INWX / GODADDY / 百度统计
面对日益增长的数据量,Lambda 架构使用离线/实时两条链路和两种存储完成数据的保存和处理。这种繁杂的架构体系带来了不一致的问题,需要通过修数、补数等一系列监控运维手段去弥补。为了统一简化架构,提高开发效率,减少运维负担,我们实施了基于数据湖 Hudi+Flink 的流批一体架构,达到了降本增效的目的。
如下图所示,总体架构包括数据采集、ETL、查询、调度、监控、数据服务等。要解决的是数据从哪里来到哪里去,怎么过去,怎么用,以及过程中的调度和监控、元数据管理、权限管理等问题。
“数据从哪里来”,我们的数据来自 MySQL、MongoDB、Tablestore、Hana。“数据到哪里去”,我们的数据会写入到 Hudi、Doris,其中 Doris 负责存储部分应用层的数据。“数据怎么过去”,将在后面的实时入湖部分进行介绍。“数据用在哪里”,我们的数据会被 OLAP、机器学习、API、BI 查询使用,其中 OLAP 和 BI 都通过 Kyuubi 的服务进行查询。
任务的调度主要通过 DolpuinScheduler 来执行,基于 quartz 的 cronTrigger 完成 shell、SQL 等调度。监控部分则是通过 Prometheus 和 Grafana,这是业界通用的解决方案。元数据采集通过 DataHub 完成,采用了 datahub 的 ingestion framework 框架来采集各种数据源的元数据。权限管理主要包括 Kyuubi 服务端的统一认证和引擎端的独立鉴权。
数据入湖方案设计上,我们比较了三种入湖的实现思路。
如下图所示,包含了两条支线:
这种方案的主要优点是 Flink 和 CDC 组件都经过了充分验证,已经非常稳定成熟了。而主要缺点是 Flink SQL 需要定义表 DDL。但我们已经开发 DDL 列信息从元数据系统获取,无须自定义。并且写 Hudi 是每张表一个 Flink 任务,这样会导致资源占用过多。另外 Flink CDC 还不支持 Schema 演变,一旦 Schema 变更,需要重新拉取数据。
这一方案是在前一个方案分支二的基础上进行了一定的改进,通过 Dinky 完成整库数据同步,其优点是同源数据合并成一个 source 节点,减轻源库压力,根据 schema、database、table 分流 sink 到对应表。其缺点是不支持 schema 演变,表结构变更须重新导数。如下图所示,mysql_biz 库中有3张表,从 flink dag 图看到 mysql cdc source 分3条流 sink 到 Hudi 的3张表。
主要流程如下图所示。其主要优点是支持 Schema 演变。Schema 变更的信息由 Debezium 注册到 Confluence Schema Registry,schema change 的信息通过 DeltaStreamer 执行任务变更到 Hudi,使得任务执行过程中不需要重新拉起。其主要缺点是依赖于 Spark 计算引擎,而我们部门主要用 Flink,当然,这会因各个公司实际情况而不同。
下图分别是 Yarn 的 deltastreamer 任务, Kafka schema-change topic 的 DML message 和 Hudi 表变更后的数据。
在方案选型时,可以根据下面的流程图进行比较选择:
(1) 先看计算框架是 Spark 还是Flink,如果是Spark 则选择方案三,即 Deltastreamer,这一方案适用于表结构变更频繁,重新拉取代价高,主要技术栈是Spark 的情况。
(2) 如果是 Flink,再看数据量是否较少,以及表结构是否较稳定,如果是的话,选择方案二,Dinky 整库同步方案支持表名过滤,适用数据量较少且表结构较稳定的表。
(3) 如果否,再考虑 mysql 能否抗较大压力,如果否,那么选择方案一下分支,即 Kafka Connect,Debezium 拉取发送 Kafka,从 Kafka 读取后写 Hudi。适用数据量较大的多张表。
(4) 如果是,则选择方案一上分支,即 Flink SQL mysql-cdc 写 Hudi,适用于对实时稳定要求高于资源敏感的重要业务场景。
我们的入湖场景是 Flink Stream API 读取Pulsar 写 Hudi MOR 表,特点是数据量大,并且源端的每条消息都只包含了部分的列数据。我们通过使用 Hudi 的 MOR 表格式和 PartialUpdateAvroPayload 实现了这个需求。使用 Hudi 的 MOR 格式,是因为 COW 的写放大问题,不适合数据量大的实时场景,而 MOR 是增量数据写行存 Avro 格式log,通过在线或离线方式压缩合并至列存格式 parquet。在保证写效率的同时也兼顾了查询的性能。不过需要通过合并任务定期地对数据进行合并处理,这是引入复杂度的地方。
以下面这张图为例,recordKey 是 ID1 的3条 msg,每条分别包含一个列值,其余字段为空,按 ts 列 precombine,当 ts3 > ts2 > ts1时,最终 Hudi 存的 ID1 行的值是 v1,v2,v3,ts3。
此入湖场景痛点包括,MOR 表索引选择不当,压缩异常导致越写越慢,直至 checkpoint 超时,某分区存在重复文件导致写任务出错,MOR 表某个压缩计划 pending阻碍此 bucket 的压缩及后续的压缩计划生成,以及如何平衡效率与资源等。
我们在实践过程中针对一些痛点实施了相应的解决方案。
Hudi 表索引类型选择不当,导致越写越慢至 CK 超时,这是因为 Bucket 索引通过 hash 映射 recordKey 到 fileGroup。而 Bloom 索引是保存 recordKey 和 partition、fileGroup 值来实现,因此 checkpoint size 会随数据量的增加而增长。Bloom Filter 索引基于布隆过滤器实现,索引信息存储在 parquet 的 footer 中,Bloom 的假阳性问题也会导致更新越来越慢,假阳性是指只能判断数据一定不在某个文件而不能保证数据一定在某个文件,因此存在多个文件都可能存在某条数据,即须读取多个文件才能准确判断。
我们做的优化是使用 Bucket 索引代替 Bloom 索引,Hudi 目前也支持了可以动态扩容的 Bucket 参数。
MOR 表压缩执行异常,具体来说有以下三个场景:
此3种现象的原因都是 Sink:compact_commit 算子的并行度 > 1,我们做的优化是降低压缩过程的并发度,设置 compact_commit Parallelism = 1。并行度改成1后1G的 log 压缩正常。整张表size 明显减少。log 到 parquet 的压缩比默认是0.35。
MOR 表某分区存在重复文件,导致写任务出错。出现这个问题的原因是某个 instant 已写 log 文件但未成功提交到 timeline 时,发生异常重启后未 rollback 这个 instant,即未清理已有 log,继续写此 instant 则有重复。
我们做的优化是在遇到重复文件时,通过 Hudi-Cli 执行去重任务,再恢复执行。具体来说,需要拆分成以下四个步骤:
repair deduplicate --duplicatedPartitionPath 20220604 --repairedOutputPath hdfs:///hudi/hudi_tis.db/track_detail_3_repair/20220604 --dedupeType upsert_type --sparkMaster local
MOR 表某个压缩计划 pending,阻碍此 bucket 的压缩及后续的压缩计划生成。这个问题是由于环境问题导致的 zombie compaction 或 bug。上图中第一列是compaction instant time,即压缩计划生成时间,第二列是状态,第三列是此压缩计划包含的文件数。8181的 instant 卡住,且此压缩计划包含2198个文件,即涉及到大量的 file group,涉及的 file group不会有新的压缩计划生成。导致表的 size 增加,写延时。
我们做的优化是回滚不正常的合并任务,重新处理。即利用较多资源快速离线压缩完。保证之后启动的 Flink 任务在相对少的资源情况下仍然可以保证更新和在线压缩的效率。
具体来说,包括下面的命令:
sh bin/hudi-compactor.sh hudi_tis track_detail_3 100
compaction unschedule --instant 20230613180604970 --parallelism 200 --sparkMaster local --sparkMemory 5g
经过多次的修改和验证,我们的入湖任务在性能和稳定性上取得了明显的改善。在稳定性上,做到了在十几天内任务无异常。在时延上,做到了分钟级别的 checkpoint 和数据可见。在资源使用上,对 Hadoop YARN 资源的占用明显减少。
下图总结了我们对实时入湖做的参数优化方案,包括:
Flink增量checkpoint:Rockdb #Flink ck存储,rockdb支持增量ck,减少单ck数据量,提高写吞吐。
jobmanager 5G #Flink jobmanager内存,减少oom,保证稳定。
taskmanager 50G 20S #Flink taskmanager内存与slot数,slot与并发度、bucket数一致。
实时任务入湖的优化思路流程包括下面几个步骤:
在引入 Kyuubi 前,我们通过 JDBC、Beeline、Spark Client、Flink Client 等客户端访问服务层执行查询,没有统一入口,多个平台不互通,多账号权限体系。用户的痛点是跨多平台开发体验差,低效率。平台层的痛点是问题定位运维复杂,存在资源浪费。
在引入 Kyuubi 后,我们基于社区版 Kyuubi 做了一定的改造,包括 JDBC 引擎开发、JDBC 引擎 Ranger 鉴权开发、BI、JDBC 客户端元数据适配修改、Spark 引擎大结果集存 HDFS、支持导数开发、JDBC 引擎 SQL 拦截控流开发等,实现了统一数据服务入口,做到了统一认证权限管理和统一易用原则。
下图展示了 Kyuubi 的架构和权限管控:
Kyuubi 查询流程是:客户端请求通过 LDAP 认证后,连接 Kyuubi Server 生成 Kyuubi session,之后 Kyuubi server 根据连接用户以及用户隔离级别路由到已经启动的 engine 或启动一个新的 engine。Spark 引擎会先申请 container 运行 AppMaster,后申请 container 运行 executor 执行 task。Flink 引擎会完成 StreamGraph 至 JobGraph 至 executionGraph 构建并通过 Jobmanager 和 taskmanager 运行。其中 engine 端 RangerPlungin 会在 SQL 解析后拉取 RangerAdmin 由用户配置的策略进行鉴权。RangerAdmin 完成用户同步,策略刷新等。
Kyuubi on Flink 跨库查询的目的是尝试基于 Flink实现流批一体,支持跨数据源导数 SQL 化。我们的实现方案是通过 Flink Metadata Catalog Connector 的开发,即基于元数据系统以统一 datasource.db.table 的格式查询所有数据源,且让用户免于自定义 DDL。其中元数据采集是用 datahub 的 ingestion framework 采集各种数据源的元数据,并生成对应 Flink 表属性。Flink 端是扩展 AbstractCatalog 查询 metadata DB,实现 CatalogFactory 接口。
其基本流程如下图所示:
完整流程是1 发起采集请求2和3是采集服务调 Datahub ingestion framework 完成元数据采集并写到 metadata DB 同时写 Flink 表属性。4是 用户发送 SQL 到 Kyuubi server 5是 Kyuubi server 发送 SQL 到 Flink engine 6和7是 Flink metadata catalog 会读取 metadata DB 根据 Flink 表属性读取对应数据源。
Kyuubi on JDBC Doris 可以通过外表查询 Hudi,但在 Doris 1.2 版本,仍然有一定的限制,Hudi 目前仅支持Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Read Optimized Query。后续将支持 Incremental Query 和 Merge On Read 表的 Snapshot Query。
Doris 的架构示意和其基本使用流程如下图所示:
TOP