百亿大表 Join 提速 300倍!Apache Doris 在约苗数据平台的实时数仓建设实践

用户案例
2023/06/14
约苗平台 李俊龙、马太科技

导读:约苗平台是国内目前最大的成人预防接种管理服务平台。近年来,随着各功能的不断完善,用户数量不断增多,越来越多注册数据、疫苗类别点击数据、页面浏览时长等数据被生成和积累,如何有效利用这些数据进行处理分析,对于约苗提高工作效率、优化运营决策有着不容小觑的作用。基于此约苗平台历经三代架构演进,最终通过 Apache Doris 重构了数据平台架构,统一了数据源出口,实现了近 300 倍的查询提速,目前已在消息系统、运营平台、数据平台、日志系统中得到广泛的应用,接入近百亿的数据量,并且在持续增加中。

四川马太科技有限公司是一家扎根于疾病防控领域,具有专业的研发与运营团队的互联网公司,长期致力于改善和提升中国公众疾病防控水平,助力“健康中国2030”。旗下拥有国内用户量最大的成人疾病预防信息与服务平台“约苗”(以下简称“约苗平台”)。“约苗平台”运用互联网+模式传播健康科普知识,为疾病防控提供先进的服务和管理工具。围绕公共卫生服务机构的疾病预防业务,开展政策、疾病教育及预约服务。发展至今已是国内最大的成人预防接种管理服务平台,作为连接公共卫生服务机构与公众的桥梁,现已连接近 5000 家公共卫生服务机构和 4000 余万用户,累计提供 2000 余万次疫苗预约服务,并产生科普内容阅读数 3.3 亿次。

业务背景

随着约苗平台各功能的不断完善,用户数量不断增多,越来越多注册、疫苗类别点击、页面浏览时长等数据被生成和积累,如何有效利用这些数据进行处理分析,对于提高工作效率、优化运营决策、增加下单率有着不容小觑的作用,因此我们决定搭建约苗数据架构,通过数据分析处理结果赋能业务发展,从而提高企业竞争力。

基于此,我们踏上了数据架构搭建及优化之路,为满足不同场景下的数据处理和分析需求,相关场景对数据架构提出了以下几点要求:

  • 用户行为分析 : 通过用户浏览内容或页面的时长进行用户分层、掌握用户行为喜好,通过对新用户的增量和用户活跃度进行统计,以便工作人员调整运营策略,提高用户留存率和活跃度。在该场景下,要求所有查询需要在 5s 内返回结果。

  • 平台通知: 约苗平台消息推送服务包含 App 推送、内置通知、短信、信公众号推送,在该场景下,期望可达到毫秒级查询响应,解决 C 端通知查询延迟的问题。

  • 市场报表统计: 市场部每天需对相关业务进行报表统计,但由于数据量巨大,通常会出现查询速度较慢的问题,这将严重影响产出计算,期望可以实现秒级查询响应。

为了满足要求,约苗的数据架构已经经历了三代演进。第一代架构基于 Elasticsearch,第二代架构引入了 ClickHouse,目前正在使用的是基于 Apache Doris 的第三代架构。本文将详细介绍这三代架构的演进历程和搭建经验。

基于 Elasticsearch 的第一代架构

约苗实时数仓- Elasticsearch 第一代架构

第一代数据架构是基于 Elasticsearch 来构建的,主要用于处理来自业务各系统和日志系统的数据。其中,业务数据首先存储在 MySQL 中,然后使用 Flink CDC 对 MySQL Binlog 监听,将数据同步到 Elasticsearch 中。当展示层发起聚合请求时,Elasticsearch 中的数据进入聚合层对应的服务中实时计算,最终将结果输出到展示端。

架构搭建完后我们立即投入生产来验证效果,但在使用中我们发现 Elasticsearch 在高并发读取和写入的过程中延迟非常高,而为改善该问题,我们又增设了多套集群配置,但仍然于事无补。除此之外,Elasticsearch 得数据查询性能也会随着数据增长而下降。在这个情况下,如果想要提高 Elasticsearch 响应速度,还需进一步增加集群配置,以提高 Elasticsearch 集群负载能力,成本投入非常大。

引入 ClickHouse 的第二代架构

