您的位置 首页 > 德语词汇

compaction是什么意思、读音(Apache DorisDB详解)

大家好,关于compaction是什么意思、读音很多朋友都还不太明白,今天小编就来为大家分享关于Apache DorisDB详解的知识,希望对各位有所帮助!

compaction是什么意思、读音(Apache DorisDB详解)

Doris(原百度Palo)是一款基于大规模并行处理技术的分布式SQL数据库,由百度在2017年开源,2018年8月进入Apache孵化器。

本次将主要从以下三部分介绍ApacheDoris.

我们首先看一下Doris整个的定位。

上图是整个Doris的具体使用场景,主要是它的接收数据源,以及它的一个整体的模块,还有最后它的一个可视化的呈现。后面会有一张更详细的图去介绍它整个的来源,以及最后可以输出的数据流向。

上图是我们开源之后的状况。目前有几家公司已经把Doris用到生产环境里面,主要是小米和美团,他们最近也在跟我们一起做一些联合的开发。

这是上文所说的,Doris更具体的数据流向。用户数据可以存在HDFS或者Kafka上,或者在对象存储上,百度内部我们叫BOS,外部应用最广的对象存储系统就是AmazonS3。我们可以把这个数据读到Doris里面来,然后在Doris里面对数据做各种的存储,以及做各种的多副本管理,最后可以在上面接一个可视化工具,用来展示报表以及各种多维分析的检索的效果。这对于业务方在做可视化分析的时候效果是比较明显的。

前面主要介绍了Doris整体的定位,以及它目前在外部的公司的使用状况,下面我们来介绍一下它的关键技术。

Doris的架构很简洁,只设FE(Frontend)、BE(Backend)两种角色、两个进程,不依赖于外部组件,方便部署和运维。

FE主要有有三个角色,一个是leader,一个是follower,还有一个observer。leader跟follower,主要是用来达到元数据的高可用,保证单节点宕机的情况下,元数据能够实时地在线恢复,而不影响整个服务。

右边observer只是用来扩展查询节点,就是说如果在发现集群压力非常大的情况下,需要去扩展整个查询的能力,那么可以加observer的节点。observer不参与任何的写入,只参与读取。

数据的可靠性由BE保证,BE会对整个数据存储多副本或者是三副本。副本数可根据需求动态调整。

元数据层面,Doris采用Paxos协议以及Memory+Checkpoint+Journal的机制来确保元数据的高性能及高可靠。元数据的每次更新,都首先写入到磁盘的日志文件中,然后再写到内存中,最后定期checkpoint到本地磁盘上。我们相当于是一个纯内存的一个结构,也就是说所有的元数据都会缓存在内存之中,从而保证FE在宕机后能够快速恢复元数据,而且不丢失元数据。Leader、follower和observer它们三个构成一个可靠的服务,这样如果发生节点宕机的情况,在百度内部的话,我们一般是部署一个leader两个follower,外部公司目前来说基本上也是这么部署的。就是说三个节点去达到一个高可用服务。以我们的经验来说,单机的节点故障的时候其实基本上三个就够了,因为FE节点毕竟它只存了一份元数据,它的压力不大,所以如果FE太多的时候它会去消耗机器资源,所以多数情况下三个就足够了,可以达到一个很高可用的元数据服务。

前面介绍了元数据,下面我们来介绍下整个数据在集群中的分布。

数据主要都是存储在BE里面,BE节点上物理数据的可靠性通过多副本来实现,默认是3副本,副本数可配置且可随时动态调整,满足不同可用性级别的业务需求。FE调度BE上副本的分布与补齐。

如果说用户对可用性要求不高,而对资源的消耗比较敏感的话,我们可以在建表的时候选择建两副本或者一副本。比如在百度云上我们给用户建表的时候,有些用户对它的整个资源消耗比较敏感,因为他要付费,所以他可能会建两副本。但是我们一般不太建议用户建一副本,因为一副本的情况下可能一旦机器出问题了,数据直接就丢了,很难再恢复。我们在公司内部的话,一般是默认建三副本,这样基本可以保证一台机器单机节点宕机的情况下不会影响整个服务的正常运作。

这个就是前面说的,Doris和用户交互的协议部分,它主要兼容一个MySQL的协议。

前面基本介绍了Doris的整体架构,以及它的FE跟BE节点所达到的目的。Doris它的架构是比较简单的,编译部署完之后,也只有FE跟BE两个进程,把FE跟BE进程一分发,就可以启动服务了。它不依赖于Hadoop,也不依赖其他的外部组件,数据都是自成一体的,所以可以很方便地部署启动。

下面就会涉及一些Doris查询跟存储的一些具体的技术。

我先说查询的部分,因为用户可能跟查询打交道多一点,因为存储毕竟是一个更下面的东西。

我们最早是借鉴了Impala的查询引擎,把它改造了一下引入到Doris里面形成一个分布式的查询引擎。而我们所做的主要的改造主要就是把Impala的架构给改了。因为如果大家熟悉Impala的话,会知道Impala是一个完全的P2P的结构,每个节点都缓存元数据,对于一个高性能的报表分析来说,它有可能会面临着元数据落后的问题。所以我们把它查询规划所有的部分,都放到了一个FE里面,即在这个图中所看到的整个的逻辑规划,都会由FE来完成。FE来根据用户的查询生成一个完整的逻辑规划,然后这个逻辑规划最后生成一个分布式的逻辑规划,会发给整个集群去执行。

可以看一个具体的例子。上图这样一个查询,展示了最后生成的物理规划是什么样的。左下角这个图就是展示了它的逻辑,就是说查询会生成几个算子,主要有扫描的算子,聚合的算子,还有Join的算子,最后再排序。右边就相当于对整个的算子的一个实际物理划分。这个规划途中单个方框我们称之为一个Fragment,Fragment由单个BE节点执行,Fragment之间的数据交换通过RPC来完成。整个物理规划执行完之后,由FE把数据收集起来反馈给用户。返回用户的方式就是前面提到过的通过一个MySQL的协议。

这基本上就是一个简单的物理的执行规划,以及最后的展现。

如果大家使用Doris的话,可能会涉及到怎么去优化整个引擎查询方式的问题。因为有时候查询写的不太好的时候,或者说用户写的不太好的时候,Doris是有工具可以让你去看一下物理规划,这样主要是可以去分析出这个物理规划的性能可能在哪里不够好,可能稍微改一下查询它的规划就会更契合整个建表的结构,以及整个查询引擎的处理方式。

上面简单地介绍了Doris查询处理的逻辑。因为查询处理是比较复杂的一个部分,我主要是说了一下它的物理规划,还有很多的其他的细节比较复杂,我在这里就不再展开讲。

前面讲完了查询的部分,下面我来再介绍一下存储引擎的部分。这里主要分为两个部分展开,一个是数据模型,一个就是数据组织。

首先讲数据模型。一个正常的模型它肯定会把明细的数据存储在一个数据库中,也就是存在Doris中。但是因为Doris它最早是给凤巢的一个广告报表做的,广告报表有一个很大的特点,就是它只关心统计分析的结果,而不太关心明细的数据,所以Doris最早一代的数据模型,是一个聚合的模型。

如上图,它有三列,左边的三列是一个Key,我们叫做维度列;右边的两列相当于是一个Value列,也就是指标列。这个图是一个典型的一个广告报表的样子,展示的是一个用户按地域消费的情况。

我们会按左边的这三列也就是他的维度列去聚合数据,如果前面的三列数据相同我们会把这些数据合并(compaction)起来。也就相当于在数据库里面存储的其实是一个合并之后的结果,这个结果对于统计分析来说是很有效果的。因为广告报表只关心这种统计之后的数据,现在我们把大量的数据聚合,比如一天的数据可能有一千条,我们聚合成一条,相当于整个的I/O节省一千倍,效果非常明显。

我们当时在做凤巢的广告报表时,每天导入的数据量在一两百亿条的情况下,基本可以做到在一百毫秒以内出广告报表,这样可以满足整个广告用户实时查看广告报表的需求。

我刚才讲了数据在导入的时候会有一次合并,因为我们因为要聚合。还有一种情况是如果我先导入了一批数据,然后又导入了一批数据,这两批的数据之间有相同的时候,也需要进行一个合并。

上图展示的就是合并的策略,它把导入分成了三级,左边是一个Base,也就是最早导入的数据的合并,然后中间是一个增量的部分,最后是每一次导入就生成的一个单独的版本。然后这个单独的版本会逐渐的合并到一个更高的一个增量的版本,增量的版本再合并成一个Base的版本。分几级的目的其实就是想尽量把小文件合并成一个中等规模的文件,然后再把中等规模的文件合并成一个大文件,这样会减少一个大文件直接跟一个小文件合并的读写放大量。因为大文件大量的读取其实是划不来的,所以Doris采用一个分级的策略去做整个的合并。同时这样做还有一个好处就是新来的导入,不会影响正在进行的查询。因为我现在的导入只有在整个批次完成的时候查询(也就是我们前面所讲的FE的元数据)才能感知到,这样它可以做到一个比较好的读写分离,用户在查询历史数据不会受到新来导入的影响。

然后我们当然也保证整个导入的一个原子性,就是说一个单批次的导入要么生效,要么不生效。即使是一个批次,导入多张表,也是同样。比如这几张表是一个关联表,你想一次把它们全部导入,这样我们也保证整个一个批次的导入对这几张表要么同时生效,要么同时不生效,这是所谓的导入的原子性。

前面主要介绍了Doris的数据模型,我们目前来说可以看到,其实数据模型主要是针对报表统计分析是比较有用的。后来逐步扩展一个通用的数据模型之后,它有一些限制。因为有些业务场景做这种分析的时候,是需要明细数据的,它不太关心统计的结果,而是更关心流程分析,更关心的是我要拿着历史的全量数据跟现在的数据做对比。所以我们后来扩展了这个模型。

Doris新提供两类新的数据模型,一个是一个UniqueKey的模型,就是说我们提供一个唯一Key模型,在整个历史数据导入的时候,我们保证Key的唯一,不聚合。第二个模型是一个DuplicateKey的模型,就是说支持一个用户导入之后把这个数据全部放在数据库里面,我们不再做提前的聚合,也不单独保证唯一性,只做一个排序。那么这两个Key的主要面向场景就有所不同了,就相当于跟聚合模式有所不同了,UniqueKey的模型主要面向留存分析或者订单分析的场景,他们需要一个UniqueKey的约束去保证整个数据不丢不重。然后DuplicateKey的模型,就是这个数据可能重复,对于有些日志分析它不太在意数据多几条或者少几条,可能只关心排序,这个时候可能重复Key的模型会更加有效果。

