合作机构:阿里云 / 腾讯云 / 亚马逊云 / DreamHost / NameSilo / INWX / GODADDY / 百度统计
与Hive不同,Hudi数据在Spark/Flink写入后,下游可以继续使用Spark/Flink引擎以流读的形式实时读取数据。同一份Hudi数据源既可以批读也支持流读。
Flink、Hive、Spark的流转批架构:
Hudi流批同源架构:
Hudi支持COW(Copy On Write)和MOR(Merge On Read)两种类型:
(1)COW写时拷贝:
每次更新的数据都会拷贝一份新的数据版本出来,用户通过最新或者指定version的可以进行数据查询。缺点是写入的时候往往会有写内存放大的情况,优点是查询不需要合并,直接读取效率相对比较高。JDK中的CopyOnWriteArrayList/
CopyOnWriteArraySet 容器正是采用了 COW 思想。
COW表的数据组织格式如下:
(2)MOR读时合并:
每次更新或者插入新的数据时,并写入parquet文件,而是写入Avro格式的log文件中,数据按照FileGroup进行分组,每个FileGroup由base文件(parquet文件)和若干log文件组成,每个FileGroup有单独的FileGroupID;在读取的时候会在内存中将base文件和log文件进行合并,进而返回查询的数据。缺点是合并需要花费额外的合并时间,查询的效率受到影响;优点是写入的时候效率相较于COW快很多,一般用于要求数据快速写入的场景。
MOR数据组织格式如下:
Hudi表会针对COW和MOR表制定不同的文件合并方案,分别对应Clustering和Compaction。
Clustering顾名思义,就是将COW表中多个FileGroup下的parquet根据指定的数据大小重新编排合并为新的且文件体积更大的文件块。如下图所示:
Compaction即base parquet文件与相同FileGroup下的其余log文件进行合并,生成最新版本的base文件。如下图所示:
当前主流的OLAP引擎等都是从HMS中获取Hudi的分区元数据信息,从InputFormat属性中判断需要启动HiveCatalog还是HudiCatalog,然后生成查询计划最终执行。当前StarRocks、Presto等引擎都支持以外表的形式对Hudi表进行查询。
图片
Hudi 支持多种Procedure,即过程处理程序,用户可以通过这些Procedure方便快速的处理Hudi表的相关逻辑,比如Compaction、Clustering、Clean等相关处理逻辑,不需要进行编码,直接通过sparksql的语句来执行。
1). 按时效性要求进行分类
秒级延迟:
图片
分钟级延迟:
当前Hudi主要还是应用在准实时场景:
上游从Kafka以append模式接入ods的cow表,下游部分dw层业务根据流量大小选择不同类型的索引表,比如bucket index的mor表,在数据去重后进行dw构建,从而提供统一数据服务层给下游的实时和离线的业务,同时ods层和dw层统一以insert overwrite的方式进行分区级别的容灾保障,Timeline上写入一个replacecommit的instant,不会引发下游流量骤增,如下图所示:
图片
实时场景:
支持1亿条/min量级准实时写入;流读延迟稳定在分钟级
离线场景:
支持千亿级别数据单批次离线写入;查询性能与查询Hive持平(部分线上任务较查询Hive提高20%以上)
小文件治理:
95%以上的合并任务单次执行控制在10min内完成
当前线上所有Hudi的版本已从0.12 升级到 0.14,主要考虑到0.14版本的组件能力更加完备,且与社区前沿动态保持一致。
1). 限流
数据积压严重的情况下,默认情况会消费所有未消费的commits,往往因消费的commits数目过大,导致任务频繁OOM,影响任务稳定性;优化后每次用户可以摄取指定数目的commits,很大程度上避免任务OOM,提高了任务稳定性。
2). 外置clean算子
避免单并行度的clean算子最终阶段影响数据实时写入的性能;将clean单独剥离到
compaction/clustering执行。这样的好处是单个clean算子,不会因为其生成clean计划和执行导致局部某些Taskmanager出现热点的问题,极大程度提升了实时任务稳定性。
3). JM内存优化
部分大流量场景中,尽管已经对Hudi进行了最大程度的调优,但是JM的内存仍然在较高水位波动,还是会间隔性出现内存溢出影响稳定性。这种情况下我们尝试对 state.backend.fs.memory-threshold 参数进行调整;从默认的20KB调整到1KB,JM内存显著下降;同时运行至今state相关数据未产生小文件影响。
1). Bucket index下的BulkInsert优化
0.14版本后支持了bucket表的bulkinsert,实际使用过程中发现分区数很大的情况下,写入延迟耗时与计算资源消耗较高;分析后主要是打开的句柄数较多,不断CPU IO 频繁切换影响写入性能。
因此在hudi内核进行了优化,主要是基于partition path和bucket id组合进行预排序,并提前关闭空闲写入句柄,进而优化cpu资源使用率。
这样原先50分钟的任务能降低到30分钟以内,数据写入性能提高约30% ~ 40%。
优化前:
优化后:
2). 查询优化
0.14版本中,部分情况下分区裁剪会失效,从而导致条件查询往往会扫描不相关的分区,在分区数庞大的情况下,会导致driver OOM,对此问题进行了修复,提高了查询任务的速度和稳定性。
eg:select * from `hudi_test`.`tmp_hudi_test` where day='2023-11-20' and hour=23;
(其中tmp_hudi_test是一张按日期和小时二级分区的表)
修复前:
修复后:
优化后不仅包括减少分区的扫描数目,也减少了一些无效文件RPC的stage。
3). 多种OLAP引擎支持
此外,为了提高MOR表管理的效率,我们禁止了RO/RT表的生成;同时修复了原表的元数据不能正常同步到HMS的缺陷(这种情况下,OLAP引擎例如Presto、StarRocks查询原表数据默认仅支持对RO/RT表的查询,原表查询为空结果)。
图片
1). 序列化问题修复
0.14版本Hudi在文件合并场景中,Compaction的性能相较0.12版本有30%左右的资源优化,比如:原先0.12需要6G资源才能正常启动单个executor的场景下,0.14版本 4G就可以启动并稳定执行任务;但是clustering存在因TypedProperties重复序列化导致的性能缺陷。完善后,clustering的性能得到30%以上的提升。
可以从executor的修复前后的火焰图进行比对。
修复前:
修复后:
2). 分批compaction/clustering
compaction/clustering默认不支持按commits数分批次执行,为了更好的兼容平台调度能力,对compaction/clustering相关procedure进行了改进,支持按批次执行。
同时对其他部分procedure也进行了优化,比如copy_to_table支持了列裁剪拷贝、
delete_procedures支持了批量执行等,降低sparksql的执行时间。
3). clean优化
Hudi0.14 在多分区表的场景下clean的时候很容易OOM,主要是因为构建
HoodieTableFileSystemView的时候需要频繁访问TimelineServer,因产生大量分区信息请求对象导致内存溢出。具体情况如下:
对此我们对partition request Job做了相关优化,将多个task分为多个batch来执行,降低对TimelineSever的内存压力,同时增加了请求前的缓存判断,如果已经缓存的将不会发起请求。
改造后如下:
此外实际情况下还可以在FileSystemViewManager构建过程中将 remoteview 和 secondview 的顺序互调,绝大部分场景下也能避免clean oom的问题,直接优先从secondview中获取分区信息即可。
当前计算平台支持用户表级别生命周期设置,为了提高删除的效率,我们设计实现了直接从目录对数据进行删除的方案,这样的收益有:
删除前会对compaction/clustering等instants的元数据信息进行扫描,经过合法性判断后区分用户需要删除的目录是否存在其中,如果有就保存;否则直接删除。流程如下:
我们分别在流批场景、小文件治理、生命周期管理等方向做了相关优化,上线后的收益主要体现这四个方向:
同时跟进用户实际使用情况,发现了一些有待优化的问题:
针对上述问题,我们也做了如下后续计划:
TOP