约苗实时数仓-ClickHouse 的第二代架构

基于上述问题,我们对架构进行了升级。为避免架构演进对代码带来过大冲击,我们保留了第一代架构中基本的数据同步逻辑,在其基础上增加了 Apisix、Kafka、ClickHouse 同步流程,在此基础上对 Flink 同步流程进行了优化。为了降低 Elasticsearch 的压力,我们将日志数据、行为数据和文件系统数据进行了整体的迁移都 ClickHouse 。在 ClickHouse 同步流程中,我们使用 APISIX 的 Kafka-Logger 对行为采集和日志系统的数据进行直接上报,使用同步工具对上报数据进行清洗过滤,最终存储到 ClickHouse中。在使用 Flink 同步时,我们引入了 RabbitMQ 消息组件来保障数据同步的稳定性(因历史原因未使用 Kafka,建议尽量使用统一的消息队列组件)。

在使用过程中我们发现 ClickHouse 性能固然强悍,但需要不断进行调优,工程师的学习成本非常高,同时随着用户的体量的不断上升,用户活跃度的持续提高,ClickHouse 的运维管理和调优成本也逐步攀高。而过高的集群压力再次导致 C端部分业务出现数据同步和查询出现长达 5s 的延迟,这依旧没有解决根本问题。

基于 Doris 的新一代架构

01 选择 Doris 的原因

为彻底解决早期架构出现的问题,我们进行了深度调研,旨在选择更适合我们的实时数据仓库。在初步了解后,我们首先放弃了 Hadoop。一方面,Hadoop 的开发和运维难度较大,会增加我们的工作量;另一方面,Hadoop 更适合处理大数据量的批处理场景,而我们需要支持流批一体,以解决C端用户的查询延迟较高的问题。进一步了解之后,我们排除了 Kudu 选项。这是因为 Kudu 使用成本非常高,特别是在没有主键的情况下,Kudu 会占用大量存储资源。此外,一般情况下,为了满足读性能,Kudu 会牺牲写性能,这与我们的业务需求不符。在多方面的了解和对比后,我们发现 Apache Doris 最符合我们的要求,因为 Doris 具有以下优势:

  • 支持更灵活的查询:Doris 可以支持灵活且多变的数据分析需求,以满足需求方针对不同群体高频次的营销活动。

  • 支持联邦查询:Apache Doris 1.2 版本推出了多源数据目录(Multi-Catalog)通过简单的命令便可以方便地连接到各自外部数据源并自动同步元数据,实现数据出口统一,实现统一的分析体验。

  • 查询性能优异:Doris Join 能力强大,依托列式存储引擎、现代的 MPP 架构、向量化查询引擎、预聚合物化视图、数据索引的实现,在低延迟和高吞吐查询上,都达到了极速性能。

  • 运维难度低:Doris 对于集群和和数据副本管理上做了很多自动化工作,使得集群运维起来非常的简单,几乎可以实现零门槛运维。与此同时,Apache Doris 社区非常活跃,响应迅速,并且 SelectDB 为社区提供了一支专职的工程师团队,免费为用户提供技术支持服务。

02 数据架构的重建

约苗实时数仓-新架构重建

引入 Doris 后,我们对数据架构进行了重构,Doris 的位置及作用可从上方架构图得知。

由图可知,Doris 的数据来源非常多样化,包括通过 Flink-CDC、Binlog 的业务数据,经由 Kafka 的行为数据和日志数据,以及从自研的全量/增量工具同步的其他数据。此外,还有部分通过 Doris Stream Load 和 Routine Load 同步的数据。这些来自不同工具的数据统一由 ETL 处理,最终存储到 Kafka 或 Doris 集群中。其中存储到 Kafka 中的数据会通过 Routine Load 高效写入到 Doris 中,最终由 Doris 统一提供数据服务。Doris 的引入解决了各类同步工具的数据源统一的问题,不再需要维护大量配置,而统一数据出口也让数据处理变得更加简单和高效。

在新一代架构中,Doris 已完全取代了 ClickHouse,相比于 ClickHouse,Doris 具有更好的性能和更广泛的适用性,在约苗所服务的业务已逐步从统计分析扩展到业务系统。基于 Doris 的高性能、高并发的支持,约苗平台的用户端相关业务延迟问题也得到了成功解决,使得数据查询和分析更加及时和准确。