在开源的版本里面这几个目前都是有的。目前对于报表分析来说,聚合模型是比较有很有效果的,在其他的场景下,另外两个模型可能会相对更好一点,用户在建表的时候就可以去指定用哪一个模型,建表语句里面都会有。

前面介绍了数据模型的部分,当然存储引擎除了数据模型还有一个很重要的部分,就是它的数据组织。Doris作为一个数据库,主要是一个列存的数据库,就是说我们的数据都是以列的形式留在存储引擎,每一列单独存放。因为对分析的场景来说,多数时候用户只关心几列的数据,这个时候如果用一个列存的话,它可以只访问查询涉及的列,大量降低I/O,达到一个比较好的一个I/O的效果。同时因为按列存储,数据类型一致,方便压缩。举个例子,比如我们从磁盘上读1M的数据,其实解压之后展开的数据就会大大扩张,而这相当于变相地节省了整个磁盘的I/O,通过这样一种方式,可以达到一个去快速地检索数据并把数据读取出来,做一个大规模的扫描分析的效果。

上图涉及了具体的一些数据压缩的方法,因为列存有很多具体的压缩方法,对于不同的类型有一些不同的压缩办法。除了简单的压缩之外,我们还有很重要的一个特点就是对数据会排序。多数分析型数据库它都只会去建一个稀疏索引,这个时候把数据排一下序,然后按一个一千行或者一万行,甚至是十万行的维度去见一个稀疏的索引,效果是非常明显的。

Doris存储引擎对于排序列,会存储min/max/sum等智能索引技术,将数据集扫描范围尽可能地缩小,减少磁盘I/O,提升查询性能。比如说这一列排过序了,然后我在这一列的十万行所组成的一个粒度上面,给它加一个min/max,然后这样查询的时候,它就可以快速去过滤这个十万行,这会大大地减少整个数据的扫描量,从而减少I/O。

对于数据组织来说,它同时会去改造一下整个查询在存储引擎上的执行过程,主要就是一个向量化的过程。

对于传统的关系数据库来说,它所返回数据的方式,都是按行进行。这样的问题是:每行一次函数调用,打断CPU流水,不利于分支预测;指令和数据cachemiss高;编译器不友好,不利于循环展开,不利于使用SIMD等CPU加速指令。

但是对于列存的数据库来说,其实可以按列的形式去做,这样达到的效果,其实就是能减少整个缓存失效率。这个时候相当于用一个向量化执行方式来达到这样一个效果,这是列存数据库特有的一个优化技巧,可以达到一个减少CPU的消耗,提升整个CPU的利用率。

我们做过star-schema的测试,向量化执行的方式可以提高3到4倍的性能。

我们刚刚也说过了,Doris没有一个强索引,它只有一个稀疏索引,而稀疏索引有时满足不了用户的场景。比如说我做统计分析的时候,可能有各种维度进行组合分析的一个需求,不是只有一个固定的维度,它当换维度的时候,原来的前缀索引可能就无法命中。所以我们也支持物化视图。

以这个图为例,左下角相当于它的原始数据,右边是把这个数据重新排序,因为改变排序就会相应地去改变整个数据的前缀索引的方式,这样的话就相当于可以去换维度进行分析了,就是右上边的一个表,可以更好地去满足整个查询的需求。最右下角的是说我如果说经常只按一个维度进行组合分析,我可以只选它其中部分列,把它聚合,达到一个更好的聚合效果。这样的方式就可以满足不同的统计分析的需求,以及不同的多维分析的需求。

这个物化视图,其实它核心会去牺牲一部分存储空间,跟原来的表是一个绑定的形式。但是我们在导入的时候会保证它跟原始表一定是一个原子生效的过程,查询的时候我们会自动去判断它满不满足物化视图,满足的话我们就会自动落在它的上面,用户不需要修改任何的查询。用户所需要做的只是说我发现我的业务可能换一个表的结构,会更好地满足整个查询需求,那么他需要去建一个物化视图就行了,物化视图发起之后后续所有的操作,从导入到查询,用户都不需要再关心。

Doris是一个跨节点的分布式数据库,建表的时候我们要求用户指定分区方式,划分数据到集群中。

Doris提供两层分区。第一层分区,主要是一个逻辑分区,比方比较常见的是按时间分区,把历史的数据按天/月/年作一个分区,不同区间的数据会落在不同分区里面。这样如果你经常只看最新的数据,那历史的数据在查询的时候,就会自动帮你判断出来,就是说这些历史数据的所在的分区就不用看了。这样减少了大量历史数据不必要的重复BE/CE,节省了大量的I/O和CPU开销。

时间分区还有一个作用就是可以把冷热的数据区分开,方便新旧数据分离,使用不同的存储介质。比如说在现在一个机器环境下,用户他的机器上是有一部分SATA盘,也有一部分SSD盘的情况下,我们通过一个分区的方式把冷热的数据给区分开来之后,它可以达到一个效果就是,可以把最新的数据放在SSD上,用在一个更好的介质上,历史的数据放在一个SATA盘上。

第二层分区是一个物理分区,而这种分区的作用主要就是把数据打散到整个集群里面去。现在假如说有20台机器所构成的一个集群,然后我要把20台机器的性能全部给用上,我会去指定第二层分区,我们根据哈希分区的方式把这个数据打散到整个集群里面,这样的话查询的时候就可以用上整个分布式集群的性能,去更好的去满足这个效果。

这里有一点点技巧,就是说要根据用户的一个就是读写的一个密集程度做分区。如果是一个读很密集的一个场景,那样的话我们建议用户尽量选择一个区分度很大的列,也就是基数很大的列,我们就可以把这个数据均匀地打散到整个集群里面去,用户查询的时候就可以把整个集群的资源可以用上,相当于没有数据倾斜。如果是写很密集的情况下,就建议用户选择一个区分度少一点的列。这个可以在用户建表的时候根据自己的一个业务场景去选择。可以指定数据放到SSD上或者SATA盘上,也支持根据TTL将冷数据从SSD迁移到SATA上,高效利用SSD提高查询性能

建表对于最后的优化效果是非常关键的。因为会涉及很多建表的场景的优化,如果你建表不好的话,可能整个集群资源用不上,或者说整个资源用上了,但是分布很不均匀,数据倾斜非常严重,这个时候会影响整个查询的性能。所以我们一般会建议用户建表的时候多考虑一下,去看一下我们的官方文档,以及最佳实践,去看看怎么建表会更加的优化。

因为Doris这本身是一个Impala的存储引擎,我们最近半年也扩展了一下它支持的存储引擎,主要就是支持Elasticsearch。想支持它的目的主要是因为Impala只是一个稀疏索引,对于这种大规模点查询来说,随机的I/O会比较严重,所以我们会想去支持Elasticsearch,用到它的倒排索引,去丰富整个Doris的生态。同时因为Elasticsearch本身在支持分布式的查询方面不是特别的友好,因为它缺少一个很良好的分布式的查询框架,所以我们就想把他们两个的功能结合起来,去用到一个Impala里面,这样可以结合两者的优势。

目前来说已经这个功能应该在最新的开源版本里面是有的,并且已经开始逐步在使用。如果大家有兴趣可以去看一下。

这个是一个例子,就是说在怎么在Doris里面建一个外部表,然后外部表给你指定了一个Elasticsearch的表,再怎么通过Doris发一个查询去命中Elasticsearch表,然后它帮我去返回结果的一个过程。

前面主要讲的是整体架构,下面就会单独介绍一下导入的部分。

百度内部有自己的一整套消息队列,但是开源之后用户有很多Kafka导入的需求,就是说数据从Kafka来的时候,然后怎么进入到Doris里面去。最早Doris只支持批量导入或者小批量导入,不太支持Kafka这类流式导入。开源之后有很多用户反馈,他们有大量Kafkamicrobatch导入的需求,也就是几千条或者几万条数据单批次灌入的需求。Doris0.10版本中,原生支持了流式导入的功能。同时为了减少用户的一个使用的成本,Doris内嵌了一个例行机制,也就是说可以在Doris里建一个例行的任务,它帮你去订阅Kafka的变更,帮你把数据灌到Doris里面去,不需要外层业务方自己去写外层脚本去追踪Kafka的变化。

这个图展示的是Doris订阅Kafka的一个过程,这个是比较新的功能,也是在最新的版本里面发布了。

上面是Doris涉及的一些其他的特性,我们目前是支持UDF的。然后我们主要用SchemaChange做改表结构的方式,支持在线的改表结构。不涉及重新排序的情况下,可以把一个TB级别表在分钟级别内完成它的schemachange操作,让业务方可以去迅速地使用。另外Doris作为一个数据库它也支持一部分ACID的特性,主要是保证一个原子性,包括导入的原子性、SchemaChange的原子性等。除此之外我们也做了很多优化,比如LLVM或者说支持Hyperloglog的类型,去优化整个查询的读取的性能。

以上这些基本就是Doris整体的特性以及功能。

下面我主要介绍一下案例的部分。目前Doris在百度内部接入了200多个业务,外部也有一些用户在使用。百度内部以百度统计为例,外部以美团为例。

百度统计主要有两类的大的场景分析。

一是统计报表。报表的特点是它的模式比较固定,经常只需要对几个维度进行组合分析。但是它的并发比较高,在我们内部统计,它单机的并发量可以达到1000。所以它要求必须支持高并发,并且延迟必须非常低,这样才能实时地展现。目前百度统计用的是聚合模型。我们按特定维度聚合数据,使用AggregateKey数据模型,提前聚合数据。在这个模式之下,我们基本上可以做到1000并发在50到60毫秒级别就出查询展示的效果。

上图展示了百度统计的架构,Doris在其中主要起到存储它们整个的数据的作用,最后在Doris上面接一个可视化的报表分析做展示。

统计报表基本都是类似的,前端收集数据做一些预处理,然后再灌到Doris里面去,目前我们做的导入的延迟是在五分钟级别,也就是数据从最左边的产生到最后进到Doris里面展示就用了五分钟。

第二个案例是美团,美团跟我们合作比较多。

