Hologres(中文名交互式分析)是阿里云自研的一站式及时数仓,这个云原生体系融合了及时服务和分析大数据的场景,全面兼容PostgreSQL协议并与大数据生态无缝打通,能用同一套数据架构同时支持及时写入及时查问以及及时离线联邦分析。它的出现简化了业务的架构,与此同时为业务提供及时决策的能力,让大数据发挥出更大的商业价值。
Hologres作为HSAP服务分析一体化的落地最佳实践,其查问引擎是一个完全自研的施行引擎,它的核心设计目标是支持所有类型的分布式分析和服务查问,并做到极致查问机能。为了做到这一点,我们借鉴了各种分布式查问体系,包括分析型数据库,及时数仓等,吸取了各方面的优势从零开始打造出一个全新的施行引擎。
为什么要选择从零开始做一个新的查问引擎?开源的分布式分析查问体系主要有两大类:
一类是传统的 Massively Parallel Processing 体系,能够支持通用的 SQL 查问,但是对及时场景支持不够好,机能不够理想。一类是 Apache Druid 和 ClickHouse这些及时数仓,是专门为及时场景设计和优化的,能够比较好地支持一些常见的单表及时查问,但是复杂查问的机能比较差。另外大数据生态圈基于 MapReduce 的引擎比较适合批处置 ETL,一般不太适合在线服务和多维分析的场景,机能也差不少。
Hologres 施行引擎是在一个能支持复杂查问和上述高机能及时服务查问的通用架构,先首先实现了常用的及时数仓场景,深入优化并用内部 Benchmark 验证了机能和稳定性超过包括专用及时数仓的其它竞品之后,再扩展到其它复杂查问的支持。扩展的过程中,在不可避免地体系变得越来越复杂的同时,也用 Benchmark 帮助保持简单及时查问的机能没有回退。如果在已有的查问引擎上做改进,因为很多架构和设计上的选择已经定型,牵一发而动全身,就很难达到这样的效果。
Hologres施行引擎从开发到落地实践面临了非常多的挑战,但也给我们提供了机会把这个领域的各种新进展都结合利用起来,并超越已有体系做到对各种查问类型的高机能处置,其背后主要是基于以下特点:
分布式施行模型:一个和保存计较分离架构配合的分布式施行模型。施行设计由异步算子组成的施行图 DAG(有向无环图) 表示,可以表达各种复杂查问,并且完美适配 Hologres 的数据保存模型,方便对接查问优化器,利用业界各种查问优化技术。全异步施行:端到端的全异步处置框架,可以避免高并发体系的瓶颈,充分利用资源,并且最大可能地避免保存计较分离体系带来的读数据延缓的影响。向量化和列处置:算子内部处置数据时最大可能地使用向量化施行,和保存引擎的深度集成,通过灵活的施行模型,充分利用各种索引,并且最大化地延缓向量物化和延缓计较,避免不必要的读数据和计较。自适应增量处置:对常见及时数据应用查问模式的自适应增量处置。特定查问深度优化:对一些查问模式的独特优化
下面将会对各个模块一一介绍。
分布式施行模型
Hologres 是能够弹性无限水平扩展数据量和计较能力的体系,需要能够支持高效的分布式查问。
Hologres 查问引擎施行的是由优化器生成的分布式施行设计。施行设计由算子组成。因为 Hologres 的一个表的数据会根据 Distribution Key 分布在多个 Shard 上,每个 Shard 内又可以包含很多 Segment,施行设计也会反映这样的结构,并分布到数据所在的节点去施行。每个Table Shard 会被加载到一个计较节点,数据会被缓存到这个节点的内存和本地保存。因为是保存计较分离的架构,如果一个节点出错,其服务的 Shard 可以被重新加载到任意一个计较节点,只是相当于清空了缓存。
例如一个比较简单的查问。
select key, count(value) as total from table1 group by key order by total desc limit 100。
如果是单机数据库,可以用这样的施行设计。如果数据和计较分布在多个节点上,就需要更复杂的施行设计。
在分布式表上,为了更高效地施行,尽量减少数据传输,可以把施行设计分为不同片段(Fragment)分布到相应节点施行,并且把一些操作下推来减少 Fragment 输出的数据,可能就变成这样的施行设计:
根据数据的特性,优化器可能会生成不同的设计。例如在某一个局部聚合并没有显著减少数据量的时候,可以省略这个算子。又例如在 Key 就是 Distribution key 的时候,可以优化为:
从这些例子可以看出,Hologres 的施行设计根据数据的特性切分为不同的片段之后分布式并发施行。片段之间通过 Exchange 算子进行数据交换。更复杂的比如多表关联(Join)查问,会有更多的片段和更复杂的数据交换模式。
比如以下SQL
select user_name, sum(value) as total from t1 join t2 on t1.user_id = t2.user_id where … group by user_name order by total limit 100
在Hologres中可以是这样的施行设计
如果 Join key 和 Distribution Key 一致,可以优化为如下施行设计,减少远程数据传输。根据需要的查问合理地设置 Distribution Key,可能显著提高查问机能。
根据过滤条件和统计信息等等,优化器还可能生成不同的优化施行设计,比如包含动态过滤,局部聚合等等。
这样的分布式施行设计足够通用,可以表达所有的 SQL 查问和一些其它查问。施行设计和大部分 Massively Parallel Processing (MPP) 体系也比较类似,方便借鉴和集成业界的一些适用的优化。稍微独特一些的地方是很多查问设计片段的实例是和 Hologres 的保存结构对齐的,能够进行高效的分区裁剪和文件裁剪。
同时,Hologres 实现了 PostgreSQL 的 Explain 和 Explain Analyze 系列语句,可以展示文本格式的施行设计和相应的施行信息,方便用户自助了解施行设计,并针对性做出SQL优化调整。
全异步施行
高并发体系,特别是有大量 I/O 的体系,频繁地等待或者任务切换是常见的体系瓶颈。异步处置是一种已经被证明行之有效的避免这些瓶颈,并把高并发体系机能推到极致的方法。
Hologres 的整个后端,包括施行引擎、保存引擎和其它组件,统一使用 HOS(Hologres Operation System) 组件提供的异步无锁编程框架,能够最大化异步施行的效果。每个 Fragment 的实例使用 HOS 的一个 EC (逻辑调度单位),使得一个 Fragment 里的所有算子和保存引擎可以异步施行并且无锁安全访问绝大多数资源。
算子和 Fragment 都是类似这样的接口:
future<> Open(const SeekParameters& parameters, …)
future<RecordBatchPtr, bool> GetNext(…)
future<> Close(…)
除了一般异步处置的好处外,异步算子接口较好地规避了保存计较分离架构下相对较高的读数据延缓对查问机能的影响,并且对分布式查问的施行模型本身也有独特的好处。
DAG 施行引擎一般可以分为拉数据的模性(比如火山模型)和推的模型(比如很多大数据的分阶段施行模型),各有其优缺点。而 Hologres采用的异步的拉模型能够取得两种模型的好处并且避免其缺点(已经申请了专利)。举一个常见的 Hash Join 来说明:
火山模型可以简单做到先拉完 b 的数据构建 hash table,然后流式处置 a 的数据不用全放在内存里。但是当 a 或者 b 需要读数据的时候,简单的实现需要等待不能把 CPU 打满,需要通过提高 Fragment 的并发数或者引入复杂的 pre-fetch 机制来充分利用资源,而这些又会引入别的机能问题。
推数据的模型,比较容易做到并发读数据请求并在完成的时候触发下游处置,但是上述 Join算子的实现会比较复杂。比如 a 处置完一批数据推到 Join 算子而 b 的 hash table 还没有构建完成,这批数据就需要暂存到内存里或者盘上,或者引入反压机制。在 Fragment 的边界也会有类似问题,造成一些在拉数据模型下不需要的数据缓存。
Hologres 的算子和 Fragment 的异步拉数据模型,可以像火山模型一样简单做到按需从上游获取数据,而同时又可以像推数据模型一样简单做到读数据并发,只要向上游发出多个异步 GetNext,上游处置完成时会自然触发后续处置。异步GetNext 的数目和时机,可以看做是天然的流控机制,可以有效做到提高 CPU 利用率并且避免不必要的数据暂存。
Hologres 已经用这个异步模型实现了一个完整的查问引擎,可以支持所有 PostgreSQL 的查问。
列处置和向量化
按列处置和向量化施行都是分析查问引擎常用的优化机制,可以大幅度提高数据处置的效率。Hologres 也不例外,在能使用向量处置的时候尽量使用。
Hologres 在内存里也采用列式保存。在内存里按列保存数据能够使用更多的向量处置。列式组织数据还有一个好处,就是对延缓计较比较友好。比如 select … where a = 1 and b = 2 …,对一批数据(一般对应保存的一个row group),Hologres的 scan 算子输出的 a 和 b 可以是延缓读取的 a 和 b 的信息,在处置 a = 1 的时候会读取这一批的 a。如果 a=1 对这一批的所有行都不满足,这一批的 b 这一列就根本不会被读取。
但是对某些按行处置的算子,比如 Join,按列保存的数据可能会造成更多的 CPU cache miss ,带来较大的机能问题。很多查问引擎会在不同的点引入按列保存和按行保存的转换,但是频繁的转换本身会带来不小的开销,而且列转行会造成上述延缓读取列被不必要地读取,还有一些其它的机能问题。
自适应增量处置
很多及时数据应用经常会对一个查问用不同的时间段反复施行。比如一个监控指标页面打开后,会定期施行 select avg(v1) from metrics where d1 = x and d2 = y and ts >= ‘2020-11-11 00:00:00’ and ts < ‘2020-11-11 03:01:05’ and … group by d3 … 这样的查问,下一次会改成 ts < ‘2020-11-11 00:03:10’,再下一次 ts < ‘2020-11-11 00:03:15’。
流计较或者增量计较可以对这种查问进行非常高效的处置。但是对这种用户可以随意生成的交互式查问,通常不可能对所有组合都配置流计较或者增量计较任务。如果每次都简单施行查问,又可能有大量的重复计较造成资源浪费和机能不理想。
Hologres充分利用保存引擎和计较引擎的深度集成和列式保存大部分数据在只读文件中的特性,在能提供包含最新写入数据的查问结果的同时尽量避免重复计较,对这种类型的查问能够显著提升机能和减少资源使用。
针对特定查问模式的深度优化
Hologres 对一些特定查问模式有独特的优化。这里以Filter Aggregate 优化为例子。
很多数据应用都有开放列的需求,相当于可以动态添加逻辑列而不用改 Table Schema。比如有一列是多值列 tags(Postgres 可以用 Array 类型)里面存了'{c1:v1, c2:u1}’ 这样的多个逻辑列的值。查问的时候,如果使用普通列,一类常见的查问是
— Q1:
select c1, sum(x) from t1 where c1 in (v1, v2, v3) and name = ‘abc’ group by c1
使用开放列后,这样的查问会转变为
— Q2:
select unnest(tags), sum(x) from t1 where name = ‘abc’ and tags && ARRAY[‘c1:v1’, ‘c1:v2′, c1:v3’]
group by unnest(tags)
having unnest(tags) in (‘c1:v1’, ‘c1:v2′, c1:v3’)
这种查问,Hologres 可以利用位图索引快速计较过滤条件得到相关的行,但是之后从多值列里面取出相关数据操作不能使用向量处置,机能不能达到最优。经过调研,可以把查问的施行转换为
Q3:
select ‘c1:v1’, sum(x) from t1 where tags && ARRAY[‘c1:v1’]
UNION ALL
select ‘c1:v2’, sum(x) from t1 where tags && ARRAY[‘c1:v2’]
UNION ALL
…
这样每个 UNION ALL 分支可以只读取 name 和 tags 的位图索引计较过滤条件,然后用 x 列的数据和过滤条件进行向量计较 SUM_IF 即可得出想要的结果。这样的问题是,每个分支都要过一遍 t1,读取 x 列以及 name 列的位图索引,带来重复计较。最后引入了一个 filter aggregate 的特殊算子来把这类常用查问优化到极致机能,可以只过一遍 t1 并且去掉重复操作,只用向量计较即可得到结果,不需要读取 tags 列的数据。在一个几十 TB的表上实测机能提升 3 倍以上。
类似的优化,Hologres 的施行引擎都会尽量抽象为比较通用的算子,可以适用于更多场景。Filter Aggregate 算子也是 Hologres 申请的专利之一。
总结
Hologres 施行引擎在一个架构里集中了相关分布式查问体系的几乎所有最高效的优化方式(包括各种类型的索引)并作出了特有的改进。通过和保存引擎深度整合,能充分发挥异步模型的优势,并高效利用各种类型的索引来加速查问。所有这些加起来,带来了超越已有体系的机能,并在阿里巴巴双 11 的数据规模下通过了实战的考验,(2020年双11顶住了5.96亿/秒的及时数据洪峰,基于万亿级数据对外提供多维分析和服务,99.99%的查问可以在80ms以内返回结果),对外高并发高机能地提供分布式 HSAP 查问服务。
原创文章,作者:阿里云大数据AI技术,如若转载,请注明出处:https://www.iaiol.com/news/hologres-jie-mi-shen-du-jie-xi-gao-xiao-lyu-fen-bu-shi-cha/