Spilling
Background
Pollux 中的溢出功能允许查询在某些运算符累积大量状态时使用有限的内存成功执行。例如,哈希聚合运算符将中间聚合状态存储在哈希表 中, 并在处理完所有输入后开始生成结果。在高基数工作负载(大量组)中,哈希表的大小会超出查询的内存限制。
为了使此类查询成功执行,溢出功能会启动并将运算符的部分状态写入磁盘。运算符收到所有输入后,会从磁盘读回溢出的状态,并将其与内存中未溢出的状态 合并以生成结果。溢出过程包含两个阶段:溢出和恢复。溢出阶段在运算符处理输入时执行。它决定要溢出运算符状态的哪些部分以及如何将溢出的状态存储在磁盘 上。恢复阶段在算子处理完所有输入后执行。它从磁盘读取溢出状态,并将其与内存中未溢出的状态合并以生成结果。不同的算子使用不同的溢出算法。本文档讨论了哈 希聚合、排序依据和哈希连接算子所使用的算法。
Spilling Framework
Pollux 中的溢出功能由一个溢出框架和一组构建于其之上的可溢出操作符组成。溢出框架提供了常用的溢出函数,包括溢出数 据收集、分区、排序、序列化/反序列化以及存储读写。每个可溢出操作符使用这些函数来实现各自的溢出算法。
Spill Objects
溢出框架由以下主要软件对象组成:
Spiller
Spiller 对象为所有运算符提供溢出功能,并帮助它们管理磁盘上的溢出状态。每个运算符都会创建一个 Spiller 对象。Spiller 对象在构造时会接受一个行容器对象(哈希探测运算符除外,它不缓冲输入,而是直接将溢出行写入磁盘)。 行容器是一个逐行的内存数据存储,由运算符拥有,用于存储其内存状态,当内存不足时,这些状态可以溢出到磁盘。例如,哈希 聚合运算符的行容器存储中间聚合状态,每个组(分组键值的唯一组合)一行。当运算符需要溢出时,Spiller 会扫描行容器中 的所有行,计算每行的分区号,确定哪些分区需要溢出,然后将其写出为文件列表。Spiller 可以在写入磁盘之前对数据进行排序。 如果启用了排序,Spiller 会为每次排序后的运行创建一个单独的文件。恢复时,Spiller 会读回溢出的数据以恢复内存状态。如 果启用了排序,它会创建一个合并排序读取器来读取排序后的数据。此功能由哈希聚合和排序运算符使用。
Spiller 实现了以下主要功能:
溢出数据分区:溢出时,我们只想将最少量的数据溢出到磁盘,并将算子状态的其余部分保留在内存中。溢出器会将行容器中的行划分为多个 分区,并根据需要仅溢出其中一部分。这种技术降低了 IO 成本并加快了恢复过程。未溢出的分区使用常规(内存中)快速执行路径进行处理。每 个溢出分区在磁盘上都有一组单独的文件。溢出的分区一次恢复和处理一个。算子会根据溢出算法指定一组用于计算分区号的列(分区列)。哈希聚 合算子使用分组键对数据进行分区,以确保同一组中的所有行都一起溢出并恢复(作为同一分区的一部分)。
选择要溢出的分区:溢出开始时,它会选择数据最多的分区。如果这些分区有足够的内存可以释放,它就会继续使用这些分区。可溢出数据以行容 器中行占用的内存字节数来衡量。使用排序溢出的运算符应避免溢出数据量较少的分区,即使这些分区之前已经溢出过。排序溢出会为每次排序运行创 建一个新文件,并且不支持将数据附加到 现有文件。非排序溢出可以根据需要将新数据附加到现有文件。
溢出时对数据进行排序:使用排序溢出的算子会使用排序合并算法将溢出文件与内存数据合并。内存数据也将作为已排序运行之一进行排序。溢出器会 根据算子指定的一组比较标志,按分区列对行进行排序。 例如,order by 算子需要确保已排序运行遵循查询计划节点指定的排序顺序。
溢出数据 IO:溢出器通过 SpillFileList 和 SpillFile 对象处理与存储系统的交互,如下所述。它管理溢出文件的生命周期,包括创建、写入、 读取和删除。溢出写入操作会被卸载到专用的 IO 执行器,每个溢出分区写入操作都是一个线程执行单元。溢出读取操作在驱动程序执行器中执行。读取和写入操作均 为同步 IO 操作。
溢出器提供以下溢出 API 供算子使用:
Spill APIs
带目标溢出:操作符指定要溢出的行数和字节数作为目标。溢出器会选择一定数量的分区进行溢出以达到目标值。溢出操作在内部运行,并在溢出完成后返回。 溢出过程不受操作符控制,但可以通过溢出统计 API 检查哪些分区已溢出以及溢出了多少数据。
void Spiller::spill(uint64_t targetRows, uint64_t targetBytes);
SpillPartitionNumSet Spiller::spilledPartitionSet() const;
Stats Spiller::stats() const;
溢出分区:操作符指定要溢出的分区,然后Spiller 将指定分区中的所有行溢出到磁盘。在这种情况下,溢出过程由操作符控制。哈希构 建操作符使用它来协调所有构建操作符运行溢出。当溢出触发时,将选择一个操作符对所有操作符运行溢出(在下面的讨论中也称为组溢出)。它首先 通过 Spiller::fillSpillRuns() 从所有操作符收集可溢出的统计信息, 然后选择多个要溢出的分区。
void Spiller::spill(const SpillPartitionNumSet& partitions);
void Spiller::fillSpillRuns(std::vector<SpillableStats>& statsList);
溢出向量:该运算符将行向量溢出到指定分区。溢出器直接将行向量附加到该分区当前打开的溢出文件中。溢出过程也由该运算符控制。它用于哈希连接进行 溢出。哈希构建运算符和哈希探测运算符如果相应的分区已溢出,则将输入行溢出到磁盘。对于哈希构建运算符,如果某个分区已溢出,则该分区的所有输入行都 必须溢出,因为我们无法使用该分区的行子集构建哈希表进行连接。对于哈希探测运算符,它本身不可溢出,但如果关联的分区已被哈希构建溢出,则它需要溢出 输入行。我们将在哈希连接溢出部分进一步讨论这一点。
void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector);
Restore APIs
已排序溢出恢复:由 order by 和哈希聚合运算符使用。 运算符首先调用 Spiller::finishSpill() 来标记溢出完成。Spiller 从未溢出的分区中收集行并将其返回给运算符。运算符处理未溢出的分区,发出结 果并释放 RowContainer 中的空间。然后,它逐个加载溢出的分区。它为每个溢出分区调用 SpillPartition::createOrderedReader() 来创建一个已排 序的读取器来恢复溢出的分区状态。
void Spiller::finishSpill(SpillPartitionSet& partitionSet);
std::unique_ptr<TreeOfLosers<SpillMergeStream>>
SpillPartition::createOrderedReader();
未排序溢出恢复:由哈希构建和哈希探测操作符使用。操作符首先调用 Spiller::finishSpill() 来标记溢出完成。Spiller 收集溢出分区的元数据 并将其返回给操作符。操作符处理未溢出的分区,并发出结果并释放 RowContainer 中的空间。然后,它逐个加载溢出的分区。它为每个溢出分 区调用 SpillPartition::createReader() 来创建未排序的读取器,以恢复溢出分区的状态。
void Spiller::finishSpill(SpillPartitionSet& partitionSet);
std::unique_ptr<UnorderedStreamReader<BatchStream>>
SpillPartition::createReader();
SpillFileList and SpillFile
SpillFileList 对象管理单个分区的溢出文件。每个溢出文件由一个 SpillFile 对象管理,该对象通过 Pollux 文件系统接口提供与存储系 统的低级 I/O 操作。在溢出路径中,SpillFileList 对象以行向量作为输入,创建一个 VectorStreamGroup 来序列化该行向量,并通过相应的 SpillFile 对 象将序列化的字节流写入当前打开的溢出文件中。如果当前文件的大小超出目标文件大小,则 SpillFileList 对象会启动一个新的溢出文件。在恢复路径中,SpillFile 对象 从底层存储系统读取序列化的字节流,并使用 VectorStreamGroup 将其反序列化为行向量。
Spill Triggers
溢出功能将与 Pollux 内存管理系统集成,以便在系统内存不足时回收内存。当某个运算符无法分配或预留新内存时,内存仲裁器将选择多个 Pollux 任务来缩减其 内存使用量,以满足新的内存分配或预留请求。每个选定的任务将尝试从其可溢出运算符处回收内存。后者则通过将其(部分)内存状态溢出到磁盘来释 放内存。溢出功能与内存管理系统的集成正在开发中。
Spill Parameters
Spill File Size
在溢出数据量相同的情况下,溢出文件大小决定了磁盘上溢出文件的数量。一方面,我们应该避免生成过多的小溢出文件,因为这可能会使存储系统的 元数据服务过载。另一方面,我们也希望有足够数量的溢出文件来并行执行恢复工作。例如,要从溢出分区构建哈希表,我们可以在多个哈希构建运算符之 间并行执行构建工作,方法是为每个运算符分配一个溢出文件分片。有两个配置属性需要控制。
max_spill_file_size <../configs> 设置溢出文件的最大大小限制。对于无序溢出,由于我们会不断向同一个溢出文件追加数据,这有助于防止溢出文
件变得过大。对于有序溢出,每个文件仅存储一个有序的数据运行,因此溢出文件大小是可溢出数据大小和此配置限制中的最小值。
min_spill_run_size <../configs> 设置排序溢出用于选择溢出分区的最小数据大小。每个排序溢出文件只能存储一个排序后的数据运行。如果可能,溢出
器会尝试从同一组分区溢出。
通过设置此配置限制,我们可以避免从数据量较小的分区溢出,从而避免生成过多的小溢出文件。
这两个配置属性都可以根据底层存储系统的 IO 特性进行调整。我们预计在实际应用中 不需要进行过多的调整。
Spill Target Size
溢出目标大小决定每次溢出的数据量。如果溢出目标大小太小,溢出会频繁中断算子 执行并生成大量小文件。如果溢出目标大小太大,算子执行速度会因溢出大量数据
到磁盘而变慢。配置属性 spillable_reservation_growth_pct <../configs> 将溢出目标大小设置为查询内存限制的一个因素。我们可能
需要在实践中稍微调整此参数,以了解其对性能的影响。
Spill Compression
当溢出文件大小可能超出磁盘空间时,为了减小溢出文件的大小,
我们可以启用溢出压缩。溢出器会在将序列化的字节流写入磁盘之前对其进行压缩。配置属性 spill_compression_codec <../configs> 设置要使用的压缩编解码器。
Data Storage
溢出只需要底层存储系统存储一些 命名的流字节。它可能需要也可能不需要命名空间支持。如果存储系统支持命名空间,我们可以将查询中溢出的文件存储在一个目录中,并在查询完成时,通过 Spark 驱动程序(适用于 Sapphire)或 Presto Coordinator (适用于 Prestissmo) 一次性删除所有文件。如果存储系统不支持命名空间层次结构,Pollux 会逐个删除文件。当系统崩溃时,存储系统上很可能会残留一些 溢出文件,因此我们需要某种垃圾收集支持。 对于支持生存时间 (TTL) 的存储系统,我们可以利用该功能来实现溢出文件垃圾收集。如果不支持,我们可能需要 构建一个带外运行的轻量级垃圾收集 (GC) 服务。
std::string makeOperatorSpillPath(
const std::string& spillPath,
const std::string& taskId,
int driverId,
int32_t operatorId);
Spilling Algorithm
Hash Aggregation
.. image:: images/spill-aggregation-spill.png :width: 300 :align: left
.. image:: images/spill-aggregation-restore.png :width: 300 :align: right
哈希聚合运算符将中间聚合状态存储在哈希表中,每个组对应一个表条 目。当触发溢出时,运算符的 Spiller 对象会扫描行容器中的所有行,以选择一组数据 最多的分区,这些分区共同满足溢出目标。溢出行的表条目将从哈希表中删除。溢出完成后,运算符将继续处理输入,直到触发下一次溢出,并重复上述过程。 如果相同的分区具有足够多的可溢出数据,则 Spiller 对象会优先再次溢出这些分区。如果可能,此技术会将溢出限制在聚合状态的子集内。
处理完所有输入后,哈希聚合运算符会合并内存和磁盘上的状态,从而生成结果。对于每个溢出分区,该运算符会将行容器中剩余的所有行排序为单个排 序运行。磁盘上的每个溢出文件也是一个排序运行。然后,该运算符创建一个包含所有排序运行的排序合并读取器,将具有相同分组键的中间状态合并 为一个最终聚合状态以供输出。一个组的中间状态可以在运算符执行期间多次溢出。请注意,排序基于分组键。
OrderBy
order by 运算符将所有输入行存储在一个行容器中,并在收到所有输入后对它们进行排序。当触发溢出时, 溢出器会收集足够数量的行进行溢出以满足溢出目标。与哈希聚合溢出不同,我们不对溢出的行进行分区,因为 order by 运算符需要对所有输入行生成总排序以进行输出。溢出完成后,该运算符将继续处理输入,直到触发下一个溢出运行,并重复上述过程。
处理完所有输入后,order by 运算符首先将行容器中剩余的所有行排序为单个排序运行,磁盘上的每个溢出文件也都是一个排序运行。然后, 该运算符创建一个包含所有排序运行的排序合并读取器,以生成最终的排序输出。请注意,此处的排序需要使用查询计划节点指定的比较选项。
Hash Join
哈希连接由哈希构建和哈希探测两种类型的运算符实现,它们各自属于一个独立的驱动管道,并且这两个管道通过共享的哈希连接桥数据结构连接。哈希构 建运算符接收构建侧(即连接的右侧)的输入来构建哈希表。构建完成后,其中一个哈希构建运算符会通过共享的哈希连接桥将构建好的表发送给所有哈 希探测运算符。哈希探测运算符接收探测侧(即连接的左侧)的输入,并一次批量地与哈希表进行连接。
哈希探测运算符每次最多在内存中保存一批探测输入行,因此哈希探测处理不会占用过多的内存。哈希构建运算符可能会使用大量内存来构建哈希表,并在整 个哈希连接处理过程中将其保存在内存中。与 order by 处理类似,每个哈希构建运算符将构建端的输入存储到行容器中,当所有哈希构建运算符都处理 完输入后,其中一个运算符会使用从所有哈希构建运算符收集的行构建单个聚合哈希表。
为了防止哈希连接耗尽内存,如果行容器在处理过程中变得过大,哈希构建运算符需要将部分构建端输入溢出到磁盘。哈希构建运算符会相互协调 溢出操作,以确保所有运算符溢出同一组分区。如果运算符独立溢出,则可能导致所有分区都被溢出。要构建哈希表,我们需要一个或多个分区中的所有行。 与哈希聚合和排序不同,哈希连接溢出由哈希构建运算符明确控制。
哈希探测运算符本身不可溢出,但我们需要 对其进行扩展,以支持在构建端发生的溢出。如果哈希构建运算符溢出了分区 N,则哈希探测运算符也必须溢出 所有属于分区 N 的输入行,并且仅将其余探测输入与构建的表进行连接。相应地,当哈希构建运算符稍后从分区 N 构建哈希表时,哈希探测运算符也需要 从磁盘上的溢出数据中读回相应的探测输入。需要注意的是,哈希连接使用连接键列作为分区列,并且与哈希聚合和排序不同,哈希连接不需要对溢出数据进行排序。
如果构建端过大,在恢复先前溢出的分区时,我们可能会再次耗尽内存。如果发生这种情况,我们将执行递归溢出,将溢出的分区(下文也称为父分区)进一步拆分为 多个子分区(下文也称为子分区),并以递归方式执行该过程。为了支持递归溢出,我们将用于计算溢出分区号的分区位向前移动(或右移)。假设父分区的分区位偏移 量为第 29 位,我们使用 3 位进行 8 路分区,则父分区的位范围为 [29, 31],其子分区在第一级递归溢出中为 [32, 35],孙分区在第二级递归溢出中 为 [36, 38],依此类推。
基于此,我们可以对能够支持的最大构建表大小 (T) 进行简单的计算,具体参数如下:查询内存限制为 M,分区位数为 N,溢出级别为 L(1 表示 初始溢出,2 表示第一级递归溢出,以此类推):
T = M * ((2 ^ N) ^ L)
下表列出了不同溢出级别支持的最大表大小,*M* = 1*GB*,*N* = 3:
| Spill Level | Total Partition Bits | Max Table Size |
|---|---|---|
| 1 | 3 | 8 GB |
| 2 | 6 | 64 GB |
| 3 | 9 | 512 GB |
| 4 | 12 | 4 TB |
| 5 | 15 | 32 TB |
| 6 | 18 | 256 TB |
| 7 | 21 | 2 PB |
对于生产部署,我们建议使用 max_spill_level <../configs> 配置属性设置最大溢出级别的限制。
以下简要介绍了扩展以支持(递归)溢出的哈希构建和探测工作流程:
HashBuild
- 处理来自构建输入源或先前溢出的 数据的输入。
- 尝试为新的构建输入预留内存,如果失败或聚合的哈希构建大小(存储在行容器中的行所使用的内存) 超过限制,则向溢出运算符组发送溢出请求。
- 检查并等待是否有待处理的组溢出请求。如果此运算符 是最后一个到达溢出屏障的运算符,则运行组溢出。
- 如果存在任何溢出分区,则直接溢出相应的输入行,而无需在行容器中缓冲。
- 将未溢出的输入行存储到行容器中,以便稍后构建哈希表。
- 所有算子处理完构建输入后,最后一个完成的算子将根据所有算子收集的行构建哈希表, 并通过哈希连接桥将构建好的哈希表连同可选的溢出元数据(如果已触发溢出)一起发送给哈希探测算子。
- 如果有需要恢复的溢出数据,则等待溢出输入构建下一个哈希表。 否则,哈希构建算子就此结束。 哈希探测算子在完成连接操作后,选择一个先前溢出的分区进行恢复。
- 从哈希连接桥接收到溢出输入后,哈希构建算子使用高级分区位重置溢出器,并创建一个无序读取器,用于从溢出输入中设置的溢出文件中读取构建输入。
- 返回步骤 1 重复下一个哈希表构建过程。
请注意,一旦我们稍后获得内存仲裁支持,内存仲裁器也可以代表任何操作符的内存分配失败或预留请求触发溢出。内存仲裁器会在溢出之前 先停止被驱逐的任务,因此如果在这种情况下触发溢出,我们可能不需要协调器支持。
HashProbe
- 等待下一个哈希表从哈希连接桥进行连接,如果涉及溢出,则还要等待其他溢出元数据:表溢出分区 ID(下文讨论),用于标识关联的溢出分区(如果该表 是基于先前溢出的分区构建的),以及子溢出分区 ID 集(设置为构建哈希表时已溢出的分区的 ID)。
- 如果设置了表溢出分区 ID,则创建一个无序读取器,用于从先前溢出的数据中读取探测输入。表溢出分区 ID 指定要从磁盘读取的相应溢出探测输入。
- 处理来自探测输入源或先前溢出的探测输入 的输入。
- 如果构建端已溢出相应的分区(如果子溢出分区 ID 集不为空,则由其标识),则将输入行溢出到磁盘。
- 将未溢出的探测输入行与哈希表连接并生成结果。
- 所有算子处理完输入后,如果没有溢出数据需要恢复,则所有哈希探测算子均完成。否则,最后一个完成的算子会向桥接器发出处理完成的信号。 桥接器随后会选择下一个要恢复的溢出分区,并唤醒哈希构建算子。
- 返回步骤 1,使用下一个已构建的哈希表重复连接过程。
如果溢出是由哈希构建触发的,则某些哈希探测优化将被禁用。例如,由于不知道完整的连接键集,动态过滤将被禁用。
带有过滤器的空值感知反连接类型不支持溢出,因为它需要将空键探测行与所有构建侧行进行交叉连接,以进行过滤器评估,以检查空键探测行是否可以 添加到输出。
HashJoinBridge
HashJoinBridge 对象包含以下扩展以支持溢出:
- 扩展现有的 setHashTable 接口,如果在建表时触发了溢出,则接收可选的溢出分区元数据。
- 为哈希探测运算符添加了 probeFinished 接口,用于设置溢出输入并通知哈希构建运算符,以便构建下一个哈希表。
- 为哈希构建运算符添加了 spillInputOrFuture 接口,用于等待溢出输入以构建下一个哈希表。
- 在内部,该对象将所有剩余待恢复的溢出分区维护在一个有序映射中,并从映射的开头恢复下一个溢出分区。为了确保子分区首先恢复,我们添加 了 SpillPartitionId 类型作为映射中的键,以标识溢出分区,该分区由分区位偏移量和分区号组成。位偏移量较高的分区将放置在位偏移量较低的 分区之前。如果分区位偏移量相同,则先执行分区号较低的分区。
- 为了并行化从溢出分区构建哈希表,哈希连接桥将把溢出分区文件拆分到哈希构建运算符之间,每个运算符都有一个大小相同的分片需要恢复。
Future Work
Memory Arbitration
引入内存仲裁逻辑,当任何运算符分配或预留内存失败时,选择从正在运行的查询中回收内存的运算符。 内存仲裁器可以从可溢出运算符和一些将数据存储在 RowContainer 中的非溢出运算符中回收内存。对于可溢出运算符,我们需要 添加 arena 压缩以释放未使用的内存块。对于非溢出运算符(例如部分聚合),内存仲裁器可以通过请求部分聚合运算符将其状态刷新到下游查询阶段 来回收内存。内存仲裁逻辑将允许查询使用有限的内存成功完成,并支持并发查询之间的动态内存共享,从而提高整体内存效率。
Runtime Statistics Collection
添加以下 RuntimeMetric 统计数据来测量溢出执行的内部机制,以帮助在生产环境中进行性能分析:
溢出数据大小:溢出字节数、溢出行数、溢出分区数、溢出文件数以及溢出文 件大小分布。我们可以调整溢出参数,以查看这些统计数据的影响以及由此产生的性能变化。
溢出执行时间:运算符在溢出上花费的时间,可分为以下几个部分:
- 溢出数据扫描:行迭代和分区号计算的时间。如果分区号计算占用了大量 CPU 时间,我们可以通过将计算出的分区号与行容器中的行一起缓存来优化此步骤。
- 溢出数据排序:溢出数据排序时间。
- 溢出数据转换:将行容器中的行转换为溢出向量的时间。
- 溢出数据序列化:将转换后的向量序列化为字节流,用于溢出写入的时间。
- 溢出数据反序列化:将字节流反序列化回行向量,用于溢出读取的时间。
- 溢出文件写入:溢出文件写入时间。可以通过调整溢出执行器池大小以及考虑细粒度的并行写入来调整。
- 溢出文件读取:溢出文件读取时间。可以通过预读进行优化。
Spilling Extension
为窗口操作符添加溢出支持。