他们在选型的时候,比较看重的一个特点就是需要有比较丰富的一个SQL特性,要兼容SQL-92标准。因为他们作为平台方希望用户可以快速入手,而大家对SQL的接受度一般是比较高的,所以不需要写太多复杂的查询语句,然后这样可以很快速入手。同时他们如果有一个完整的SQL协议,就可以很好的做一些报表工具的展示,不需要再做额外的开发去自己定制。

第二个他们比较看重的是监控运维的方面。监控的话,我们是集中到Prometheus;运维的话,他们希望尽量方便,不需要依赖太多的组件。Doris的整体架构就比较简单,涉及的组件比较少,可以把问题收敛在几个组件之下,比较符合他们的需求。

第三个他们比较关注的是扩展性跟可用性。因为他们的业务变动比较大而且是一个24小时的服务。所以需要架构本身就能自动扩展,而不用去手动去操作。并且必须要做到高可用,就是说在单台机器,甚至两台以上机器宕机的状况下,也要保证服务是继续可用的状态。

我们采取一下的方式来保证服务的高可用与高扩展

目前美团内部部署的最大的一个集群由70个节点,存储超过18TB的数据。

以上就是Doris整体的原理及实践的介绍。

分享ApacheDorisDB发展历程

DorisDB是由ApacheDoris核心研发团队打造的新一代企业级MPP数据库。它继承了ApacheDoris项目十多年研发成果,累积了线上数千台服务器稳定运行经验,并在此基础上,对传统MPP数据库进行了开创性的革新。DorisDB重新定义了MPP分布式架构,集群可扩展至数百节点,支持PB级数据规模,是当前唯一可以在大数据规模下进行在线弹性扩展的企业级分析型数据库。DorisDB还打造了全新的向量化执行引擎,单节点每秒可处理多达100亿行数据,查询速度比其他产品快10-100倍!

Doris自第一版诞生以来,经过了11年的发展,中间做过无数改进。这?只罗列对Doris发展来说?比较重要的关键节点与事件。

2008Doris1,「筑巢引凤」的重要基石

在Doris1诞生之前,百度使用MySQLSharding方式来为广告主提供广告报表支持。随着百度本身流量的增加,广告流量也随之增加,已有的MySQLSharding方案变得不再能够满足业务的需求。当时数据存储和计算成熟的开源产品很少,Hbase的导入性能只有大约2000条/秒,不能满足业务每小时新增的要求。而业务还在不断增长,来自业务的压力越来越大。在这种情况下,Doris1诞生了,并且在2008年10月份跟随百度凤巢系统一起正式上线。

2009Doris2,解「百度统计」燃眉之急

2008年的百度统计服务大约有50-60台MySQL,但是业务每天有3000万+条增量数据,由于MySQL的存储和查询性能无法满足需求,对存量数据的支撑已经到了极限,问题频出,万般无奈之下百度统计甚至关闭了新增用户的功能,以减少数据量的增加。

Doris1由于当时时间紧、任务重,所以设计、实现的时候只为了能够满足凤巢的业务需求,并没有兼顾其他的应用需求。2009年Doris2研发完成后上线百度统计,并且成功支撑百度统计后续的快速增长,成功的助力百度统计成为当时国内规模最大,性能、功能最强的统计平台。

2010Doris3,让查询再快一点

随着业务数据量的不断增长,Doris2系统的问题也逐渐成为业务发展的瓶颈。首先体现在Doris2无法满足业务的查询性能需求,主要是对于长时间跨度的查询请求、以及大客户的查询请求。其次,Doris2在日常运维方面基本上都需要停服后手动操作,比如SchemaChange、集群扩缩容等,一方面用户体验很差,一方面还会增加集群运维的成本。最后,Doris2本身并不是高可用系统,机器故障等问题还是会影响服务的稳定性,并且需要人肉进行复杂的操作来恢复服务。为了解决Doris2的问题,团队开始了Doris3设计、研发。Doris3的主要架构中,DT(DataTransfer)负责数据导入、DS(DataSeacher)模块负责数据查询、DM(DataMaster)模块负责集群元数据管理,数据则存储在Armor分布式Key-Value引擎中。Doris3依赖ZooKeeper存储元数据,从而其他模块依赖ZooKeeper做到了无状态,进而整个系统能够做到无故障单点。

在数据分布方面Doris3引入了分区的概念。另外Doris3在日常运维SchemaChange,以及扩容、缩容等方面都做了针对性设计,使其能够自动化进行,不依赖线上人工操作。

Doris3在2011年完成开发后逐渐替换Doris2所制成的业务,并且成功的解决了大客户查询的问题。而公司内部后续的新需求,也都由Doris3来承担支持。

2012MySQL+Doris3,百度的第一个OLAP平台

2012年随着Doris3逐步迁移Doris2的同时,大数据时代悄然到来。在公司内部,随着百度业务的发展,各个业务端需要更加灵活的方式来分析已有的数据。而此时的Doris3仍然只支持单表的统计分析查询,还不能够满足业务进行多维分析的需求。所以,为了能够支持业务的多维分析需求,Doris3采用了MySQLStorageHandler的方式来扩展Doris3。

2012OLAPEngine,突破底层存储束缚

Doris3支持报表分析场景时,底层通用Key-Value存储引擎的弊端也逐渐显露。为了能够在底层存储引擎上有所突破,OLAPEngine项目启动了。这个项目的发起者是当时从Google来的高T,为百度带来了当时业界最领先的底层报表引擎技术。

2013用PALO,玩转OLAP

底层技术的发展会激发上层业务的需求,而上层业务的需求同时会为底层的技术带来新的挑战。因此Doris亟需一款拥有分布式计算能力的查询引擎。新产品的名字命名为PALO,意为玩转OLAP。随着PALO1的正式上线,除了迁移所有Doris3已有的的业务外,也成功支持了当时百度内部大部分的OLAP分析场景。

2015PALO2,让架构再简单一点

如果说PALO1是为了解决性能问题,那么PALO2主要是为了在架构上进行优化。通过PALO2的工作,系统架构本身变得相当简洁,并且不需要任何依赖。

2017andFutureApacheDoris(incubating),是更广阔的世界

Palo于2017年正式在GitHub上开源,并且在2018年贡献给Apache社区,并将名字改为ApacheDoris(incubating)进行正式孵化。随着开源,Doris已经在京东、美团、搜狐、小米等公司的生产环境中正式使用,也有越来越多的Contributor加入到Doris大家庭中。

Doris是一个海量分布式KV存储系统,其设计目标是支持中等规模高可用可伸缩的KV存储集群。Doris可以实现海量存储,线性伸缩、平滑扩容,自动容错、故障转移,高并发,且运维成本低。部署规模,建议部署4-100+台服务器。

Doris采用两层架构,Client和DataServer+Store。有四个核心组件,Client、DataServer、Store、Administration。应用程序通过ClientSDK进行Doris的访问,每台服务器上部署一个DataSever做服务器的管理,每台服务器上有自己的存储Store,整个集群的数据存储,每台机器独立部署。数据通过路由选择写入到不同的机器中。Administration为管理中心,提供配置、管理和监控。config指应用程序启动一个DataServer,在启动时要配置管理中心的ip地址,通关管理中心。管理中心会修改配置项感知到集群中加了新机器,对新机器管理,扩容等。待机器处于可用状态,将该机器的配置项通知给KVClient。从而KVClient进行新的路由选择。扩容、下线机器等的控制台界面通过Management管理。Monitor监控机器是否正常。

client写数据,绑定产品的namespace(逻辑隔离),构成新key,路由到具体机器上读写。

集群管理的重要角色ConfigServer,有一个功能是负责发现故障服务器。发现故障的方式有2种:

通常心跳检测是慢的,几秒进行一次心跳检测。更多时候,是clientFail失败报告发现无效服务器,当写入失败时,Client会告诉ConfigServer。ConfigServer校验,也访问失败,则通知其他client。

用户可使用MySQL客户端连接FE,执行SQL查询,获得结果。

①MySQL客户端执行DQLSQL命令。②FE解析,分析,改写,优化和规划,生成分布式执行计划。③分布式执行计划由若干个可在单台be上执行的planfragment构成,FE执行exec_plan_fragment,将planfragment分发给BE,指定其中一台BE为coordinator。④BE执行本地计算,比如扫描数据。⑤其他BE调用transimit_data将中间结果发送给BEcoordinator节点汇总为最终结果。⑥FE调用fetch_data获取最终结果。⑦FE将最终结果发送给MySQLclient。

执行计划在BE上的实际执行过程比较复杂,采用向量化执行方式,比如一个算子产生4096个结果,输出到下一个算子参与计算,而非batch方式或者one-tuple-at-a-time。

用户创建表之后,导入数据填充表.

①用户选择一台BE作为协调者,发起数据导入请求,传入数据格式,数据源和标识此次数据导入的label,label用于避免数据重复导入.用户也可以向FE发起请求,FE会把请求重定向给BE.②BE收到请求后,向FEmaster节点上报,执行loadTxnBegin,创建全局事务。因为导入过程中,需要同时更新base表和物化索引的多个bucket,为了保证数据导入的一致性,用事务控制本次导入的原子性.③BE创建事务成功后,执行streamLoadPut调用,从FE获得本次数据导入的计划.数据导入,可以看成是将数据分发到所涉及的全部的tablet副本上,BE从FE获取的导入计划包含数据的schema信息和tablet副本信息.④BE从数据源拉取数据,根据base表和物化索引表的schema信息,构造内部数据格式.⑤BE根据分区分桶的规则和副本位置信息,将发往同一个BE的数据,批量打包,发送给BE,BE收到数据后,将数据写入到对应的tablet副本中.⑥当BEcoordinator节点完成此次数据导入,向FEmaster节点执行loadTxnCommit,提交全局事务,发送本次数据导入的执行情况,FEmaster确认所有涉及的tablet的多数副本都成功完成,则发布本次数据导入使数据对外可见,否则,导入失败,数据不可见,后台负责清理掉不一致的数据.

更改元数据的操作有:创建数据库,创建表,创建物化视图,修改schema等等.这样的操作需要:

