合作机构:阿里云 / 腾讯云 / 亚马逊云 / DreamHost / NameSilo / INWX / GODADDY / 百度统计
图片
爱奇艺大数据应用分为两个方面:
爱奇艺大数据实时应用包括实时广告系统、实时推荐和搜索、实时热度,以及实时风控。下方右图是应用搜索界面,搜索推荐基于用户及其他个性化信息实时计算生成。
图片
图片
以上提及的产品通过爱奇艺大数据服务体系实现。如上图,爱奇艺大数据服务体系主要由数据采集、数据处理、数据应用等方面构成。
我们的集群部署模式由通用集群和少量专用集群组成,总共有1万多台机器、500PB以上的存储,每天会运行50多万个批处理任务及4000多个流计算任务。
图片
技术演进过程分为三个阶段:
第一阶段是比较原始的Hive on MapReduce。在此阶段,我们借助Hive工具实现了SQL化的分析,通过SQL在Hadoop上构建离线数仓。SQL避免用户自己写MapReduce的Java代码,解决了大数据的初步问题之一。但随着业务发展、实时性需求增加,离线分析的处理延时难以满足业务需要。
第二阶段采用基于Spark SQL分析,大大加速离线数仓的构建进程,同时也探索基于Flink的实时数据处理。这个阶段我们初步引入Flink,用户直接写Flink任务代码,相比于基于SQL的离线分析,开发和运维的难度高了不少。同时,因为需要维护离线和实时两条链路,成本较高,且存在流与批数据不一致等问题。
第三阶段主要进行两项工作:一方面是实时计算SQL化,我们引入了统一元数据,简化了Flink SQL的开发,使得撰写实时计算的逻辑与Hive SQL一样简单;另一方面,引入数据湖Iceberg,初步实现流批一体。
爱奇艺的Spark SQL服务基于开源的Apache Kyuubi搭建,因为直接使用Spark Thrift Server服务有很多缺点,比如不支持多租户、资源隔离较难实现等。
图片
引入Kyuubi后,整体架构如上图所示。我们在Hadoop集群上层搭建了Kyuubi Server集群,再上层通过Pilot统一SQL网关(自研服务)接入,最上层是离线计算的Gear工作流调度系统和魔镜即席查询平台,分别承接定时工作流以及Ad-hoc的即席查询。
除此之外,Kyuubi Server和Spark任务引擎会注册到ZooKeeper服务发现集群中,供其调用方进行服务发现,由此实现了高可用性,去除了单点故障。
基于Kyuubi的这套体系具有以下6个好处:
爱奇艺在作为Apache Kyuubi用户的同时,也积极参与社区讨论,回馈社区,目前共有70多个patches被社区接受。
图片
除了通过Kyuubi建立Spark SQL的服务之外,我们也对Spark本身进行了优化,使得计算速度更快,资源更节省。
由于我们平台上每天都会运行大量的任务,用户很难为每个任务配置一个合适的资源量,因此经常出现任务的并行度不足或资源浪费的问题。DRA是Spark提供的已经比较成熟的功能,开启之后能够根据并行度需求自动申请或释放资源,在避免资源浪费的同时,还能在一定程度上加快任务运行。
AQE的定义是根据Spark任务运行时的数据,动态修改查询计划。因此它是一个优化框架,而非特定功能,用户可以扩展各种优化规则。
社区版Spark自带的优化规则包括Shuffle分区合并、自动转换为广播Join、Join倾斜优化等。我们基于Kyuubi进行功能扩展,比如自动合并小文件、末级Stage配置隔离等。其中,末级Stage配置隔离是一个非常好用的优化规则,它允许在配置层面,为普通Stage和末级Stage分别配置处理并行度。
这样,我们可以在末级Stage上按目标文件大小设置并行度,以合并小文件;在普通Stages上配置较高的并行度,加速任务处理,达成两者兼顾的效果。
爱奇艺内部使用Apache Atlas管理数据血缘,为此我们将其元数据和血缘投递的逻辑集成到了Kyuubi中,使得Kyuubi在运行Spark SQL任务时,能够自动向Atlas投递血缘。我们已将这一功能贡献给了社区(KYUUBI #4814),将在即将发布的Kyuubi V1.8版本中可用。
在Spark中使用Remote Shuffle Service是近两年来比较流行的一个趋势。爱奇艺采用的是Apache Uniffle这个开源产品。
在引入Apache Uniffle前,存在两种问题:一个是Shuffle不稳定,比如大数据量情况下,下载数据失败,出现fetch failure的报错;另一个是存算分离的云原生架构下,计算节点容量、IO性能不足。
引入Apache Uniffle后,原有问题得到改善:
爱奇艺作为Apache Uniffle的共同贡献者,深度参与了社区讨论和贡献。欢迎大家试用并提出反馈意见。
图片
在支持Spark SQL后,已有的大量Hive任务需要迁移过来。迁移过程会遇到两种问题:
为了解决迁移的问题,我们基于Pilot SQL网关开发了“双跑对数”的功能,在迁移前自动预测迁移结果,运行步骤如下:
使用“双跑对数”功能之后,我们在迁移的过程中发现了一些问题,其中有部分可以通过优化Spark SQL的兼容性来解决,进一步降低用户迁移的工作量:
用户在Hive中设置很多参数,比如reduce的个数,但这些参数在Spark中原本无法被识别,我们通过参数映射,将其转化为Spark的相应参数,尽可能保留用户的SQL逻辑。
最后,使用“自动降级”功能令迁移顺利进行,即首次使用Spark运行失败后,重试时降级为Hive引擎。由此,迁移分为两个阶段:第一阶段开启自动降级,用户可以放心迁移,并通过降级的记录梳理出迁移失败的任务;第二阶段,将这些失败的任务修复后,再完全切换到Spark。
目前Hive迁移的总体进度已经达成90%,对于这些迁移的任务,平均性能提升了67%,资源(包括CPU、内存使用量)也降低了近一半。
图片
在使用Spark SQL提高实时性的同时,我们也尝试引入Flink SQL,希望能够真正做到实时计算。但原生的Flink SQL如上示左图,比Hive SQL长很多,需要定义输入输出表,字段名称和类型,以及背后的数据源配置。应如何解决使用过程繁琐的问题?
我们引入了“统一元数据中心”的概念,类似于Hive的Metastore。因为Hive具有Metastore,所以无需反复定义输入输出表,写SQL非常简单,如上图中写三行语句即可。
我们将内部的各种数据,包括流式的Kafka和RocketMQ,传统数据库MySQL、Redis、HBase,以及数据湖产品,都集成到统一元数据中心,并开发了Flink Catalog、Flink Connectors与其对接。这样依赖,我们无需在每个任务中,重新定义表的结构以及连接串等信息,做到开箱即用,有效提升开发效率。
可能有同学会有疑问,SQL到底能否足够表达流计算?
因为传统SQL(比如Hive、MySQL),输入是一个表,输出也是一个表,从表到表的SQL究竟能否表达流式的计算逻辑?我们认为是可以的。
这个观点具有理论支撑,一位来自Google的工程师Akidau在其著作《Streaming Systems》中,提出了流和表的“相对论”。他认为流和表本质上是数据的两种表现形式。他拆解了传统SQL表到表的过程,将其拆分为表到流、流到表、流到流三种操作的组合。
以上图右边的SQL举例,输入是一组用户得分,按照团队进行聚合,计算出每个团队的总分,输出到新的表中。它的输入表由4个字段组成:用户、团队、得分和时间。
让我们来拆解这个SQL的执行逻辑(假设这是离线计算)。首先,原始表并不是一次性加载到内存的,而是通过一个SCAN算子,一条一条地读入,变成内部的流。然后经过SELECT算子,去掉无用字段,保留team和score字段,得到了一个新的流。
最后,流的数据全部到齐后,一次性计算聚合的值,即把每个team的所有分数相加得到总分,再输出到目标表。由此看出,第一个操作SCAN是表到流,第二个操作SELECT是流到流,第三个操作GROUP BY是流到表。
从上面的SQL执行逻辑拆解可以看出,将传统SQL描述为表到表的操作,黑盒地看是对的,但在微观层面是不准确的,实际上是表到流、流到表、流到流三种操作的组合,唯一不存在的是直接的流到流的操作。
流计算的过程中包括很多要素,比如Map或Filter可以认为是一个流到流的操作,分组的聚合或窗口的聚合,就是流到表的过程;而通过定时的trigger或Watermark引起的trigger,是表到流的过程。
当把上面的SQL看成流计算时,会发现其拆解过程与离线计算一模一样,都是由SCAN(表到流)、SELECT(流到流)、GROUP BY(流到表)组成的。
因此,SQL对于流计算和离线计算来说,没有本质区别,所以它非常适合流计算的开发。SQL开发优势如下:
在爱奇艺的实时计算平台上,目前SQL的任务占比已经达到2/3,已经能覆盖大部分的功能,所以较推荐内部用户使用SQL进行流计算的开发。
图片
我们在存储侧也做了技术革新。传统方案使用Lambda架构,即离线一条通路、实时一条通路,在下游合并这两条通路。但这种架构存在明显问题:
我们通过引入数据湖技术,可以做到流批一体架构,即使用Flink与数据湖交互,实时写入、实时更新。数据湖技术解决了两条链路、实时性、以及实时通路容量不足的问题。由于无需维护两条通路,计算成本与存储成本比之前的模式更低。
图片
爱奇艺选择的数据湖产品是Apache Iceberg,其具体好处将通过案例介绍。
上图是会员订单分析的应用场景。爱奇艺的会员业务有10多年的历史,每个会员订单都对应一条记录,订单表存储在MySQL中,这些表非常大。会员团队进行用户会员运营分析时,如果直接用MySQL对这些表进行查询,速度非常慢,因为MySQL对这种OLAP分析的场景支持不佳。
最原始的方案是通路1(上图标号1和2),先用内部数据集成工具BabelX将MySQL表全量导出到Hive,再使用Hive、Spark SQL或Impala查询。这条通路的问题是,MySQL的全量导出是一个天级别的任务,数据分析的时效性很差;每次导出的数据量很大,对MySQL产生很大压力;每天都在反复导出相同的数据,效率很低。
后来会员团队和我们合作了另外一条通路2(即上图标号3和4),通过内部工具,将MySQL的变更流实时导出到Kudu,用Impala进行查询。Kudu介于HDFS和HBase之间,既有实时写入的能力,又有分析型查询能力。这条通路的问题在于:
基于这些痛点,我们调研后发现Iceberg比较适合完成这种任务,我们选择了图中最下面的新通路:通过内部的RCP平台,使用Flink CDC技术实时导出到Iceberg中,在下游使用Spark SQL进行查询。
改造效果如下:
在查询性能上,我们做了两处优化:
1)小文件智能合并:Iceberg表在写入过程中会产生很多小文件,积累到一定程度会严重影响查询性能;而合并小文件时,如果每次都全表合并,又会造成严重的写放大。为此我们开发了智能合并策略,基于分区下文件大小均方差,自动选择待合并的分区,最大程度地避免了写放大。
2)写Parquet文件开启BloomFilter:BloomFilter可以判断一组数据中是否不含指定数据,被Parquet等存储格式广泛使用,用来降低读取数据量。爱奇艺将这一特性集成到Iceberg中,在写Parquet文件时允许开启BloomFilter,在内部场景中取得了很好的效果。这一功能已贡献给社区(PR #4831)。
最终,查询的时间从900秒降低到10秒,达到了交互式查询的性能,很好地满足了会员运营分析的需求。
图片
爱奇艺的实时计算主要又两个平台:负责通用型计算任务的RCP实时计算平台、负责特定分析型需求的RAP实时分析平台。
基于原始数据,可通过RCP进行通用分析,将结果写入新的流、数据库或Iceberg,供线上服务和数据分析直接使用。如需根据事件流,制作实时报表等特定的复杂目标,可使用RAP平台。
图片
RCP(Real-time Computing Platform,爱奇艺统一实时计算平台)的特点是:
如上图架构所示,Server层负责资源管理、任务管理、任务提交、监控报警等功能。Launcher层负责直接提交任务到运行集群,这一层包含内部的Flink版本和Spark版本,对于Flink,又包含了JAR/SQL/DAG引擎、接入统一元数据、以及各种数据源的connector。
图片
RCP平台能结合各个数据库的Connnector,将传统数据库接入实时计算。
上图是针对广告库存计算的实时化改造。业务需要对多个MySQL表做Join,写入Redis中,供下游的实时任务查询。
原有方案是,使用Spark批处理作业,每10分钟全量拉取这些MySQL表,在Spark任务里进行Join。这个方案的问题是,每10分钟进行全量拉表,对MySQL压力较大,且整体写入Redis的数据时效性较差,至少延迟10分钟,这会导致业务数据的准确性下降。
改造后的方案见图中绿框,我们采用Flink CDC的方案。在Flink任务中配置三个CDC的source,由此实现对MySQL全量同步以及自动转增量拉取的过程;紧接着一个Join节点,负责实时计算Join结果。如此一来,Join的输出是实时更新的结果,上游MySQL表的更新会实时地反映到Redis中。
改造效果:
RCP支持了各类CDC connector,降低了将数据库接入实时计算的门槛,主要的优势有:
RCP平台支持故障诊断功能。针对单个任务,平台可自助排查故障原因,如下图所示:
图片
如下图所示,平台展示了该任务上游、下游的血缘关系。
图片
如果需要分析整条链路,平台也提供了一键链路诊断的功能。只需点击一下,即可对链路上的所有实时计算作业,进行健康度情况分析,获取其最近的重启次数和消费延迟等信息。
图片
图片
爱奇艺RAP(Real-time Analytics Platform)实时分析平台,提供一站式的大数据摄取、计算和分析能力,支持超大规模实时数据多维度的分析,并生成分钟级延时的可视化报表。主要特色是:
RAP的架构包含4个模块:
1)典型案例
图片
上图是一张直播报表,展示直播实时的卡顿比(HCDN团队)情况。
右图是直播期间每分钟的总UV值(同步在线人数),只需三个步骤就能完成该报表的配置:
总体来说,只需少量的页面操作,即可配置一张实时报表,整个过程非常迅速。原先使用通用型工具开发此类报表,可能需要一周时间,但在RAP进行配置,仅需小时级别的时间,并且支持灵活的需求变更。目前,RAP平台已在爱奇艺的直播、会员监控等业务中广泛应用。
下一步,爱奇艺实时计算的发展方向包括:
Q1:实时计算平台后续演进规划是怎样的?
A1:进一步提升SQL化开发的成熟度,优化调试和诊断功能,对Flink SQL进行性能优化,流批一体。
Q2:数据服务支持实时计算的同时,能否保存所有数据?存储有期限吗?
A2:可以,Iceberg存储利用HDFS集群实现,其容量非常大。但用户仍需要配置期限,无论何种数据、何种容量,所有数据都无限保存是不实际的,成本方面也不经济。
Q3:实时计算场景有必要提升到秒级延迟吗?
A3:延迟级别由业务场景决定。比如天级别的运营报表本身具有意义,如果提升到秒级,数据量非常大,就失去了统计意义。但如果是前文分享的广告案例,数据越实时,准确性越高,业务上的效果就越好。
Q4:RCP和Apache DolphinScheduler一样吗?
A4:不一样,DolphinScheduler具有工作流调度的功能,RCP主要负责实时计算的流任务管理。
刘骋昺
TOP