合作机构:阿里云 / 腾讯云 / 亚马逊云 / DreamHost / NameSilo / INWX / GODADDY / 百度统计
数据处理效率一直是大数据时代的核心话题,它推动着各类数据执行引擎持续迭代产品。从早期的 MapReduce,到今天的 Spark,各行业正不断演进其离线数仓技术架构。
现有以 Spark 为核心的数仓架构在处理大规模数据回刷方面已取得进展,但在资源和时间消耗上仍面临挑战。为了突破这些限制,小红书数据仓库团队将 StarRocks 融入到离线处理流程,替换掉部分 Spark 处理的任务,并优化较为耗时的 Cube 计算,大幅度提高了数据的执行效率。
实践证明,经过改造的离线处理链路,可以有效降低任务资源消耗,提前数据产出时间。将作业执行时间从小时级压缩至分钟级,计算资源使用量降低 90% 以上,日数据产出时间提前 1.5 小时,回刷时间减少 90%,回刷成本减少 99% 以上。
为了更好地管理和使用数据,离线数仓一般会通过分层设计,确保数据高效利用。
离线数仓一般以 Spark 引擎作为主力,它负责数据的清洗、关联和聚合,完成所有数据模型的建设。随后,通过 DTS 任务将 APP 层的数据导入到 OLAP 集群中。小红书主流的 OLAP 引擎包括 StarRocks 和 ClickHouse,它们凭借 OLAP 引擎的查询能力,为数据产品、分析看板和业务工具提供数据查询服务。
虽然 Spark 引擎以其强大的吞吐量和稳定性在离线数仓中被广泛使用,但它在数据查询优化方面存在局限。Spark 并不直接管理数据的分布、存储格式或元信息,无法结合数据存储格式和数据元信息进行查询优化。此外,为了确保稳定性,Spark 在跨节点数据传输时需要将数据写入磁盘,这在大规模数据回刷时会导致资源消耗巨大和处理周期延长。
从本质上来看,Spark 仅仅是一个数据处理引擎,而不是一个理想的数据仓库分析引擎。在实际应用中,这种性能瓶颈尤为明显,开销较大。例如,以交易运营行业为例,若要回刷两年的数据,则需要占用相当于 7 万台机器近 30 天的资源,成本高达上百万元。这种定期数据回刷产生的巨额成本,已经成为数据仓库团队不得不面临的问题。
与 Spark 这类数据处理引擎不同,基于 MPP 架构的 OLAP 引擎在数据查询方面是更具优势的。市面上常见的 OLAP 引擎主要有两个:ClickHouse 和 StarRocks。
ClickHouse 是一个开源的列式数据库管理系统,可用于 OLAP 分析。它采用列式存储,与传统的行式存储相比,这种设计在处理分析型查询时更为高效,因为它能够快速读取和聚合列数据,无需加载整个行。ClickHouse 的 MPP 架构允许查询任务被拆分为多个子任务,并在集群的多个节点上并行执行。每个节点都配备独立的的处理器和存储资源,使得系统能够充分利用集群的计算和存储能力,大幅提升查询速度和系统吞吐量。此外,ClickHouse 的 MPP 架构还支持数据复制和分片,提高数据的可用性和查询性能。即使某个节点发生故障,其他节点也能迅速接管任务,确保服务的连续性。ClickHouse 是用 C++ 编写的,它在单核性能上进行了深度优化。
StarRocks 也是一款高性能分析型数据仓库,可实现多维、实时、高并发的数据分析。StarRocks 采用了向量化、MPP 架构、CBO 优化器、智能物化视图和列式存储引擎等先进技术,因此与同类产品相比,在查询效率上具有较大优势。StarRocks 能够高效地从各类实时和离线数据源导入数据,并直接分析数据湖中的多种格式数据。StarRocks 兼容 MySQL 协议,常用 BI 工具能轻松接入。此外,StarRocks 支持水平扩展,确保了高可用性、可靠性和易于维护。
在小红书内部,StarRocks 版本以存算一体架构为主,其中前端(FE)负责元数据管理和构建执行计划,而后端(BE)则负责数据存储和计算。这种架构使得查询能够直接在 BE 节点上本地执行,避免数据传输与拷贝开销,从而实现极速的查询分析性能。存算一体架构还支持数据的多副本存储,提升了集群在高并发环境下的查询能力和数据可靠性。
StarRocks 对算子和函数进行了向量化加速,并通过 Pipeline 调度框架,充分利用多核计算能力,提升查询性能。虽然 StarRocks 和 ClickHouse 在单表查询性能上相近,但 ClickHouse 在查询并发和不支持分布式 Join 的局限性,使其不适合作为生产数仓模型的查询加速引擎。因此,我们选择了 StarRocks 替换原有的 Cube 计算,期望在数据处理和分析方面达到更高的性能和效率。
为了提升离线数仓的产出效率,我们对架构进行如下优化:
计算 UV 的一般方式是使用 count distinct ,它能够保留原始数据的明细,有较高的灵活性。然而,由于在查询执行的过程中需要进行多次 shuffle(跨节点通过网络传输数据),会导致查询性能随着数据量增大而直线下降。
以下面的 SQL 为例,示例 1 :
select
seller_level,
count(distinct if(buy_num>0, user_id,null)) buy_uv,
count(distinct if(imp_num>0, user_id,null)) imp_uv,
count(distinct if(click_num>0, user_id,null)) click_uv
from
tb
group by seller_level
TOP