①用户使用MySQLclient执行SQL的DDL命令,向FE的master节点发起请求;比如:创建表.②FE检查请求合法性,然后向BE发起同步命令,使操作在BE上生效;比如:FE确定表的列类型是否合法,计算tablet的副本的放置位置,向BE发起请求,创建tablet副本.③BE执行成功,则修改内存的Catalog.比如:将table,partition,index,tablet的副本信息保存在Catalog中.④FE追加本次操作到EditLog并且持久化.⑤FE通过复制协议将EditLog的新增操作项同步到FE的follower节点.⑥FE的follower节点收到新追加的操作项后,在自己的Catalog上按顺序播放,使得自己状态追上FEmaster节点.

上述执行环节出现失败,则本次元数据修改失败.

查找维度列的前缀的查找过程为:先查找shortkeyindex,获得逻辑块的起始行号,查找维度列的行号索引,获得目标列的数据块,读取数据块,然后解压解码,从数据块中找到维度列前缀对应的数据项.

DorisDB的表和关系型数据相同,由行和列构成.每行数据对应用户一条记录,每列数据有相同数据类型.所有数据行的列数相同,可以动态增删列.DorisDB中,一张表的列可以分为维度列(也成为key列)和指标列(value列),维度列用于分组和排序,指标列可通过聚合函数SUM,COUNT,MIN,MAX,REPLACE,HLL_UNION,BITMAP_UNION等累加起来.因此,DorisDB的表也可以认为是多维的key到多维指标的映射.

在DorisDB中,表中数据按列存储,物理上,一列数据会经过分块编码压缩等操作,然后持久化于非易失设备,但在逻辑上,一列数据可以看成由相同类型的元素构成的数组.一行数据的所有列在各自的列数组中保持对齐,即拥有相同的数组下标,该下标称之为序号或者行号.该序号是隐式,不需要存储的,表中的所有行按照维度列,做多重排序,排序后的位置就是该行的行号.

查询时,如果指定了维度列的等值条件或者范围条件,并且这些条件中维度列可构成表维度列的前缀,则可以利用数据的有序性,使用range-scan快速锁定目标行.

当范围查找时,如何快速地找到起始的目标行呢?答案是shortkeyindex.如下图所示:shortkey索引为稀疏索引,

为了描述方便,我们借鉴关系模式中的主键概念,称DorisDB表的维度列的取值构成数据表的排序键,DorisDB的排序键对比传统的主键具有:

对于摄入(ingest)的主键重复的多行数据,填充于(populate)数据表中时,按照三种处理方式划分:

DorisDB建表的默认模型是明细模型。

一般用明细模型来处理的场景有如下特点:

在数据分析领域,有很多需要对数据进行统计和汇总操作的场景。比如:

适合采用聚合模型来分析的场景具有如下特点:

有些分析场景之下,数据会更新,DorisDB采用更新模型来满足这种需求。比如在电商场景中,定单的状态经常会发生变化,每天的订单更新量可突破上亿。在这种量级的更新场景下进行实时数据分析,如果在明细模型下通过delete+insert的方式,是无法满足频繁更新需求的;因此,用户需要使用更新模型来满足数据分析需求。

以下是一些适合更新模型的场景特点:

常见的四种数据分布方式有:(a)Round-Robin、(b)Range、(c)List和(d)Hash(DeWittandGray,1992)。如下图所示:

dynamic_partition.enable:是否开启动态分区特性,可指定为TRUE或FALSE。如果不填写,默认为TRUE。

dynamic_partition.time_unit:动态分区调度的粒度,可指定为DAY/WEEK/MONTH。

dynamic_partition.start:动态分区的开始时间。以当天为基准,超过该时间范围的分区将会被删除。如果不填写,则默认为Integer.MIN_VALUE即-2147483648。

dynamic_partition.end:动态分区的结束时间。以当天为基准,会提前创建N个单位的分区范围。

dynamic_partition.prefix:动态创建的分区名前缀。

dynamic_partition.buckets:动态创建的分区所对应的分桶数量。

DorisDB中为加速查询,在内部组织并存储数据时,会把表中数据按照指定的列进行排序,这部分用于排序的列(可以是一个或多个列),可以称之为SortKey。明细模型中SortKey就是指定的用于排序的列(即DUPLICATEKEY指定的列),聚合模型中SortKey列就是用于聚合的列(即AGGREGATEKEY指定的列),更新模型中SortKey就是指定的满足唯一性约束的列(即UNIQUEKEY指定的列)。

Doris是基于MPP架构的交互式SQL数据仓库,主要用于解决近实时的报表和多维分析。Doris高效的导入、查询离不开其存储结构精巧的设计。

存储层对存储数据的管理通过storage_root_path路径进行配置,路径可以是多个。存储目录下一层按照分桶进行组织,分桶目录下存放具体的tablet,按照tablet_id命名子目录。

Segment文件存放在tablet_id目录下按SchemaHash管理。Segment文件可以有多个,一般按照大小进行分割,默认为256MB。其中,Segmentv2文件命名规则为:${rowset_id}_${segment_id}.dat。

Segment整体的文件格式分为数据区域,索引区域和footer三个部分

SegmentFooterPB:定义文件的元数据信息4个字节的FooterPB内容的checksum4个字节的FileFooterPB消息长度,用于读取FileFooterPB8个字节的MAGICCODE,之所以在末位存储,是方便不同的场景进行文件类型的识别

Footer信息段在文件的尾部,存储了文件的整体结构,包括数据域的位置,索引域的位置等信息,其中有SegmentFooterPB,CheckSum,Length,MAGICCODE4个部分。

SegmentFooterPB采用了PB格式进行存储,主要包含了列的meta信息、索引的meta信息,Segment的shortkey索引信息、总行数。

OrdinalIndex索引提供了通过行号来查找ColumnDataPage数据页的物理地址。OrdinalIndex能够将按列存储数据按行对齐,可以理解为一级索引。其他索引查找数据时,都要通过OrdinalIndex查找数据Page的位置。因此,这里先介绍OrdinalIndex索引。

在一个segment中,数据始终按照key(AGGREGATEKEY、UNIQKEY和DUPLICATEKEY)排序顺序进行存储,即key的排序决定了数据存储的物理结构。确定了列数据的物理结构顺序,在写入数据时,ColumnDataPage是由Ordinalindex进行管理,Ordinalindex记录了每个ColumnDataPage的位置offset、大小size和第一个数据项行号信息,即Ordinal。这样每个列具有按行信息进行快速扫描的能力。

Column的data数据按照Page为单位分块存储,每个Page大小一般为64*1024个字节。

Page在存储的位置和大小由ordinalindex管理。

DataPage主要为Data部分、PageFooter两个部分。Data部分存放了当前Page的列的数据。当允许存在Null值时,对空值单独存放了Null值的Bitmap,由RLE格式编码通过bool类型记录Null值的行号。PageFooter包含了Page类型Type、UncompressedSize未压缩时的数据大小、FirstOrdinal当前Page第一行的RowId、NumValues为当前Page的行数、NullMapSize对应了NullBitmap的大小。

针对不同的字段类型采用了不同的编码。默认情况下,针对不同类型采用的对应关系如下:

默认采用LZ4F格式对数据进行压缩。

ShortKeyIndex前缀索引,是在key(AGGREGATEKEY、UNIQKEY和DUPLICATEKEY)排序的基础上,实现的一种根据给定前缀列,快速查询数据的索引方式。这里ShortKeyIndex索引也采用了稀疏索引结构,在数据写入过程中,每隔一定行数,会生成一个索引项。这个行数为索引粒度默认为1024行,可配置。

其中,KeyBytes中存放了索引项数据,OffsetBytes存放了索引项在KeyBytes中的偏移。

ShortKeyIndex采用了前36个字节,作为这行数据的前缀索引。当遇到VARCHAR类型时,前缀索引会直接截断。

ZoneMap索引存储了Segment和每个列对应每个Page的统计信息。这些统计信息可以帮助在查询时提速,减少扫描数据量,统计信息包括了Min最大值、Max最小值、HashNull空值、HasNotNull不全为空的信息。

当一些字段不能利用ShortKeyIndex并且字段存在区分度比较大时,Doris提供了BloomFilter索引。

Doris还提供了BitmapIndex用来加速数据的查询。

Doris针对不同场景支持了多种形式的数据写入方式,其中包括了从其他存储源导入BrokerLoad、http同步数据导入StreamLoad、例行的RoutineLoad导入和InsertInto写入等。同时导入流程会涉及FE模块(主要负责导入规划生成和导入任务的调度工作)、BE模块(主要负责数据的ETL和存储)、Broker模块(提供Doris读取远端存储系统中文件的能力)。其中Broker模块仅在BrokerLoad类型的导入中应用。

下面以StreamLoad写入为例子,描述了Doris的整体的数据写入流程如下图所示:

1、FE接收用户的写入请求,并随机选出BE作为CoordinatorBE。将用户的请求重定向到这个BE上。2、CoordinatorBE负责接收用户的数据写入请求,同时请求FE生成执行计划并对调度、管理导入任务LoadJob和导入事务。3、CoordinatorBE调度执行导入计划,执行对数据校验、清理之后。4、数据写入到BE的存储层中。在这个过程中会先写入到内存中,写满一定数据后按照存储层的数据格式写入到物理磁盘上。

数据在经过清洗过滤后,会通过Open/AddBatch请求分批量的将数据发送给存储层的BE节点上。在一个BE上支持多个LoadJob任务同时并发写入执行。LoadChannelMgr负责管理了这些任务,并对数据进行分发。

数据分发和写入过程如下图所示:

DeltaWriter主要负责不断接收新写入的批量数据,完成单个Tablet的数据写入。由于新增的数据可以是增量Delta部分,因此叫做DeltaWriter。

DeltaWriter数据写入采用了类LSM树的结构,将数据先写到Memtable中,当Memtable数据写满后,会异步flush生成一个Segment进行持久化,同时生成一个新的Memtable继续接收新增数据导入,这个flush操作由MemtableFlushExecutor执行器完成。

Memtable中采用了跳表的结构对数据进行排序,排序规则使用了按照schema的key的顺序依次对字段进行比较。这样保证了写入的每一个写入Segment中的数据是有序的。如果当前模型为非DUP模型(AGG模型和UNIQUE模型)时,还会对相同key的数据进行聚合。

在物理存储层面的写入,由RowsetWriter完成。RowsetWriter中又分为SegmentWriter、ColumnWriter、PageBuilder、IndexBuilder等子模块。

