您当前位置:资讯中心 >云计算 >浏览文章

不可思议!亿级数据竟然如此轻松同步至ES!

来源:CTO 日期:2024/4/7 0:00:00 阅读量:(0)

1 这是一个背景

最近接了一个需求,要提供一个随意组合多个条件来查询订单数据的功能,看着数据库里过亿的订单量,头发不争气的又脱落了两根代表这个需求不简单。

图片图片

脱落的两根头发,不是技术实现上很难,其实技术实现上清晰明了,就是通过数据异构,将数据同步到ES,利用ES的倒排索引、缓存等能力,提供多条件复杂查询的能力,而ES集群我们已经有了。

但有些数据,在目前的ES索引中是不存在的,也就是说,我需要将过亿的订单数据从订单数据库重新刷一遍到ES中,而这一顿操作下来得需要一周的时间!

什么?你不信,那咱们来捋一捋。

2 捋一捋订单数据同步到ES中的复杂度

2.1 数据同步ES索引流程

图片图片

如上图所示,就是将数据同步到ES索引的过程。

首先需要从订单数据库查询所有的订单数据,然后根据订单数据上保存的用户ID,商品ID等信息从用户服务,商品服务查询相关信息,经过处理与组装后落到ES集群中。

之所以要查询用户信息和商品信息,是因为异构在ES索引中的订单数据,并不会与mysql中的数据一一对应,有很多根据商品类目,用户信息等查询订单信息的诉求存在,因此在这里就需要查询很多的上游服务来组装信息。

2.2 来梳理下是否有难点?

  1. 从数据库把上亿的订单数据读取出来。这个操作不能影响到线上业务,因此查询的订单数据库一般是从库,OK,配置多数据源来读取数据吧,而且上亿的订单一般采用的都是分库分表来存储的,我们是分了16个库,每个库16个表,总共256张表,嘿嘿。
  2. 上亿的订单数据不能一次性全部读取到内存吧,不然内存冒烟都存不下啊。所以得考虑分页,分页直接limit也不好,随着数据量越大,速度越慢,所以得考虑一个游标,嗯,选一个字段当游标吧,游标最好唯一且递增。
  3. 从多个服务获取数据,这些数据所在的服务一般都属于公司的其它部门,读取数据的时候也不能影响到人家的服务吧,你这里查询的是嘎嘎猛,一看人家的服务都崩了,这个黑锅就飞来了。所以这里得考虑限流吧,得考虑隔离吧?不说全链路隔离,成本太高,起码关键服务得隔离一下。
  4. 数据同步一段时间,产品来问,同步多久了啊,大概还有多久能完成啊,数据量大概是多少啊,一脸懵,不知道啊。
  5. 如果中途同步失败了,咋处理啊,是不是得重试,咋重试,重试策略是啥?失败有没有报警,能不能及时感知并处理啊?如果同步一段时间中断了咋整啊?有没有记录从哪中断的?能否从中断处继续同步啊,不然从头开始又得N天,哭了。
  6. 同步了一部分,发现有问题需要暂停一会,咋整?
  7. 如果只想同步部分数据不一致的订单数据,可能就2,3个订单,咋整,是不是还得提供按照手动输入订单ID同步ES数据的能力?
  8. 同步过程是咋样的?开始时间?结束时间?共耗时多久?操作人是谁?这些统计数据从哪来?
  9. 想夜深人静的时候同步数据,这有时候对业务的影响小,定个闹钟晚上起?
  10. 现在不单需要同步订单的数据了,还需要同步商品ES集群的数据,这些逻辑还得重新写一遍?

啊啊啊啊,想想都头疼啊!

所以,一些事情看着简单,其实并没有那么简单。

3 神奇的服务

为了让头发更有归属感,针对上述的难点开发了一款神奇的服务,那就是ECP。它可以将整个流程自动化、可视化的处理,降低数据异构到ES的成本任务界面如下所示:

图片图片

3.1 ECP的简单运行流程

简单来说,ECP的作用就是将数据从数据源读取出来,然后推送给ES写服务。因为数据处理的逻辑因不同的业务而异,ES写服务由各个对接方来实现,因此一个简单的流程如下图:

图片图片

这里面涉及到一些技术细节,比如如何进行多数据源数据读取,数据源配置,sql校验,动态限流、SPI机制、重试策略与故障感知、探活与故障恢复,环境隔离等等。

下面一一介绍下:

3.2 多数据源数据读取

ECP支持目前支持三个数据源数据的读取,分别为ID源,文本源、以及脚本源。

3.2.1 ID源

有个文本框用来输入ID。这种场景适用于小数据的数据同步,比如发现一些数据库和ES的数据不一致了,就简单的刷一下数据。

图片图片

3.2.2 文件源

文件源指的是数据源来源于文本文件,适合中等数据的同步。ECP和对象存储进行了对接,用户可以上传文件至对象存储,在任务执行时,ECP会读取对象存储中的文本数据。

这种情况需要注意的是,用户上传的文件有可能会比较大,直接都读取到内存再处理不现实,因此这里采用的是流的方式进行读取,读取一批处理一批,再释放一批,不会造成OOM。

图片图片

简化的处理方式如下:

try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }

  // 以流的方式读取文件数据
  InputStream inputStream = response.body().byteStream();
  BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

}
关键字:
声明:我公司网站部分信息和资讯来自于网络,若涉及版权相关问题请致电(63937922)或在线提交留言告知,我们会第一时间屏蔽删除。
有价值
0% (0)
无价值
0% (10)

分享转发:

发表评论请先登录后发表评论。愿您的每句评论,都能给大家的生活添色彩,带来共鸣,带来思索,带来快乐。