新数据架构搭建经验

在新架构的使用过程中,我们积累了许多实践经验,希望能够与大家分享。以下是我们总结出来的一些经验:

01 Routine Load 实现数据快速导入

在数据导入方面,我们强烈建议使用 Stream Load 和 Routine Load 这两种工具,因为它们的性能远远高于其他导入方式。

Stream Load 和 Routine Load 是 Doris 常见的两种数据导入方式,Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Routine Load 功能支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。

我们曾经尝试过其他的数据导入方式,经过多次压力测试和性能对比,并结合我们的实际数据类型,最终选择了 Stream Load 和 Routine Load 进行数据导入。Routine Load 是一种高效、稳定的数据导入工具,它依赖于 Kafka 并具有极好的可靠性。与其他数据导入方式相比,Routine Load 支持的配置丰富,能够根据配置来过滤和合并数据,同时需要开通的网络策略也更少,能够很好地满足同步数据的需求。最重要的是,Routine Load 可以达到每分钟数百万行的数据导入,完全符合我们对数据导入速度的要求。

在使用 Routine Load 进行数据导入时,难免会出现全量和增量数据同时同步的情况,或者在多线程消费下出现数据积压的情况,这就需要保证数据的一致性。因此我们可以通过 Doris 支持的 Sequence 列来保证数据的一致性,用户可以在导入时指定 Sequence 列,相同 Key 列下,REPLACE 聚合类型的列将按照 Sequence 列的值进行替换,较大值可以替换较小值,反之则无法替换。该方法将顺序的确定交给了用户,由用户控制替换顺序。具体的实现方式如下:

  1. 在定义表时,先定义一个列,比如定义为 order_id

  2. 接着在创建 Routine Load 时指定根据 order_id 列进行写入,以保证数据写入顺序。

  3. 在写入数据时,需要保证每条数据都拥有 order_id 列有值。

  4. 在进行全量同步时,可以使用 Java-sync 来实现,全量同步时需要保证 order_id 始终为 0。

  5. 在进行增量同步时,可以使用读取 Binlog 的方式进行同步,增量同步的 order_id 需要从 1 开始。另外可以记录增量同步的 order_id 值,以保证数据准确写入 Unique 表。

02 分区分桶和查询实践

在使用 Doris 进行数据存储时,建表时需要尽量贴近实际使用场景,以此来创建分区列和分桶列。这里将分享我们我们在创建表时的一些经验。

  • 分区:分区是建立一张大表时非常关键的因素,它直接影响到可以检索到的数据大小和需要处理的数据量。如果一个分区的数据量太大,那么就需要更多的 CPU 和内存来处理,从而导致查询效率降低。如果利用时间分区,可以考虑使用动态分区来让 Doris 自行管理分区。动态分区可以根据时间自动创建新的分区,并删除旧的分区,从而保持数据的最新性和存储效率。同时,动态分区还可以帮助我们保留部分数据或者区分冷热数据,从而更好地管理数据存储和查询。
  • 分桶:考虑在一个分区下如何让分桶足够均衡,是每个表必须要做的事情。如果分桶列数太多,可能会影响点查询的性能,如果分桶不够均衡,查询速度会受到最大桶处理时间的影响。因此需要在性能和查询需求之间做出取舍。另外,推荐大家使用Apache Doris 1.2.2 版本的 Auto Bucket 自动分桶推算功能,分桶个数不再依赖于人工设置,通过规则的智能计算即可保证合理的数据划分,降低用户学习成本的同时还可以最大化提升用户开发效率。

举例来说,当我们需要记录行为记录宽表时,一般会使用 Duplicate 模型进行记录。在建表时,我们可以把report_timeevent_idproduct_idplate_iduser_id 作为 Key 列,并使用 report_time 按月进行动态分区,每月记录数据 1.5 TB 左右,使用 product_idplate_idevent_id 进行自动分桶。