1.当一个Memtable写满时(默认为100M),将Memtable的数据会flush到磁盘上,这时Memtable内的数据是按key有序的。然后逐行写入到RowsetWriter中。2.RowsetWriter将数据同样逐行写入到SegmentWriter中,RowsetWriter会维护当前正在写入的SegmentWriter以及要写入的文件块列表。每完成写入一个Segment会增加一个文件块对应。3.SegmentWriter将数据按行写入到各个ColumnWriter的中,同时写入ShortKeyIndexBuilder。ShortKeyIndexBuilder主要负责生成ShortKeyIndex的索引Page页。具体的ShortKeyIndex索引格式可以参见《Doris存储层设计介绍1——存储结构设计解析》文档。4.ColumnWriter将数据分别写入PageBuilder和各个IndexBuilder,PageBuilder用来生成ColumnData数据的PageBuilder,各个IndexBuilder包括了(OrdinalIndexBuilder生成OrdinalIndex行号稀疏索引的Page格式、ZoneMapIndexBuilder生成ZoneMapIndex索引的Page格式、BitMapIndexBuilder生成BitMapIndex索引的Page格式、BloomFilterIndexBuilder生成BloomFilterIndex索引的Page格式)。具体参考Doris存储文件格式解析。5.添加完数据后,RowsetWriter执行flush操作。6.SegmentWriter的flush操作,将数据和索引写入到磁盘。其中对磁盘的读写由FileWritableBlock完成。7.ColumnWriter将各自数据、索引生成的Page顺序写入到文件中。8.SegmentWriter生成SegmentFooter信息,SegmentFooter记录了Segment文件的原数据信息。完成写入操作后,RowsetWriter会再开启新的SegmentWriter,将下一个Memtable写入新的Segment,直到导入完成。

1.DeltaWriter统计当前RowsetMeta元数据信息,包括行数、字节数、时间、Segment数量。2.保存到RowsetMeta中,向FE提交导入事务。当前导入事务由FE开启,用来保证一次导入在各个BE节点的数据的同时生效。3.在FE协调好之后,由FE统一下发Publish任务使导入的Rowset版本生效。任务中指定了发布的生效version版本信息。之后BE存储层才会将这个版本的Rowset设置为可见。4.Rowset加入到BE存储层的Tablet进行管理。

目前Delete有两种实现,一种普通的删除类型为DELETE,一种为LOAD_DELETE。

DELETE的支持一般的删除操作,实现较为简单,DELETE模式下没有对数据进行实际删除操作,而是对数据删除条件进行了记录。存储在Meta信息中。当执行BaseCompaction时删除条件会一起被合入到Base版本中。Base版本为Tablet从[0-x]的第一个Rowset数据版本。具体流程如下:

1.删除时由FE直接下发删除命令和删除条件。2.BE在本地启动一个EngineBatchLoadTask任务,生成新版本的Rowset,并记录删除条件信息。这个删除记录的Rowset与写入过程的略有不同,该Rowset仅记录了删除条件信息,没有实际的数据。3.FE同样发布生效版本。其中会将Rowset加入到Tablet中,保存TabletMeta信息。

LOAD_DELETE支持了在UNIQUEKEY模型下,实现了通过批量导入要删除的key对数据进行删除,能够支持大量数据删除能力。整体思路是在数据记录中加入删除状态标识,在Compaction流程中会对删除的key进行压缩。

聚合模型的特点就是将表中的列分为了Key和Value两种。Key就是数据的维度列,比如时间,地区等等。Value则是数据的指标列,比如点击量,花费等。每个指标列还会有自己的聚合函数,包括sum、min、max和bitmap_union等。数据会根据维度列进行分组,并对指标列进行聚合。

首先是导入数据,原始数据在导入过程中,会根据表结构中的Key进行分组,相同Key的Value会根据表中定义的AggregationFunction进行聚合。

由于Doris采用的是MVCC机制进行的并发控制,所以每一次新的导入都是一个新的版本。我们把这种版本称为Singleton。

不断的导入新的数据后,尽管同一批次的数据在导入过程中已经发生了聚合,但不同版本之间的数据依旧存在维度列相同但是指标列并没有被聚合的情况。这时候就需要通过Compaction机制进行二次聚合。

Compaction的意思其实就是将不同版本的数据进行合并。它分为两个阶段,第一个阶段是:当Singleton的数据版本个数到达Doris设置的阈值时,就会触发Cumulative级别的Compaction。这个级别的Compaction会将一个区间段内的版本数据根据定义好的聚合函数进行再聚合。

说完聚合模型,再介绍一种聚合模型上的提升查询效率的方式——构建Rollup

Rollup也就是上卷,是一种在多维分析中比较常用的操作——也就是从细粒度的数据向高层的聚合。

在Doris中,我们提供了在聚合模型上的构建Rollup功能,将数据根据更少的维度进行预聚合。将本身在用户查询时才会进行聚合计算的数据预先计算好,并存储在Doris中,从而达到提升用户粗粒度上的查询效率。

Rollup还有一点好处在于,由于Doris具有在原始数据上实时计算的能力,因此不需要对所有维度的每个组合都创建Rollup。尤其是在维度很多的情况下,可以取得一个存储空间和查询效率之间的平衡。

在创建Rollup的时候首先你需要有一个聚合模型的Base表,然后就可以取部分维度创建一个Rollup表。

聚合模型的优点就在于:划分维护和指标列后,数据本身已经进行过预聚合,对于分析型查询效率提升明显。

基于以上问题,我们增加了对明细数据模型的支持。

明细数据模型刚好和聚合模型相反,不区分维护和指标列,并不对导入的数据做任何聚合,每条原始数据都会保留在表中。

明细模型就像Mysql中的表一样,优势就在于你可以详细追溯每个用户行为或订单详情。但劣势也很明显,分析型的查询效率不高。

物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询的需求。

从定义上来说,就是包含了查询结果的数据库对象,可能是对远程数据的本地Copy;也可能是一个表或多表Join后结果的行或列的子集;也可能是聚合后的结果。说白了,就是预先存储查询结果的一种数据库对象。

在Doris中的物化视图,就是查询结果预先存储起来的特殊的表。

它的优势在于:1.对于那些经常重复的使用相同的子查询结果的查询性能大幅提升2.Doris自动更新物化视图的数据,保证Base表和物化视图表的数据一致性。无需额外的维护成本3.查询的时候也可以自动匹配最优的物化视图

目前支持的聚合函数包括常用的sum、min、max、count以及pv、uv,留存率等计算时常用的去重算法hll_union,和用于精确去重计算count(distinct)的算法bitmap_union。

使用物化视图功能后,由于物化视图实际上是损失了部分维度数据的。所以对表的DML类型操作会有一些限制。

使用物化视图功能后,由于物化视图实际上是损失了部分维度数据的。所以对表的DML类型操作会有一些限制。

对于物化视图和Rollup来说,他们的共同点都是通过预聚合的方式来提升查询效率。实际上物化视图是Rollup的一个超集,在覆盖Rollup的工作同时,还支持更灵活的聚合方式。

因此,如果对数据的分析需求既覆盖了明细查询也存在分析类查询,则可以先创建一个明细模型的表,并构建物化视图。

SQL解析在下文中指的是将一条sql语句经过一系列的解析最后生成一个完整的物理执行计划的过程。

这个过程包括以下四个步骤:词法分析,语法分析,生成逻辑计划,生成物理计划。

DorisSQL解析架构的设计有以下目标:

DorisSQL解析具体包括了五个步骤:词法分析,语法分析,生成单机逻辑计划,生成分布式逻辑计划,生成物理执行计划。

具体代码实现上包含以下五个步骤:Parse,Analyze,SinglePlan,DistributedPlan,Schedule。

下图展示了一个简单的查询SQL在Doris的解析实现。

词法分析采用jflex技术,语法分析采用javacupparser技术,最后生成抽象语法树(AbstractSyntaxTree)AST,这些都是现有的、成熟的技术,在这里不进行详细介绍。

AST是一种树状结构,代表着一条SQL。不同类型的查询select,insert,show,set,altertable,createtable等经过Parse阶段后生成不同的数据结构(SelectStmt,InsertStmt,ShowStmt,SetStmt,AlterStmt,AlterTableStmt,CreateTableStmt等),但他们都继承自Statement,并根据自己的语法规则进行一些特定的处理。例如:对于select类型的sql,Parse之后生成了SelectStmt结构。

SelectStmt结构包含了SelectList,FromClause,WhereClause,GroupByClause,SortInfo等结构。这些结构又包含了更基础的一些数据结构,如WhereClause包含了BetweenPredicate(between表达式),BinaryPredicate(二元表达式),CompoundPredicate(andor组合表达式),InPredicate(in表达式)等。

抽象语法树是由StatementBase这个抽象类表示。这个抽象类包含一个最重要的成员函数analyze(),用来执行Analyze阶段要做的事。

不同类型的查询select,insert,show,set,altertable,createtable等经过Parse阶段后生成不同的数据结构(SelectStmt,InsertStmt,ShowStmt,SetStmt,AlterStmt,AlterTableStmt,CreateTableStmt等),这些数据结构继承自StatementBase,并实现analyze()函数,对特定类型的SQL进行特定的Analyze。

例如:select类型的查询,会转成对selectsql的子语句SelectList,FromClause,GroupByClause,HavingClause,WhereClause,SortInfo等的analyze()。然后这些子语句再各自对自己的子结构进行进一步的analyze(),通过层层迭代,把各种类型的sql的各种情景都分析完毕。例如:WhereClause进一步分析其包含的BetweenPredicate(between表达式),BinaryPredicate(二元表达式),CompoundPredicate(andor组合表达式),InPredicate(in表达式)等。

这部分工作主要是根据AST抽象语法树生成代数关系,也就是俗称的算子数。树上的每个节点都是一个算子,代表着一种操作。

ScanNode代表着对一个表的扫描操作,将一个表的数据读出来。HashJoinNode代表着join操作,小表在内存中构建哈希表,遍历大表找到连接键相同的值。Project表示投影操作,代表着最后需要输出的列,图片表示只用输出citycode这一列。

有了单机的PlanNode树之后,就需要进一步根据分布式环境,拆成分布式PlanFragment树(PlanFragment用来表示独立的执行单元),毕竟一个表的数据分散地存储在多台主机上,完全可以让一些计算并行起来。