在使用过程中我们发现,当某一类事件远远超过了其他事件的总数时,会严重导致数据倾斜严重。为了解决这个问题,我们增加了在分桶列后新增了user_id,从而达到了数据均衡的效果。通过这一优化,查询速度由原来的 13s 提升到 5s。另外我们在这个基础上使用了物化视图进一步对查询性能进行优化,物化视图介入之后,查询速度从 5 秒提升到毫秒级别。

Doris 在约苗的应用实践

01 灵活组合查询,提高广告投放效率

由于疫苗的特殊性,业务侧同学在推广的过程中需要更精准的用户信息,例如年龄、性别、地区等业务同学指定的信息。这种情况下,数据平台就需要提供详细的用户群体信息,以帮助业务同学实现精细化信息触达。

在接入 Doris 后,我们主要利用 Doris 的 Join 实现了一套完全可以支持任意字段组合查询的工具。类似于常见的 SQL 管理工具,不同的是,无需自行编写 SQL,只需选择需要的字段进行组合配置即可查询,得到想要的结果,以此来满足需求多变的业务场景,从而提高业务侧工作效率。同时,Doris 还具有高效的数据写入和查询能力,使得运营人员可以更加快速地获取所需数据,进行数据分析和应用开发。

基于此,业务同学可以通过多种业务数据联合检索,快速筛选出满足自己需求的用户信息,并导出最终检索结果。例如,可以对年龄在 19 岁至 45 岁之间,浏览过 HPV 九价疫苗页面并且页面停留时长大于 1 分钟的用户数据进行检索,并导出结果。通过这一方式,业务同学可以快速获取目标用户信息,从而精准地进行营销推广和活动推送。

02 联邦查询实践,百万数据分钟级导入

在使用 Doris 之前,某些业务场景需要数据组的同事自行编写大量复杂代码来完成数据导入任务。在接入 Doris 之后,我们利用 Muti-Catalog 功能建立了 ES 和 MySQL 的外表,在遇到数据导入任务时,我们仅需要花几分钟时间编写 SQL,依靠 Doris 快速完成数据导入任务。同样的数据导入任务(80 万数据的)之前需要 1 -2 天的时间完成,而使用 Doris 之后,仅仅需要几分钟即可完成数据导入。

除此之外,Apache Doris 提供的 Multi Catalog 功能也帮助我们的统一了来自多种同步工具的数据源,成功统一了数据出口,使得数据处理变得更加简单和高效,同时我们也无需投入过多的人力和精力去维护大量其他数据相关的配置,极大的节约了成本的投入。

03 Join 能力加持,实现 300 倍查询速度提升

除此之外,Doris 的 Join 性能十分优异,当 Doris 集群完成业务数据的全同步后,我们对 1亿 和近百亿的两张表进行 Join 操作,可以在 5 秒内输出数据结果,相较之前有接近 300 倍的查询速度提升。为了验证其是否偶然,我们接着对 10 张分别有 4000万数据的表进行 Join,Doris 可以在 10s 内返回查询结果。而在物化视图加持下,可以达到毫秒级别的查询响应。这充分说明了 Doris 在处理大规模数据的优势。

04 极低运维成本,助力降本提效

在之前的架构中,我们使用 ClickHouse 进行数据存储和查询,并且需要单独维护Zookeeper 来管理集群的配置信息和状态信息,这些都增加了系统的运维成本和难度。而 Doris 架构简单,只有 FE 和 BE 两个进程,扩缩容快捷方便,运维难度和成本得到极大的降低,此外,Doris 还提供了更加友好和易用的数据备份和恢复功能,可有效保障数据的安全性和完整性。

总结规划

当前约苗已经基于 Apache Doris 搭建了一套完整的实时数据仓库,并在消息系统、运营平台、数据平台、日志系统中得到广泛的应用,目前已经接入百亿级别的数据量,并且在持续增加中。未来我们将基于 Doris 不断探索,扩大 Doris 使用范围,逐步推广至秒杀、订阅、黑名单、预约等各个业务场景,同时我们也将尝试使用 Doris 的新特性,以实践结果回馈社区,为社区发展献一份力,为社区做出实质性的贡献。

最后,感谢 SelectDB 技术团队长期以来快速响应和技术支持,为我们稳定高效应用 Doris 保驾护航。