这个步骤的主要目标是最大化并行度和数据本地化。主要方法是将能够并行执行的节点拆分出去单独建立一个PlanFragment,用ExchangeNode代替被拆分出去的节点,用来接收数据。拆分出去的节点增加一个DataSinkNode,用来将计算之后的数据传送到ExchangeNode中,做进一步的处理。

这一步采用递归的方法,自底向上,遍历整个PlanNode树,然后给树上的每个叶子节点创建一个PlanFragment,如果碰到父节点,则考虑将其中能够并行执行的子节点拆分出去,父节点和保留下来的子节点组成一个parentPlanFragment。拆分出去的子节点增加一个父节点DataSinkNode组成一个childPlanFragment,childPlanFragment指向parentPlanFragment。这样就确定了数据的流动方向。

这一步是根据分布式逻辑计划,创建分布式物理计划。主要解决以下问题:

Doris除了支持HLL近似去重,也是支持Runtime现场精确去重的。实现方法和Spark,MR类似。

对于上图计算PV的SQL,Doris在计算时,会按照下图进行计算,先根据page列和user_id列groupby,最后再count。

显然,上面的计算方式,当数据量越来越大,到几十亿,几百亿时,使用的IO资源,CPU资源,内存资源,网络资源就会越来越多,查询也会越来越慢。

那么,下面一个自然而然的问题就是,应该如何让Doris的精确去重查询性能更快呢?

要在Doris中预计算,自然要用到Doris的聚合模型,下面简单看下Doris中的聚合模型:

Doris的聚合模型分为Key列和Value列,Key列就是维度列,Value列是指标列,Key列全局有序,每个Value列会有对应的聚合函数,相同Key列的Value会根据对应的聚合函数进行聚合。上图中,Year,City是Key列,Cost是Value列,Cost对应的聚合函数式Sum。Doris支持根据不同维度组合建立不同的Rollup表,并能在查询时自动路由。

所以要在Doris中实现CountDistinct的预计算,就是实现一种CountDistinct的聚合指标。那么可以像Sum,Min,Max聚合指标一样直接实现一种CountDistinct聚合指标吗?

Doris中聚合指标必须支持上卷。但如果只保留每个City的User的去重值,就没办法上卷聚合出只有Year为维度的时候User的去重值,因为去重值不能直接相加,已经把明细丢失了,不知道在2016或2017年,北京和上海不重合的User有多少。

所以去重指标要支持上卷聚合,就必须保留明细,不能只保留一个最终的去重值。而计算机保留信息的最小单位是一个bit,所以很自然的想到用Bitmap来保留去重指标的明细数据。

RoaringBitmap的核心思路很简单,就是根据数据的不同特征采用不同的存储或压缩方式。为了实现这一点,RoaringBitmap首先进行了分桶,将整个int域拆成了2的16次方65536个桶,每个桶最多包含65536个元素。

所以一个int的高16位决定了,它位于哪个桶,桶里只存储低16位。以图中的例子来说,62的前1000个倍数,高16位都是0,所以都在第一个桶里。

然后在桶粒度针对不同的数据特点,采用不同的存储或压缩方式:

默认会采用16位的Short数组来存储低16位数据,当元素个数超过4096时,会采用Bitmap来存储数据。

第3类RunContainer是优化连续的数据,Run指的是RunLengthEncoding(RLE)

在做字典映射时,使用比较广泛的数据结构是Trie树。

Trie树的问题是字典对应的编码值是基于节点位置决定的,所以Trie树是不可变的。这样没办法用来实现全局字典,因为要做全局字典必然要支持追加。

如何让同一个String永远映射到同一个ID。一个简单的思路是把String对应的ID直接序列化下来,因为全局字典只需要支持String到ID的单向查找,不需要支持ID到String的反向查找。

当全局字典越来越大的时候,就会面临内存不足的问题。一个自然的想法就是Split。当全局字典拆成多个子树之后,必然会涉及到每个子树的按需加载和删除,这个功能是使用Guava的LoadingCache实现的。

为了解决读写冲突的问题,实现了MVCC,使得读写可以同时进行。全局字典目前是存储在HDFS上的,一个全局字典目录下会有多个Version,读的时候永远读取最新Version的数据,写的时候会先写到临时目录,完成后再拷贝到最新的Version目录。同时为了保证全局字典的串行更新,引入分布式锁。

目前基于Trie树的全局字典存在的一个问题是,全局字典的编码过程是串行的,没有分布式化,所以当全局字典的基数到几十亿规模时,编码过程就会很慢。一个可行的思路是,类似RoaringBitmap,可以将整个Int域进行分桶,每个桶对应固定范围的ID编码值,每个String通过Hash决定它会落到哪个桶内,这样全局字典的编码过程就可以并发。

正是由于目前基于Trie树的全局字典无法分布式构建,滴滴的同学引入了基于Hive表的全局字典。

这种方案中全局字典本身是一张Hive表,Hive表有两个列,一个是原始值,一个是编码的Int值,然后通过上面的4步就可以通过Spark或者MR实现全局字典的更新,和对事实表中Value列的替换。

基于Hive表的全局字典相比基于Trie树的全局字典的优点除了可以分布式化,还可以实现全局字典的复用。但是缺点也是显而易见,相比基于Trie树的全局字典,会使用多几倍的资源,因为原始事实表会被读取多次,而且还有两次Join。

CreateTable(为了有更好的加速效果,最好建下ROLLUP)

CREATETABLE`pv_bitmap`(`dt`int,`page`varchar(10),`user_id`bitmapbitmap_union)AGGREGATEKEY(`dt`,page)DISTRIBUTEDBYHASH(`dt`)BUCKETS2;ALTERTABLEpv_bitmapADDROLLUPpv(page,user_id);

LoadData

catdata|curl--location-trusted-uuser:passwd-T--H"columns:dt,page,user_id,user_id=$BITMAP_LOAD_FUNCTION(user_id)"http://host:8410/api/test/pv_bitmap/_stream_loadTO_BITMAP(expr):将0~4294967295的unsignedint转为bitmapBITMAP_HASH(expr):将任意类型的列通过Hash的方式转为bitmapBITMAP_EMPTY():生成空bitmap,用于insert或导入的时填充默认值

Query

selectbitmap_count(bitmap_union(user_id))frompv_bitmap;selectbitmap_union_count(user_id)frompv_bitmap;selectbitmap_union_int(id)frompv_bitmap;BITMAP_UNION(expr):计算两个Bitmap的并集,返回值是序列化后的Bitmap值BITMAP_COUNT(expr):计算Bitmap的基数值BITMAP_UNION_COUNT(expr):和BITMAP_COUNT(BITMAP_UNION(expr))等价BITMAP_UNION_INT(expr):和COUNT(DISTINCTexpr)等价(仅支持TINYINT,SMALLINT和INT)

InsertInto(可以加速无需上卷的精确去重查询场景)

insertintobitmaptable1(id,id2)VALUES(1001,tobitmap(1000)),(1001,to_bitmap(2000));insertintobitmaptable1selectid,bitmapunion(id2)frombitmap_table2groupbyid;insertintobitmaptable1selectid,bitmaphash(id_string)fromtable;基于Bitmap的用户行为分析

用户行为分析从字面意思上讲,就是分析用户的行为。分析用户的哪些行为呢?可以简单用5W2H来总结。即Who(谁)、What(做了什么行为)、When(什么时间)、Where(在哪里)、Why(目的是什么)、How(通过什么方式),Howmuch(用了多长时间、花了多少钱)。

其终极目的就是为了不断优化产品,提升用户体验,让用户花更多的时间,花更多的钱在自己的产品上。

目前用户行为分析的解法大概有这么几种:

第一种就数据库的Join解法,一般效率是比较低的。我们在Doris中是可以用这种思路实现的。第二种是基于明细数据的,UDAF实现。Doris也是支持的。第三种是基于Bitmap的UDAF实现的,也就是今天要分享的。第四种是用专用的系统来做用户行为分析,专用系统的好处是可以针对特定场景,做更多的优化。

现在已经在Doris的聚合模型中支持了Bitmap,所以可以基于Bitmap实现各类UDF,UDAF,来实现大多数用户行为分析。

selectintersect_count(user_id,dt,'20191111')asfirst_day,intersect_count(user_id,dt,'20191112')assecond_day,intersect_count(user_id,dt,'20191111','20191112')asretention,fromtablewheredtin('20191111','20191112')

假如有user_id和page的信息,我们希望知道在访问美团页面之后,又有多少用户访问了外卖页面,也可以用intersect_count来进行计算。

selectintersect_count(user_id,tag_value,'男','90后','10-20万')fromuser_profilewhere(tag_type='性别'andtag_value='男')or(tag_type='年龄'andtag_value='90后')or(tag_type='收入'andtag_value='10-20万')

最后也可以通过intersect_count来进行一些特定用户的筛选。例如原始表里有user_id,tag_value,tag_type这些信息,我们想计算年收入10-20万的90后男性有多少,就可以用这个简单的SQL来实现。

如果应用基数在百万、千万量级,并拥有几十台机器,那么直接选择countdistinct即可;

如果希望进行用户行为分析,可以考虑IntersectCount或者自定义UDAF。

外卖业务为大家提供送餐服务,连接商家与用户,这是一个劳动密集型的业务,外卖业务有上万人的运营团队来服务全国几百万的商家,并以“商圈”为单元,服务于“商圈”内的商家。“商圈”及其上层组织机构是一个变化维度,当“商圈”边界发生变化时,就导致在往常日增量的业务生产方式中,历史数据的回溯失去了参考意义。在所有展现组织机构数据的业务场景中,组织机构的变化是一个绕不开的技术问题。此外,商家品类、类型等其它维度也存在变化维的问题。如下图所示:

数据爆炸,每日使用最新维度对历史数据进行回溯计算。在Kylin的MOLAP模式下存在如下问题:

既然变化维的历史数据预计算成本巨大,最好的办法就是现用现算,但现用现算需要强大的并行计算能力。OLAP的实现有MOLAP、ROLAP、HOLAP三种形式。长期以来,由于传统关系型DBMS的数据处理能力有限,所以ROLAP模式受到很大的局限性。随着分布式、并行化技术成熟应用,MPP引擎逐渐表现出强大的高吞吐、低时延计算能力,号称“亿级秒开”的引擎不在少数,ROLAP模式可以得到更好的延伸。例如:日数据量的ROLAP现场计算,周、月趋势的计算,以及明细数据的浏览都可以较好的应对。

下图是MOLAP模式与ROLAP模式下应用方案的比较:

综上所述,在变化维、非预设维、细粒度统计的应用场景下,使用MPP引擎驱动的ROLAP模式,可以简化模型设计,减少预计算的代价,并通过强大的实时计算能力,可以支撑良好的实时交互体验。

select*fromt1joint2ont1.id=t2.idwheret1.id=1

Doris开源版本默认会对t2表进行全表Scan,这样会导致上面的查询超时,进而导致外卖业务在Doris上的第一批应用无法上线。

于是在Doris中实现了第一个优化:Join谓词下推的传递性优化(MySQL和TiDB中称之为ConstantPropagation)。Join谓词下推的传递性优化是指:基于谓词t1.id=t2.id和t1.id=1,可以推断出新的谓词t2.id=1,并将谓词t2.id=1下推到t2的Scan节点。这样假如t2表有数百个分区的话,查询性能就会有数十倍甚至上百倍的提升,因为t2表参与Scan和Join的数据量会显著减少。

如上图所示,Doris默认在每个节点上为每个算子只会生成1个执行实例。这样的话,如果数据量很大,每个执行实例的算子就需要处理大量的数据,而且无法充分利用集群的CPU、IO、内存等资源。

一个比较容易想到的优化手段是,可以在每个节点上为每个算子生成多个执行实例。这样每个算子只需要处理少量数据,而且多个执行实例可以并行执行。

下图是并发度设置为5的优化效果,可以看到对于多种类型的查询,会有3到5倍的查询性能提升:

ColocateJoin(LocalJoin)是和ShuffleJoin、BroadcastJoin相对的概念,即将两表的数据提前按照JoinKeyShard,这样在Join执行时就没有数据网络传输的开销,两表可以直接在本地进行Join。

整个ColocateJoin在Doris中实现的关键点如下:

对于下面的SQL,DorisColocateJoin和ShuffleJoin在不同数据量下的性能对比如下:

selectcount(*)FROMAt1INNERJOIN[shuffle]Bt5ON((t1.dt=t5.dt)AND(t1.id=t5.id))INNERJOIN[shuffle]Ct6ON((t1.dt=t6.dt)AND(t1.id=t6.id))wheret1.dtin(xxxdays);

Bitmap精确去重

Doris之前实现精确去重的方式是现场计算的,实现方法和Spark、MapReduce类似:

对于上图计算PV的SQL,Doris在计算时,会按照下图的方式进行计算,先根据page列和user_id列groupby,最后再Count:

显然,上面的计算方式,当数据量越来越大,到几十亿几百亿时,使用的IO资源、CPU资源、内存资源、网络资源会变得越来越多,查询也会变得越来越慢。

于是在Doris中新增了一种Bitmap聚合指标,数据导入时,相同维度列的数据会使用Bitmap聚合。有了Bitmap后,Doris中计算精确去重的方式如下:

可以看到,当使用Bitmap之后,之前的PV计算过程会大幅简化,现场查询时的IO、CPU、内存,网络资源也会显著减少,并且不再会随着数据规模而线性增加。

实践证明,以Doris引擎为驱动的ROLAP模式可以较好地处理汇总与明细、变化维的历史回溯、非预设维的灵活应用、准实时的批处理等场景。而以Kylin为基础的MOLAP模式在处理增量业务分析,固化维度场景,通过预计算以空间换时间方面依然重要。

业务方面,通过外卖数仓Doris的成功实践以及跨事业群的交流,美团已经有更多的团队了解并尝试使用了Doris方案。而且在平台同学的共同努力下,引擎性能还有较大提升空间,相信以Doris引擎为驱动的ROLAP模式会为美团的业务团队带来更大的收益。从目前实践效果看,其完全有替代Kylin、Druid、ES等引擎的趋势。

目前,数据库技术进步飞速,近期柏睿数据发布全内存分布式数据库RapidsDBv4.0支持TB级毫秒响应(处理千亿数据可实现毫秒级响应)。可以预见,数据库技术的进步将大大改善数仓的分层管理与应用支撑效率,业务将变得“定义即可见”,也将极大地提升数据的价值。

因此需要一个合适的数据查询引擎来替代原有系统,考虑到团队的人力和研发能力,选择使用开源的OLAP引擎来替换原有系统。

为广告主提供在线报表数据查询服务,因此该OLAP查询引擎必须满足:可以支持高并发查询,可以毫秒级返回数据,且可以随着业务的发展水平扩展。此外也承接了越来越多运营和采销同事的多维数据分析的需求,因此希望该OLAP引擎也可以支持高吞吐的Ad-hoc查询。

需要同时支持离线(T+1)大规模数据和实时(分钟级间隔)数据的导入,数据导入后即可查询,保证数据导入的实时性和原子性。离线数据(几十G)的导入任务需要在1小时内完成,实时数据(百M到几G)的导入任务需要在10分钟内完成。

在“618”这类大促前通常会进行扩容,因此需要新系统扩容方便,无需重刷历史数据来重新分布数据,且扩容后原有机器的数据最好可以很方便地迁移到新机器上,避免造成数据倾斜。

根据日常业务的需要,经常会进行SchemaChange操作。由于原有系统对这方面的支持很差,希望新系统可以进行OnlineSchemaChange,且对线上查询无影响。

由于业务的日常变更会对一些表进行数据修复,因此新系统需要支持错误数据的删除,从而无需重刷全部历史数据,避免人力和计算资源的浪费。

目前开源的OLAP引擎很多,但由于面临大促的压力,需要尽快完成选型并进行数据迁移,因此只考察比较出名的几个OLAP系统:ClickHouse,Druid和Doris。

最终选择Doris来替换原有系统,主要基于以下几方面的考虑:

经过对系统的改造,目前使用Doris作为系统中的一个数据存储层,汇总了离线和实时数据,也为上层查询系统提供统一的效果数据查询接口。如下图所示:

日常实时数据主要包含展现/点击跟单数据,DMP人群包的效果数据以及十几条产品线的点击,展现和消耗数据,导入时间间隔从1分钟到1小时不等,数据量在百M左右的可以秒级导入,数据量在1G左右的可以在1分钟内完成。离线数据主要包含搜索词的效果数据和各种营销工具的基础数据,大多数都是T+1数据,每日新增数据量在20G-30G,导入耗时在10-20分钟。

大多数效果数据报表,广告主的查询维度相对固定且可控,但要求能在毫秒级返回数据,所以必须保证这些查询场景下的性能。Doris支持的聚合模型可以进行数据的预聚合,将点击,展现,消耗等数据汇总到建表时指定的维度。

此外,Doris支持建立Rollup表(即物化视图)也可以在不同维度上进行预聚合,这种自定义的方式相比Kylin的自动构建cube,有效避免了数据的膨胀,在满足查询时延的要求下,降低了磁盘占用。Doris还可以通过Rollup表对维度列的顺序进行调整,避免了Kylin中因过滤维度列在HBaseRowKey后部而造成的查询性能低下。

对于一些为广告主提供的营销工具,维度和指标通常会有30~60列之多,而且大部分查询要求按照所有维度列进行聚合,由于维度列较多,这种查询只能依赖于现场计算能力。目前对于这种类型的查询请求,会将其数据尽量均匀分布到多台BE上,利用DorisMPP架构的特性,并行计算,并通过控制查询时间范围(一个月),可以使TP99达到3s左右。

正是由于Doris具有自定义的预计算能力和不俗的现场计算能力,简化了日常工作。以为广告主提供的营销工具“行业大盘”为例,如图所示,这种业务场景下,不仅要计算广告主自身的指标数据,还需计算广告主所在类目的指标数据,从而进行对比。

原有系统数据分片只能按照指定列进行散列,没有分布式查询计划,就不能汇总类目维度数据。原先为了解决这种业务场景,虽然底层是同一数据源,但需要建两个表,一个是广告主维度表,一个是类目维度表,维护了两个数据流,增大了工作负担。

使用了Doris之后,广告主维度表可以Rollup出类目维度表。查询广告主维度数据时可以根据分区分桶(按照时间分区,按照广告主ID分桶)确定一个Tablet,缩小数据查询范围,查询效率很高。查询类目维度时,数据已经按照广告主ID进行分片,可以充分利用Doris现场计算的能力,多个BE并行计算,实时计算类目维度数据,在我们的线上环境也能实现秒级查询。这种方案下数据查询更加灵活,无需为了查询性能而维护多个预计算数据,也可以避免多张表之间出现数据不一致的问题。

Doris支持聚合模型,可以提前聚合好数据,对计算广告效果数据点击,展现和消耗十分适合。对一些数据量较大的高基数表,可以对查询进行分析,建立不同维度或者顺序的的Rollup表来满足查询性能的需求。

Doris支持OnlineSchemaChange,相比原有系统SchemaChange需要多个模块联动,耗费多个人力数天才能进行的操作,Doris只需一条SQL且在较短时间内就可以完成。对于日常需求来说,最常见的SchemaChange操作就是加列,Doris对于加列操作使用的是LinkedSchemaChange方式,该方式可以无需转换数据,直接完成,新导入的数据按照新的Schema进行导入,旧数据可以按照新加列的默认值进行查询,无需重刷历史数据。

Doris通过HLL列和BITMAP列支持了近似/精确去重的功能,解决了之前无法计算UV的问题。

日常数据修复,相较于以前有了更多的方式可以选择。对于一些不是很敏感的数据,我们可以删除错误数据并进行重新导入;对于一些比较重要的线上数据,我们可以使用SparkonDoris计算正确数据和错误数据之间的差值,并导入增量进行修复。这样的好处是,不会暴露一个中间状态给广告主。还有一些业务会对一个或多个月的数据进行重刷。目前在测试使用Doris0.12版本提供的TempPartition功能,该功能可以先将正确数据导入到TempPartition中,完成后可以将要删除的Partition和TempPartition进行交换,交换操作是原子性的,对于上层用户无感知。

Doris添加新的BE节点后可以自动迁移Tablet到新节点上,达到数据负载均衡。通过添加FE节点,则可以支撑更高的查询峰值,满足大促高并发的要求。

大促期间数据导入量会暴增,而且在备战期间,也会有憋单演练,在短时间内会产生大量数据导入任务。通过导入模块限制Load的并发,可以避免大量数据的同时导入,保证了Doris的导入性能。

Doris在团队已经经历了数次大促,在所有大促期间无事故发生,查询峰值4500+qps,日查询总量8千万+,TP99毫秒级,数据日增量近300亿行,且实时导入数据秒级延迟。

Doris支持低延时的高并发查询和高吞吐的Ad-hoc查询,但是这两类查询会相互影响,迁移到Doris的初期日常线上的主要问题就是高吞吐的查询占用资源过多,导致大量低延时的查询超时。后来使用两个集群来对两类查询进行物理隔离,解决了该问题。

Doris在0.11版本时FE的MySQL服务IO线程模型较为简单,使用一个Acceptor+ThreadPool来完成MySQL协议的通信过程,单个FE节点在并发较高(2000+qps左右)的时候会出现连接不上的问题,但此时CPU占用并不高。在0.12版本的时候,Doris支持了NIO,解决了这个问题,可以支撑更高的并发。也可以使用长连接解决这个问题,但需要注意Doris默认对连接数有限制,连接占满了就无法建立新的连接了。

首先为什么要做思域精细化运营呢,这起源于两个痛点:

然后针对这两个问题,产品上面提出了一个解决方案--就是分层运营,它主要分为两部分:一个是运营触达,还有一个是精细化的人群。

首先给大家简单介绍遇到的四个难点:

1.TB级数据。数据量特别大,前面讲到是基于画像和行为去做的一个用户分层,数据量是特别大的,每天的数据量规模是1T+2.查询的频响要求极高,毫秒级到秒级的一个要求。前面介绍B端视角功能时大家有看到,有一个预估人数的功能,用户只要点击”预估人数“按钮,需要从TB级的数据量级里面计算出筛选出的人群人数是多少,这种要在秒级时间计算TB级的数量的一个结果的难度其实可想而知3.计算复杂,需要动静组合。怎么理解?就是现在很多维度是没办法去做预聚合的,必须去存明细数据,然后去实时的计算,这个后面也会细讲4.产出用户包的时效性要求高。这个比较好理解,如果产出特别慢的话,肯定会影响用户体验

针对第一个难点-->压缩存储,降低查询的数量级。具体选型就是使用Bitmap存储,这解法其实很好理解,不管现在主流的OLAP引擎有多么厉害,数据量越大,查询肯定会越慢,不可能说数据量越大,我查询还是一直不变的,这种其实不存在的,所以我们就需要降低存储。

针对第二和第三个难点-->选择合适计算引擎在调研了当前开源的包括ClickHouse,Doris,Druid等多种引擎后,最终选择了基于MPP架构的OLAP引擎Doris。这里可以简单跟大家介绍一下选择Doris的原因,从性能来说其实都差不多,但是都Doris有几个优点:第一:它是兼容Mysql协议,也就是说你的学习成本非常低,基本上大家只要了解mysql,就会用Doris,不需要很大的学习成本。第二:Doris运维成本很低,基本上就是自动化运维。

针对第四个难点-->选择合适的引擎通过对比Spark和Doris,我们选择了Doris,后面会详细讲为什么会用Doris。

架构的话分为两部分,就是在线部分跟离线部分。

在线部分:分为了四层:服务层、解析层、计算层跟存储层,然后还有调度平台和监控平台。

解析层,是对DSL的一个解析、优化、路由以及Sql模板:

比如要查在线预估人数,首先会在解析层做一个DSL的解析,之后根据不同情景做DSL的优化,比如选择了近七天活跃且近七天不活跃的用户,这种要七天活跃和七天不活跃的交集显然就是零了,对不对?像这样情况在优化层直接将结果0返回给用户就不会再往下走计算引擎,类似还有很多其他优化场景。然后优化完之后会使用DSL路由功能,根据不同查询路由到不同的Sql模板进行模板的拼接。

计算层,计算引擎使用Spark和Doris:

离线部分的话主要是对需要的数据源(比如说画像、关注、行为等数据源)做ETL清洗,清洗完之后会做一个全局字典并写入Doris。任务最终会产出用户包,并会分发给小程序B端跟百度统计:

图中大家可以看到有几个标红的地方,同时也用数字1、2、3做了标记,这几个标红是重点模块,就是针对于上面提到的四个难点做的重点模块改造,接下来会针对这三个重点模块一一展开进行讲解。

首先讲解全局字典这个模块,全局字典的目的主要是为了解决难点一:数据量大,需要压缩存储同时压缩存储之后还要保证查询性能。

为啥要用全局字典:这里大家可能会有一个疑问,就是说用BitMap存储为啥还要做全局字典?这个主要是因为Doris的BitMap功能是基于RoaringBitmap实现的,因此假如说用户ID过于离散的时候,RoaringBitmap底层存储结构用的是ArrayContainer而不是BitMapContainer,ArrayContainer性能远远差于BitMapContainer。因此我们要使用全局字典将用户ID映射成连续递增的ID,这就是使用全局字典的目的。

全局字典的更新逻辑概况:这里是使用Spark程序来实现的,首先加载经过ETL清洗之后各个数据源(画像、关注、行为这些数据源)和全局字典历史表(用来维护维护用户ID跟自增ID映射关系),加载完之后会判断ETL里面的用户ID是否已经存在字典表里面,如果有的话,就直接把ETL的数据写回Doris就行了,如果没有就说明这是一个新用户,然后会用row_number方法生成一个自增ID,跟这个新用户做一次映射,映射完之后更新到全局字典并写入Doris。

接下来介绍第二个重点模块Doris。

分桶策略的目的是为了解决难点二:查询频响要求高。

之前使用了全局字典保证用户的连续递增,但是发现用了全局字典之后,BitMap的查询性能其实并没有达到预期。Doris其实是分布式的一个集群,它会按照某些Key进行分桶,也就是分桶之后用户ID在桶内就不连续,又变成零散的了。

方案其实就是在表里面增加了一个hid的字段,然后让Doris按照hid字段进行分桶,这里hid生成算法是:

大家可以看一下:userid是六个即0~5,所以M=6;分为三个桶,N=3;因此M除以N就等于二。这样的话我就要用userid去除以二,然后取整作为hid。可以看一下,比如说userid是零,0÷2取整为0,userid是一的话,hid还是这样,因为1÷2的整数部分是零;同理2÷2、3÷2是一,4÷2、5÷2是二,这样的话就把userid跟hid做对应,然后再根据hid做分层。大家可以看到分层结果,hid=0时userid是0、1,hid=1时userid是2、3,hid=2时userid是4、5,这样就保证了桶内连续。

画像标签优化解决的难点也是难点二:查询频响要求高。

方案一:tag_type,tag_value。tag_type是用来记录标签的类型,tag_value是用来记录标签的内容。如图所示:比如说tag_type是性别,tag_value可能是男或女,bitmap这里就是存储所有性别是男的用户id列表。同样对于tag_type是地域、tag_value是北京,bitmap存储的是所有地域在北京的用户id列表。

方案二:大宽表,使用大宽表在一行记录了所有的标签,然后使用bitmap记录这个标签的用户id列表。

最终选择方案二,为什么没有选方案一呢?因为方案一它是一个标签对应一个用户bitmap,当想查一个联合的结果就比较耗时,比如想查询性别是男且区域是北京的所有用户,这样的话需要取出“男”的用户和“北京“的用户,两者之间做一个交集。肯定会有计算量会有更多的时间消耗,但是如果用大宽表去做存储的话,就可以根据用户常用的查询去构建一个物化视图,当用户的查询(比如在北京的男性)命中了物化视图,就可以直接去取结果,而不用再去做计算,从而降低耗时。

这里还有一个知识点跟大家分享一下:在使用Doris的时候,一定要尽量去命中它的前缀索引跟物化视图,这样会大大的提升查询效率。

动静组合查询,对应的难点是难点三:计算复杂。

首先介绍一下什么叫动静组合查询:

然后小程序用户分层,相比于同类型的用户分层功能增加了用户行为筛选,这也是小程序产品的特点之一。比如说可以查近30天用户支付订单超过30元的男性,这种”近30天用户支付订单超过30元“的查询是没办法用bitmap做记录的,也没办法说提前计算好,只能在线去算。这种就是一个难点,就是说怎么用非bitmap表和bitmap做交并补集的运算,为了解决这个问题,结合上面的例子把查询拆分为四步:查近30天用户支付订单超过30元的男性,且年龄在20~30岁的用户(具体查询语句参考PPT图片)

第一步先查20~30岁的男性用户。因为是比较固定,这里可以直接查bitmap表。

第二步要查近30天用户支付订单超过30元的用户。这种的话就没办法去查bitmap表了,因为bitmap没有办法做这种聚合,只能去查行为表。

第三步就是要做用户ID跟在线bitmap的一个转化。Doris其实已经提供了这样的功能函数:to_bitmap,可以在线将用户id转换成bitmap。

第四步是求交集。就是第一步和第四步的结果求交集。

然后,整篇的核心其实是在第三步:Doris提供了to_bitmap的功能,它帮我们解决了非bitmap表和bitmap联合查询的问题。

以上是基于Doris用户分层方案的一个讲解,基于上述方案整体的性能收益是:

如何快速产出用户包

现在讲一下第三部分:用户包。这部分主要是用来解决难点四:产出用户包要求时效性高。这个其实也有两个方案:

方案一:调度平台+spark。这个其实比较容易理解,因为要跑离线任务很容易就想到了spark。在这个调度平台里面用了DAG图,分三步:先产出用户的cuid,然后再产出用户的uid,最后是回调一下做一次更新。

方案一使用的是Spark,它存在几个问题:比如Yarn调度比较耗时,有时候也会因为队列的资源紧张而会有延迟,所以有时候会出现一个很极端的情况就是:产出零个用户,也要30分钟才能跑完,这种对用户的体验度非常不好。

方案二的话就是利用了Doris的SELECTINTOOUTFILE产出结果导出功能,就是查出的结果可以直接导出到AFS,这样的效果就是最快不到三分钟就可以产出百万级用户,所以Doris性能在某些场景下比Spark要好很多。

关于compaction是什么意思、读音的内容到此结束,希望对大家有所帮助。

本站涵盖的内容、图片、视频等数据,部分未能与原作者取得联系。若涉及版权问题,请及时通知我们并提供相关证明材料,我们将及时予以删除!谢谢大家的理解与支持!

Copyright © 2023