分布式作业调整方法、主节点、系统、物理机及存储介质转让专利
申请号 : CN202110950182.9
文献号 : CN113407354B
文献日 : 2022-01-21
发明人 : 韩颖 , 闵雪宾 , 张炜 , 汤志鹏 , 郑君正 , 陆一峰 , 陈颖达
申请人 : 阿里云计算有限公司
摘要 :
权利要求 :
1.一种分布式作业调整方法,其中,包括:获取用户提交的作业;
生成所述作业的执行计划,所述执行计划包括多个执行阶段,所述多个执行阶段包括上游执行阶段以及所述上游执行阶段的直接下游执行阶段;
在作业的执行过程中,获取所述上游执行阶段的输出数据的统计信息;
根据所述统计信息,对所述直接下游执行阶段进行配置,以使得所述直接下游执行阶段基于配置结果执行作业;
所述生成所述作业的执行计划包括:根据作业的源数据预估的数据大小,生成具有多个候选信息的物理计划,一个候选信息表示所述源数据在作业执行过程中使用的一个任务,所述多个候选信息包括第一候选信息以及第二候选信息;将物理计划转换为算子树,算子树包括物理计划的一个或多个原始任务,一个原始任务使用一个或多个算子;在算子树中根据第一候选信息转化第一执行路径,以及根据第二候选信息转化第二执行路径;在算子树的第一执行路径和第二执行路径的上游设置控制节点,以得到执行计划。
2.根据权利要求1所述的方法,其中,所述统计信息包括如下至少一项:输出数据的数据量、输出数据的数据分区的数据量分布信息、输出数据的每个数据分区的序列化数据记录Record数量。
3.根据权利要求2所述的方法,其中,所述根据所述统计信息,对所述直接下游执行阶段进行配置包括如下至少一项:
根据所述统计信息,对所述直接下游执行阶段配置并发度;
根据所述统计信息,为所述直接下游执行阶段分配数据分区;
根据所述统计信息,选择后续执行的直接下游执行阶段,其中,所述上游执行阶段在下游存在候选的多条执行路径,一条执行路径中的执行阶段至少包括所述上游执行阶段的直接下游执行阶段。
4.根据权利要求3所述的方法,其中,所述统计信息包括输出数据的数据分区的数据量分布信息;所述方法还包括:至少对所述多个执行阶段中首先执行的执行阶段初始化配置并发度;
所述根据所述统计信息,对所述直接下游执行阶段配置并发度包括:根据所述直接下游执行阶段的工作节点对应的理想数据量,以及所述数据量分布信息,确定为所述直接下游执行阶段的工作节点分配的处理数据量;
根据所述直接下游执行阶段中分配处理数据量的工作节点的数量,配置所述直接下游执行阶段的并发度。
5.根据权利要求4所述的方法,其中,所述根据所述直接下游执行阶段的工作节点对应的理想数据量,以及所述数据量分布信息,确定为所述直接下游执行阶段的工作节点分配的处理数据量包括:
将连续且总数据量不超出所述理想数据量的多个数据分区,分配给所述直接下游执行阶段的一个工作节点,且所述直接下游执行阶段的各工作节点分配的处理数据量趋近于均匀分布,其中,所述直接下游执行阶段的一个工作节点分配的数据分区的数量不大于设定的数量上限。
6.根据权利要求4所述的方法,其中,所述统计信息还包括每个数据分区的Record数量;所述根据所述直接下游执行阶段中分配处理数据量的工作节点的数量,配置所述直接下游执行阶段的并发度包括:
根据所述直接下游执行阶段中分配处理数据量的工作节点的数量,工作节点分配的数据分区的Record数量、工作节点的算子数目以及算子复杂度,确定为所述直接下游执行阶段配置的并发度。
7.根据权利要求3所述的方法,其中,所述统计信息包括输出数据的数据分区的数据量分布信息,所述数据量分布信息指示输出数据的每个数据分区的数据量;所述根据所述统计信息,为所述直接下游执行阶段分配数据分区包括:根据理想数据量,将所述输出数据中数据量大于理想数据量的数据分区进行拆分;
将拆分后的数据分区分配给所述直接下游执行阶段,且一个拆分后的数据分区配置为由所述直接下游执行阶段的一个工作节点执行。
8.根据权利要求7所述的方法,其中,所述根据理想数据量,将所述输出数据中数据量大于理想数据量的数据分区进行拆分包括:根据理想数据量,将所述输出数据中数据量大于理想数据量的数据分区进行拆分,使得拆分后的数据分区的数据量不大于理想数据量且趋近于均匀分布;
所述方法还包括:
将所述输出数据中总数据量不大于理想数据量的至少两个数据分区进行合并,所述至少两个数据分区中的各个数据分区的数据量均小于理想数据量;
将合并的数据分区分配给所述直接下游执行阶段,且一个合并的数据分区配置为由所述直接下游执行阶段的一个工作节点执行。
9.根据权利要求7‑8任一项所述的方法,其中,所述上游执行阶段包括映射执行阶段,所述直接下游执行阶段包括归约执行阶段;所述输出数据包括映射执行阶段输出的shuffle数据。
10.根据权利要求3所述的方法,其中,所述直接下游执行阶段为连接执行阶段,用于对上游执行阶段提供的多路输入数据进行连接操作,所述多路输入数据为所述上游执行阶段多路的输出数据;其中,一路输入数据包括多个数据分区。
11.根据权利要求10所述的方法,所述统计信息包括输出数据的数据分区的数据量分布信息,所述数据量分布信息指示输出数据的每个数据分区的数据量;
所述根据所述统计信息,为所述直接下游执行阶段分配数据分区包括:基于所述多路输入数据中每个数据分区的数据量,若确定任一路输入数据中存在数据倾斜的目标数据分区,将所述目标数据分区拆分为多个子数据分区,将所述多个子数据分区分配给连接执行阶段的多个工作节点;
将其他路输入数据中与所述子数据分区属于相同分区编号的数据分区,广播到所述子数据分区分配的工作节点。
12.根据权利要求11所述的方法,其中,所述多路输入数据包括第一路输入数据和第二路输入数据;所述任一路输入数据中存在数据倾斜的目标数据分区包括:第一路输入数据中存在一个数据倾斜的目标数据分区;
或者,第一路输入数据中存在多个数据倾斜的目标数据分区;
或者,第一路输入数据和第二路输入数据中均存在一个数据倾斜的目标数据分区;
或者,第一路输入数据和第二路输入数据中均存在多个数据倾斜的目标数据分区。
13.根据权利要求12所述的方法,其中,所述若确定任一路输入数据中存在数据倾斜的目标数据分区,将所述目标数据分区拆分为多个子数据分区,将所述多个子数据分区分配给连接执行阶段的多个工作节点包括:若确定第一路输入数据中存在一个目标数据分区或多个目标数据分区,则针对任一个目标数据分区,将该目标数据分区拆分为多个子数据分区,并将该目标数据分区的各子数据分区分配给连接执行阶段的工作节点;
所述将其他路输入数据中与所述子数据分区属于相同分区编号的数据分区,广播到所述子数据分区分配的工作节点包括:针对任一个目标数据分区,将其他路输入数据中与该目标数据分区属于相同分区编号的数据分区,广播到该目标数据分区的各子数据分区分配的工作节点。
14.根据权利要求12所述的方法,其中,所述若确定任一路输入数据中存在数据倾斜的目标数据分区,将所述目标数据分区拆分为多个子数据分区,将所述多个子数据分区分配给连接执行阶段的多个工作节点包括:若确定第一路输入数据和第二路输入数据中均存在一个目标数据分区,则将第一路输入数据中的目标数据分区拆分为多个子数据分区,并将拆分的各子数据分区分配给连接执行阶段的工作节点;以及,将第二路输入数据中的一个目标数据分区拆分为多个子数据分区,并将拆分的各子数据分区分配给连接执行阶段的工作节点;
所述将其他路输入数据中与所述子数据分区属于相同分区编号的数据分区,广播到所述子数据分区分配的工作节点包括:将第一路输入数据的子数据分区,广播到第二路输入数据的相同分区编号的子数据分区所分配的工作节点;以及,将第二路输入数据的子数据分区,广播到第一路输入数据的相同分区编号的子数据分区所分配的工作节点。
15.根据权利要求11‑14任一项所述的方法,其中,所述方法还包括:配置将属于相同数据分区的连接操作结果进行联合,所述相同数据分区的连接操作结果包括:一路输入数据的子数据分区与其他路输入数据中相同分区编号的数据分区的连接操作结果,或者,一路输入数据的子数据分区与其他路输入数据中相同分区编号的子数据分区的连接操作结果。
16.根据权利要求3所述的方法,其中,所述上游执行阶段在下游存在候选的多条执行路径,一条执行路径包括所述上游执行阶段的一个或多个下游执行阶段,所述一个或多个下游执行阶段中至少包括所述上游执行阶段的直接下游执行阶段;所述控制节点用于从所述多条执行路径中选择所述上游执行阶段在下游实际执行的目标执行路径;
所述根据所述统计信息,选择后续执行的直接下游执行阶段包括:通过所述控制节点,基于所述统计信息,从所述多条执行路径中选择目标执行路径。
17.根据权利要求16所述的方法,其中,所述在算子树中根据第一候选信息转化第一执行路径包括:
当第一次遍历到预设算子时,根据与第一执行路径对应的第一候选信息,在算子树中新增第一执行路径的任务,以及将新增任务中的算子与原始任务中的相关算子进行数据管道连接,以在算子树中转化出第一执行路径;
所述在算子树中根据第二候选信息转化第二执行路径包括:当第二次遍历到预设算子时,根据与第二执行路径对应的第二候选信息,在算子树中新增第二执行路径的任务,以及将新增任务中的算子与已有任务中的相关算子进行数据管道连接,以在算子树中转化出第二执行路径。
18.根据权利要求16所述的方法,其中,所述根据作业的源数据预估的数据大小,生成具有多个候选信息的物理计划包括:预估小表在内存中的数据大小,若预估的数据大小小于预设的第一门限值,触发数据库的有条件的映射连接,并生成具有多个候选信息的物理计划,其中,一个候选信息具体表示小表使用的一个连接算子。
19.根据权利要求16‑18任一项所述的方法,其中,所述统计信息包括所述上游执行阶段的输出数据的数据量;
所述通过所述控制节点,基于所述统计信息,从所述多条执行路径中选择目标执行路径包括:
通过所述控制节点,判断所述上游执行阶段的输出数据的数据量是否小于预设的第二门限值,若是,则选择第一执行路径作为目标执行路径,若否,则选择第二执行路径作为目标执行路径。
20.根据权利要求19所述的方法,其中,所述上游执行阶段的输出数据的数据量包括小表实际执行的数据大小;其中,在小表实际执行的数据大小小于所述第二门限值时,选择第一执行路径作为目标执行路径,在小表实际执行的数据大小大于所述第二门限值时,选择第二执行路径作为目标执行路径;第一执行路径为执行广播连接的执行路径,第二执行路径为执行排序合并连接的执行路径。
21.根据权利要求1所述的方法,其中,所述作业包括深度学习作业,所述执行计划由DAG描述;在深度学习作业中,所述执行计划的多个执行阶段包括:参数服务器执行阶段、工作器执行阶段和资源优化执行阶段;所述参数服务器执行阶段通过并行边输入所述工作器执行阶段,所述资源优化执行阶段通过并行边输入所述工作器执行阶段,所述并行边连接的上、下游执行阶段的工作节点能够同时处于运行状态。
22.根据权利要求21所述的方法,其中,下游的工作器节点在上游的参数服务器节点的实例启动后,且上游的资源优化节点的执行进度达到一定阈值时进行调度;工作器节点为工作器执行阶段对应的工作节点,参数服务器节点为参数服务器执行阶段对应的工作节点,资源优化节点为资源优化执行阶段对应的工作节点。
23.根据权利要求22所述的方法,其中,所述方法还包括:在深度学习作业的执行过程中,调度资源优化节点,通过资源优化节点确定当前适应于工作器执行阶段的资源信息;
通过资源优化节点,为工作器执行阶段配置所述资源信息。
24.根据权利要求23所述的方法,其中,所述通过资源优化节点确定当前适应于工作器执行阶段的资源信息包括:
基于深度学习作业的当前执行状态,从历史数据库中查找与所述当前执行状态相同的历史执行状态,将查找的历史执行状态实际使用的资源信息,确定为当前适应于工作器执行阶段的资源信息;其中,所述历史数据库记录历史深度学习作业在各个历史执行状态实际使用的资源信息。
25.一种主节点,其中,所述主节点被配置为执行如权利要求1‑24任一项所述的分布式作业调整方法。
26.一种分布式系统,其中,所述分布式系统包括主节点和多个工作节点,所述主节点如权利要求25所述的主节点。
27.一种物理机,其中,所述物理机包括至少一个存储器和至少一个处理器,所述存储器存储一条或多条计算机可执行指令,所述处理器调用所述一条或多条计算机可执行指令,以执行如权利要求1‑24任一项所述的分布式作业调整方法。
28.一种存储介质,其中,所述存储介质存储一条或多条计算机可执行指令,所述一条或多条计算机可执行指令被执行时实现如权利要求1‑24任一项所述的分布式作业调整方法。
说明书 :
分布式作业调整方法、主节点、系统、物理机及存储介质
技术领域
背景技术
计算能力,提升作业执行效率。
过程中,可以调度工作节点和资源来实现作业的具体执行。基于分布式系统的广泛应用,本
领域技术人员一直致力于提升分布式系统的性能。
发明内容
整,从而提升分布式系统的性能。
学习参数的梯度;
源信息;
或多条计算机可执行指令,以执行如上述第一方面或第二方面所述的分布式作业调整方
法。
面所述的分布式作业调整方法。
行阶段以及所述上游执行阶段的直接下游执行阶段。在作业执行过程中,主节点可获取上
游执行阶段的输出数据的统计信息,从而根据所述统计信息,对直接下游执行阶段进行配
置,以使得所述直接下游执行阶段基于配置结果执行作业。由于本申请实施例能够在作业
执行过程中,基于上游执行阶段的数据输出结果,动态调整下游执行阶段的配置,以使得下
游执行阶段的配置能够适应于上游执行阶段的实际执行结果,从而使得下游执行阶段的并
发度、资源等配置能够贴合作业的具体执行情况,提升了执行计划配置的合理性和准确性。
可见,本申请实施例提供的分布式作业调整方法,能够在作业执行过程中对执行计划的配
置进行动态调整,实现作业的动态调整效果,并使得执行计划的配置能够贴合作业的具体
执行情况;进而基于动态配置的执行计划实现作业的具体执行,能够使得分布式系统更为
合理高效的完成作业执行,显著提升分布式系统执行作业的性能。
附图说明
申请的实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据
提供的附图获得其他的附图。
具体实施方式
本申请中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他
实施例,都属于本申请保护的范围。
120。主节点110和工作节点120可以视为是分布式系统中的计算节点,计算节点可承载于具
有数据计算能力的物理机,一个物理机可以承载一个或多个计算节点。
面,主节点110作为分布式系统中的中心管控节点,也称为分布式系统的执行引擎。工作节
点120为分布式系统中具体执行作业的计算节点,可受主节点110的管理和协调来执行作
业。
作业的数据从最开始的源表,在经历一系列的数据流动、执行、以及变化后,最终产生输出
的过程。图1B示例性的示出了执行计划的示意图。如图1B所示,执行计划可以包括:多个具
有层级关系的stage(执行阶段)。在一些实施例中,stage之间可以是树状的层级结构。一个
stage可以包括一个或多个task(任务)。针对每个stage,主节点110可以通过配置工作节点
数量(并发度)、所使用的资源等来实现调度多个工作节点并行执行stage的task,从而实现
作业在分布式系统中的执行。
Language,结构化查询语言)语句。
示出了DAG的示意图。需要说明的是,DAG实际的顶点数量、层级、连接边可能相比于图1C更
为复杂,图1C仅是出于便于理解目的而示出的简易DAG示例。如图1C所示,DAG可以包括4个
顶点V1至V4,以及连接边11、12、13和14。其中,连接边11连接顶点V1和V2、连接边12连接顶
点V1和V3、连接边13连接顶点V2和V4、连接边14连接顶点V3和V4。
点的输入连接边(输入连接边指向顶点),也可能是顶点的输出连接边(输出连接边由顶点
指向其他顶点)。例如图1C中,连接边12指向V3,为V3的输入连接边;连接边14由V3输出,为
V3的输出连接边;而连接边12是由V1输出,因此连接边12又作为V1的输出连接边;连接边14
输入V4,因此连接边14又作为V4的输入连接边。
出连接边12且连接边12输入V3,则V1可称为V3的直接上游顶点,V3可称为V1的直接下游顶
点。一个顶点可能具有一个或多个直接上游顶点、一个或多个直接下游顶点。需要说明的
是,一个顶点除具有直接上游顶点之外,可能还具有间接上游顶点,间接上游顶点与该顶点
并不直接连接,而是处于该顶点的上层并且与该顶点之间通过一个或多个顶点相连接。例
如图1C中,V1处于V4的上层,V1通过V2或V3与V4连接,因此V1可称为V4的间接上游顶点。显
然,一个顶点除具有直接下游顶点之外,可能还具有间接下游顶点,间接下游顶点与该顶点
并不直接连接,而是处于该顶点的下层并且与该顶点之间通过一个或多个顶点相连接。例
如图1C中,V4处于V1的下层,且通过V2或V3与V1连接,因此V4可称为V1的间接下游顶点。顶
点的上游顶点可以包括直接上游顶点和间接下游顶点,顶点的下游顶点可以包括直接下游
顶点和间接下游顶点。
而是可与直接上游顶点并行执行。
程。物理图则体现了执行计划的各stage映射到分布式系统的物理属性,描述的是执行计划
的各stage在执行层面的并发度、工作节点使用的资源、数据传输方式等物理属性。
V2和V3)以及各个顶点的关系(例如,顶点V0指向顶点V2,顶点V1和顶点V2指向顶点V3),一
个顶点对应执行计划的一个stage。逻辑图可以体现执行计划的数据执行流程。在将逻辑图
映射为物理图之后,物理图可以描述各个stage需要配置的并发度、各stage的工作节点使
用的资源(例如CPU资源、内存资源等)、数据传输方式等物理属性。例如,结合图1D示例,物
理图描述了顶点V0需要配置3个工作节点(并发度为3),顶点V1、V2和V3分别需要配置2个工
作节点(并发度为2)。也就是说,物理图能够表达DAG中顶点和连接边的物理属性。通过物理
图描述的顶点和连接边的物理属性,主节点可以为各个stage调度工作节点和资源,以使得
stage中的task可以被多个工作节点并行执行,实现作业在分布式系统中的执行。
如,配置执行计划的执行流程等。配置执行计划的物理属性可以认为是在物理图层面对执
行计划进行配置,例如,配置执行计划各stage的并发度、资源、数据传输方式等物理属性。
之前配置完成,并且无法在作业具体执行过程中调整。然而,在作业具体执行之前并无法准
确预估执行计划各stage需要实际使用的资源,以及执行计划合理的执行路径,这无疑导致
静态执行计划的配置难以合理和准确,降低了分布式系统执行作业的性能。
置能够适应于上游stage的实际执行结果,从而使得下游stage的并发度、资源等配置能够
贴合作业的具体执行情况,提升执行计划配置的合理性和准确性。
中可以包括上游stage,以及该上游stage的直接下游stage。在一些实施例中,该多个stage
中任一存在下游stage的stage均可以视为是所述上游stage,在作业的具体执行过程中,上
游stage的输出数据可输入直接下游stage进行处理。一个上游stage的输出数据可以输入
一个或多个直接下游stage,一个直接下游stage也可能输入一个或多个上游stage的输出
数据;也就是说,上游stage可以对应一个或多个直接下游stage,一个直接下游stage也可
以对应一个或多个上游stage。在一些实施例中,上游stage可称为直接下游stage的上一
stage,直接下游stage可称为上游stage的下一stage。
上游stage,V4作为V2和V3的直接下游stage,V2和V3的输出数据可输入V4进行处理。本申请
实施例关注的一个方面是直接下游stage需要配置多少并发度、资源等,来处理上游stage
的输出数据;比如V2和V3分别需要配置多少并发度、资源等,来处理V1的输出数据,V4需要
配置多少并发度、资源等,来处理V2和V3的输出数据。
节点可收集到该上游stage的输出数据的统计信息。在一些实施例中,上游stage的输出数
据的Statistics可以包括如下任一项:输出数据的数据量(例如输出数据分别在数据压缩
前和压缩后的数据量)、输出数据的Partition(数据分区)的数据量分布信息、输出数据中
每个Partition的Record(序列化数据记录)数量等。
并发度、资源等配置能够贴合作业的具体执行情况;进而,直接下游stage可基于配置结果
执行作业,实现直接下游stage相应的task能够被合理、高效的执行完成。
能够贴合作业的具体执行情况。也就是说,在作业执行过程中,本申请实施例可基于上游
stage的输出数据与统计信息来对分布式作业进行动态调整,实现作业的动态调整效果。进
而基于动态配置的执行计划实现作业的具体执行,能够使得分布式系统更为合理高效的完
成作业执行,显著提升分布式系统执行作业的性能。
出的Partition拆分后分配给直接下游stage,以在直接下游stage的工作节点进行Join操
作;
果,从多条执行路径中选择实际执行的执行路径,使得执行计划的执行逻辑更为准确、合
理;在此情况下,一条执行路径中的stage可以至少包括上游stage的直接下游stage;通过
对上游stage后续候选的多条执行路径进行选择,能够实现执行逻辑的动态调整,并实现对
后续执行的直接下游stage的选择。
所述上游stage的直接下游stage。在作业执行过程中,主节点可获取上游stage的输出数据
的统计信息,从而根据所述统计信息,对直接下游stage进行配置。由于本申请实施例能够
在作业执行过程中,基于上游stage的数据输出结果,动态调整下游stage的配置,以使得下
游stage的配置能够适应于上游stage的实际执行结果,从而使得下游stage的并发度、资源
等配置能够贴合作业的具体执行情况,提升了执行计划配置的合理性和准确性。可见,本申
请实施例提供的分布式作业调整方法,能够在作业执行过程中对执行计划的配置进行动态
调整,实现作业配置的动态调整效果,并使得执行计划的配置能够贴合作业的具体执行情
况;进而基于动态配置的执行计划实现作业的具体执行,能够使得分布式系统更为合理高
效的完成作业执行,实现显著提升分布式系统执行作业的性能。
各stage的并发度(可以视为是配置DAG中各顶点的并发度),或者,基于用户指定的不同种
类的stage的并发度,对执行计划各stage的并发度进行配置。然而,分布式作业处理的源数
据复杂多样,主节点往往难以依赖预估规则,来配置适应不同作业的并发度,这导致执行计
划各stage的并发度配置并不准确。例如,对于处理数据量较小的stage,如果静态配置了较
大的并发度,将导致分布式系统的计算资源浪费;而对于处理数据量较大的stage,如果静
态配置了较小的并发度,将导致stage的执行时间延长,甚至带来内存使用超限等各种错
误,导致作业执行失败。因此在静态配置并发度的实现中,为避免stage配置的并发度较低
而难以处理海量数据的可能,stage往往需要配置较高并发度,这导致作业在实际执行过程
中,存在较多的计算资源浪费。例如,对于一个Map(映射)‑Reduce(归约)的作业而言,即使
上游Map阶段在实际运行过程中只产生1KB的输出数据,但下游Reduce阶段还是会由于静态
配置了较高的并发度,而调度较高数量的工作节点来处理这1KB的数据,这无疑导致了不必
要的计算资源浪费。
主节点可将上游stage输出数据中的Partition(数据分区),按照Partition数量均匀的原
则,分配给直接下游stage的工作节点。例如,直接下游stage的每个工作节点会处理上游
stage输出的连续多个Partition,并且每个工作节点处理的Partition数量相同。这种方式
可称为基于Partition数量的Even Reduction(均匀还原)策略,其在每个Partition的数据
量均匀的情况下,能够达到较为理想的效果;但是在实际环境下,数据分布特性多种多样并
经常具有不均匀特性,对于非均匀数据(每个Partition的数据量并不均匀),上述方式可能
导致直接下游stage的单个工作节点出现数据倾斜问题,即单个工作节点的处理数据量远
大于其他工作节点,从而进一步导致工作节点的长尾问题,致使作业执行时间被不必要的
拉长。例如,图3A示例性的示出了直接下游stage的处理数据分配示意图。如图3A所示,上游
stage输出的每个Partition的数据量并不均匀,上游stage输出的Partition内的数值可以
代表对应Partition的数据量,如果只是简单的合并多个指定数目的Partition给直接下游
stage的单个工作节点,虽然每个工作节点处理的Partition数量一致(例如图3A中直接下
游stage的每个工作节点都处理2个Partition),但这可能带来直接下游stage的工作节点
在数据处理量上的分配不均匀:一方面,部分工作节点处理的数据不合理的偏小(甚至可能
存在工作节点完全没有需要处理的数据),另一方面,部分工作节点会因为合并了个数据量
较大的Partition,而进一步加剧数据倾斜问题,导致工作节点的运行时间被拉长而形成长
尾。
据文件被严重压缩的情况。在这种情况下,可能少量的Partition数量就对应着大量的数据
记录(即数量较少的Partition中记录较大量的数据记录),因此简单的基于上游stage的
Partition数量来做直接下游stage的并发度配置,在实际作业中可能带来较大的不确定
性。
中每个工作节点的数据处理量趋近于均衡。作为可选实现,图3B示出了本申请实施例提供
的分布式作业调整方法的另一流程图。该方法流程可由主节点执行实现,参照图3B,该方法
流程可以包括如下步骤。
基于用户指定的不同种类的stage的并发度,对执行计划的并发度进行初始化配置。
对执行计划的并发度进行初始化配置时,应至少对首先执行的stage初始化配置并发度,以
使得首先执行的stage能通过初始化配置的并发度执行作业;而对于执行计划中非首先执
行的stage,由于非首先执行的stage一般都具有上游stage,因此非首先执行的stage可以
作为某一个或多个上游stage的直接下游stage,从而基于上游stage的输出数据的统计信
息,来配置并发度。
并发度。当然,本申请实施例也可支持对非首先执行的stage初始化配置并发度,本申请实
施例对此并不设限。在一个示例中,结合图1C所示,V1作为首先执行的stage,需要初始化配
置并发度,而V2、V3和V4均存在上游stage,本申请实施例并不限制一定要对V2、V3和V4初始
化配置并发度。
上游stage的输出数据。上游stage的输出数据可以被切分为多个数据分区(Partition)。输
出数据的数据分区的数据量可作为输出数据的数据量分布信息,即输出数据的数据量分布
信息可以表示输出数据的每个数据分区所分布的数据量,该数据量分布信息可以携带在输
出数据的统计信息中。
基于初始化配置的并发度,调度工作节点处理数据;如果上游stage为非首先执行的stage,
则上游stage可基于本申请实施例提供的方案动态调整并发度之后,再调度工作节点处理
数据。
stage的工作节点分配处理数据量完成后,自动实现配置直接下游stage的并发度。
致工作节点执行失败的可能性增大,因此在为直接下游stage的工作节点分配处理数据时,
应使得处理数据的数据量贴近但不超出理想数据量。
息,确定直接下游stage的各工作节点分配的处理数据量,并使得为工作节点分配的处理数
据量不超出所述理想数据量。
点能够分配到连续且总数据量不超出理想数据量的数据分区。在进一步的一些实施例中,
主节点在将连续且总数据量不超出理想数据量的多个数据分区,分配给直接下游stage的
一个工作节点的基础上,可保障各个直接下游stage分配的处理数据量尽可能的均衡。
数值为Partition的数据量;可以看出,主节点在为直接下游stage分配处理数据量时,是将
连续且总数据量不超出理想数据量的数据分区分配给直接下游stage的一个工作节点,并
保障每个工作节点的处理数据量趋近于均匀分布。例如,直接下游stage的4工作节点分配
的处理数据量分别为15、16、19和10。可以看出,在为直接下游stage的工作节点分配的处理
数据量之后,主节点即可完成配置直接下游stage的并发度,例如,图3C中每个工作节点完
成处理数据量的分配之后,主节点可确定需要4个工作节点(并发度为4)来处理上游stage
的输出数据。通过本申请实施例提供的方式来配置并发,可避免直接下游stage的某一工作
节点的处理数据量虽然未超出理想数据量,但相比于其他工作节点过高或过低的情况,从
而使得各工作节点的处理数据量能够趋近于均衡、合理。
行过程中,主节点可确定执行计划的上游stage的输出数据的数据量分布信息,该数据量分
布信息能够表达输出数据对应的多个数据分区的数据量;从而基于直接下游stage的工作
节点对应的理想数据量,以及该数据量分布信息,确定为直接下游stage的工作节点分配的
处理数据量;进而,主节点可根据直接下游stage中分配处理数据量的工作节点的数量,配
置直接下游stage的并发度,并使得直接下游stage中的各工作节点能以不超出理想数据量
的数据量来执行作业,降低了直接下游stage中单个工作节点的处理数据量过高而导致执
行失败的概率。本申请实施例能够在作业执行过程中,基于上游stage的输出数据的数据量
数据量分布信息,以及直接下游stage的工作节点对应的理想数据量,为直接下游stage的
工作节点分配处理数据量,并实现自动配置直接下游stage的并发度,达到基于上游stage
的输出数据结果,对直接下游stage的并发度进行动态自适应调整的效果,提升了直接下游
stage的并发度配置的精确性和合理性,极大的减少了不合理的并发度配置而带来的计算
资源浪费情况,能够显著提升分布式系统的性能。
据记录按照特定Hash函数计算散列值,并发送到对应工作节点的过程)为例,则主节点可获
取上游stage的工作节点在数据shuffle之后,输出的Partition的数据量分布信息(每个
Partition的数据量)。主节点可以按照Partition聚合来为直接下游stage的工作节点分配
处理数据量,例如,通过将连续且总数据量不超过理想数据量的多个Partition分配给直接
下游stage的一个工作节点。在上述分配过程中,如果分配给一个工作节点的多个
Partition的数据总量达到理想数据量,则自动将后续连续且总数据量不超过理想数据量
的Partition分配给下一个工作节点,以此类推,直至上游stage输出的所有Partition分配
完毕为止。直接下游stage实际的并发度,可以在上游stage所有的Partition均分配完毕后
自动确定,即基于直接下游stage中分配处理数据量的工作节点数量,确定直接下游stage
的并发度。本申请实施例可将上游stage输出的Partition尽量均匀的分配给直接下游
stage的每个工作节点,实现更为精准、合理的为直接下游stage配置并发度,并且避免了单
个工作节点的长尾问题。
当前配置并发度的直接下游stage可成为上游stage,并基于本申请实施例提供的方案,继
续为后续的下游stage调整并发度,直至执行计划的各stage均在作业执行过程中完成并发
度的调整。
说:
工作节点进行处理,而直接下游stage的工作节点如果顺序操作大量的Partition,这可能
导致读写性能的回退。基于此,本申请实施例可以设置单个工作节点允许分配的Partition
数量上限,从而在规避工作节点处理的数据量为零的前提下,对于加入工作节点的
Partition数量上限进行控制。
据,则可能将数据量较小但存在大量数据记录的Partition分配到单个工作节点,从而导致
工作节点的运行时间被拉长,造成作业执行的瓶颈;此外,还需要考虑工作节点本身涉及的
单位数据记录来计算复杂度,这可能与工作节点的算子数目,以及算子特点等因素相关,这
些信息也需要作为计算并发度的考虑因素。基于此,在使用Partition的数据大小作为基
准,来分配工作节点的处理数据量以外,本申请实施例可进一步结合Partition的Record数
量,工作节点的算子数目,以及算子复杂度等特征,对分配给工作节点的处理数据量进行二
次调整,并选取在这几个维度上得到的并发度较大的结果,从而完成直接下游stage的最终
并发度的配置。基于此,在步骤S315的可选实现中,本申请实施例可进一步根据直接下游
stage中分配处理数据量的工作节点的数量,分配给工作节点的Partition的Record数量、
工作节点的算子数目、以及算子复杂度,完成直接下游stage的最终并发度配置。
例应用于实际生产环境中,能够对生产作业整体的执行并发度带来数量级别的降低,显著
提升分布式系统中工作节点的运算效率,极大程度上避免计算资源的浪费,以及工作节点
频繁调度拉起的消耗。相比于简单直接的Even‑Reduction策略,本申请实施例提出的Fair‑
Parallelism策略,能够有效的避免Even‑Reduction策略下可能引入的严重数据倾斜,使得
分布式作业中所有工作节点处理的数据量尽可能分布均匀,避免出现突出的长尾或者短
板,可以避免长尾成为运行时间的瓶颈,也可以防止工作节点频繁处理较小的数据而导致
反复启动工作节点造成的资源浪费。进一步的,通过在合并数据量较小的Partition时,限
制连续合并的Partition数量上限,防止Partition过度合并引起下游stage在读数据性能
的下降。进一步的,依据统计作业运行期间,每个工作节点生成的详细数据信息(包括数据
量以及Record数量),本申请实施例可结合Record数量,算子数目与算子复杂度等信息,来
做出更加均衡完善的并发度调整。可见,本申请实施例能够显著提升分布式系统的性能。
Partition的拆分方案。下面对具体实现方案进行详细介绍。
执行性能的重要性能指标之一。然而全连接数据shuffle (full‑shuffle),只有在数据均
匀分布的理想场景下,才能比较高效的执行。而在实际生产作业中,数据的分布往往并不均
匀,数据的倾斜特性可能会在full‑shuffle模式下被进一步放大,从而导致个别工作节点
运行时间的大幅拉长而造成严重长尾。
DAG中各个子图的重要描述。而众多更复杂DAG拓扑中,各个连接边上的数据流动,也可以通
过各种shuffle模型来描述;例如在分布式执行框架中,DAG的连接边的一个重要物理属性
就是边上数据的传输。边上数据的传输不仅可以使用full‑shuffle,还允许引入更加动态
和智能化的数据编排方式,并以此来解决许多实际场景中full‑shuffle面临的问题。
入分区的值能够事先指定时,直接使用静态写入指定分区是较为简单的方式;对于分区值
无法事先判断,尤其是一个查询产出的数据分布在多个分区的时候,则使用动态分区
(Dynamic Partition)写入,也就是数据写入Partition的值,将会在作业运行过程中根据
具体产出数据来获取。比如在下面的SQL(Structured Query Language,结构化查询语言)
语句中,数据写出指定为country这个分区列名,但是具体会写到哪些分区,其对应的分区
数值则是在运行中获取的:
能既避免作业产生严重长尾,同时又不会由于小文件的大量产生而对存储系统带来严重的
负担。
工作节点执行task。Map stage的每个工作节点读取数据后,可按照分区值(例如上述示例
中的country)来写出文件。也就是,一个工作节点如果处理了不同的country值对应的数
据,就会产生对应的不同路径的文件。一个Map stage的执行计划简单而直观,但是对于实
际大规模分布式系统而言会带来各种隐患,其中最突出的就是小文件碎片化的问题。在一
个Map stage的执行计划中,假设Map stage的并发度为M,用户数据中country的取值可能
为N个。在数据随机分布的情况下,因为Map stage的每个工作节点独立输出,最终可能造成
每个分区都会写出M个文件。所以这种执行方式最终将会可能产生M * N个数据文件,而这
其中有可能存在大量的小文件。这些大量碎片化小文件的存在,对于一个分布式系统会带
来较大的负面影响:
储效率而言,存在碎片化的小文件也会带来较差的存储压缩比,占用更多存储空间;
Compress(压缩)等;而如果Buffer过大,则Map stage使用内存会过大,Buffer过小则导致
Encoding和Compress的效果较差。
Key(键)进行一个全连接的Reshuffle,即,把相同Partition的数据聚合到一个工作节点上
再写出。通过这种方式保证每个Partition的分区数值只产生一个分区文件。但这种强制限
制,在降低了文件数的同时,也会带来另外一个负面效应:数据倾斜。
平台的用户数据进行不同国家的分区,那么本国分区上的数据毫无疑问会远大于其他分
区,这导致本国分区上的数据对应的工作节点产生严重长尾,导致整个作业的运行时间被
大幅度拉长。对于倾斜严重的数据,这种长尾可能带来数百甚至上千倍的作业延迟,对于整
个分布式系统的资源利用率也有非常坏的影响。
将数据随机打散到10个分区上来减少数据的倾斜程度。然而这样的解决方案仍然存在一定
的问题:在数据倾斜严重的情况下,即使将数据切分成10份,切分后的数据仍然可能还是存
在倾斜问题,甚至可能由一个数据长尾情况变成10个程度稍轻的数据长尾情况;对于没有
倾斜的数据或者数据量本身很小的分区,如果同样切分成10份,又带来了最终文件数的增
加(10 * N);分布式系统强行加入Random Key的shuffle方式,可能破坏数据的幂等性,在
分布式系统中,如果发生工作节点重跑,存在产生数据正确性的风险。
克服上述两种方案的缺点。
同线性的方框表示Map阶段产生的shuffle数据中相同Partition Key的数据;例如细虚线
的方框、细实线的方框、粗虚线的方框、粗实线的方框分别表示相同Partition Key的数据。
而方框内的数值则表示对应的数据量。在经过一次shuffle之后,同一Partition Key的数
据将交给Reduce阶段的同一个工作节点进行处理。通过图4A可以看出,Reduce阶段的工作
节点R#0处理的数据量为8,工作节点R#1处理的数据量为2,工作节点R#2处理的数据量为
43,工作节点R#3处理的数据量为16;工作节点R#2处理的数据存在严重的数据倾斜,这导致
工作节点R#2在执行Reduce阶段的任务时将存在非常严重的长尾。
的数据,都交给同一个reduce的工作节点来处理。基于这个特点,本申请实施例可以对Map
阶段产生的数据量较大的Partition进行自动拆分,然后交给Reduce阶段的多个工作节点
进行处理。
数据的Partition的数据量分布信息,该数据量分布信息可以指示输出数据的每个
Partition的数据量。在进一步的一些实施例中,所述Statistics还可以包括:输出数据的
数据量(例如输出数据分别在数据压缩前和压缩后的数据量)、输出数据中每个Partition
的Record数量等。
不同。主节点在获取上游stage的输出数据的Statistics之后,可确定输出数据中每个
Partition的数据量。从而基于直接下游stage的工作节点对于Partition的理想数据量,判
断输出数据中是否存在数据量大于理想数据量的Partition;对于输出数据中数据量大于
理想数据量的Partition,本申请实施例可对该Partition进行拆分,并使得拆分后的
Partition的数据量不大于理想数据量。
近于均匀分布。
点执行,从而实现为直接下游stage配置需要处理的数据。
Partition进行合并,该至少两个Partition中的各个Partition的数据量均小于理想数据
量,从而将合并的Partition分配给直接下游stage,且一个合并的Partition配置为由直接
下游stage的一个工作节点执行,以实现直接下游stage的计算资源的高效利用。
据量的Partition进行拆分,保障拆分后的Partition的数据量不大于理想数据量。进而将
拆分后的Partition分配给直接下游stage,且一个拆分后的Partition配置为由直接下游
stage的一个工作节点执行,实现了为直接下游stage的工作节点配置需要处理的数据。本
申请实施例可依据上游stage的输出数据的Statistics,对配置给直接下游stage的
Partition进行动态调整,保障直接下游stage中每个工作节点处理的一个Partition的数
据量不会超过理想数据量,使得直接下游stage的工作节点处理的数据能够趋近于均匀分
布,降低了直接下游stage的数据倾斜情况,避免了直接下游stage的个别工作节点由于需
要处理较大数据量的Partition,而导致运行时间的大幅拉长的长尾问题。可见,本申请实
施例能够使得直接下游stage的工作节点处理的数据趋近于均匀分布,降低直接下游stage
的数据倾斜情况和工作节点的长尾问题,显著提升了分布式系统的性能。
将输出的shuffle数据的Statistics上报给执行引擎(执行引擎可以是主节点);例如,Map
阶段可能存在一个或多个工作节点来执行task,每个工作节点执行task的过程中以及执行
结束之后,均可将输出的shuffle数据的Statistics上报给执行引擎。在一些实施例中,
shuffle数据的Statistics例如:shuffle数据在压缩前和压缩后的数据量、shuffle数据的
每个Partition的数据量,以及Partition包含的Record数量等。
量,则本申请实施例可将Partition按照理想数据量进行拆分,并尽量保证Reduce阶段每个
工作节点处理的数据量是均匀的;如果Partition的数据量小于理想数据量,则本申请实施
例可将数据量小于理想数据量的多个Partition进行合并,并保障合并后的Partition的数
据量不大于理想数据量。
如,4个shuffle数据中同一线性表达的数据聚合成一个Partition,其中,shuffle数据中的
细虚线对应的数据聚合成P#0,细实线对应的数据聚合成P#1,粗虚线对应的数据聚合成P#
2,粗实线对应的数据聚合成P#3。如图4C所示,4个Partition的数据量大小依次为8、2、 43
和16。假设定义Reduce阶段的一个Partition对应的理想数据量为10,则P#0、P#1的数据量
均小于10,且P#0、P#1合并后的数据量并不超出理想数据量10,因此P#0、P#1可合并为一个
Partition,并分配到Reduce阶段的一个工作节点R#0进行处理。而由于P#2的数据量43大于
理想数据量10,则P#2可按照理想数据量尽可能均匀的拆分为多个Partition,且拆分的每
个Partition的数据量不大于理想数据量,如图4C所示,P#2可按照理想数据量尽可能的拆
分为5个Partition,并分配给Reduce阶段的5个工作节点R#1至R#5进行处理,其中,R#1至R#
4每个分配数据量为9的Partition,R#5分配数据量为7的Partition。同理,P#3的数据量由
于大于理想数据量,因此被均分拆分为2个Partition,并分配到Reduce阶段的2个工作节点
R#6和R#7进行处理。
中,基于Adaptive shuffle机制,Reduce阶段最终写出的文件数将主要依赖于输入的
Partition数据量的大小,以及理想数据量的大小配置。假设输入Reduce阶段的理想
Partition数量(Ideal Parallelism)定义为:shuffle数据总量除以理想数据量;则经过本
申请实施例的Adaptive shuffle机制之后,Reduce阶段最终生成的文件数的最大值为
Ideal Parallelism加上N。因此,如果要保证每个有输出数据的Partition上至少存在一个
数据文件的话,只有数据量大于理想数据量的Partition经过拆分后会使文件数有所增加。
但这种拆分产生的文件大小均为Partition的量级,而不是碎片化的小文件。基于此,在理
想数据量的大小配置合理的情况下,无论产生多少文件,对于分布式系统都是合理的;即只
要文件大小合适,海量的数据可以通过多一点的文件来存储。
由一个工作节点输出一个文件。这就从根本上,同时解决了大量小文件产生以及可能的数
据倾斜问题,能够比较好的解决动态分区场景上面临的两难问题。此外还要说明的是,使用
Adaptive shuffle机制,能够避免需要添加额外的Random shuffle Key来减少数据倾斜的
情况。由于本申请实施例提供的Adaptive shuffle机制在整个过程是确定性,可重入的,因
此在不稳定的分布式系统环境上,能从根本上保证在各种重试发生时,输出数据的正确性。
基于上游stage的输出数据的数据特性,来对输出数据进行智能化的分配与编排,包括倾斜
的数据分区的自动拆分,以及多个数据量较大的小分区的合并,从根本上解决了在数据
shuffle过程中可能带来的数据倾斜以及工作节点的长尾问题,同时避免了其他无shuffle
方案下的数据碎片化问题,能够显著的提升分布式系统的性能。
游stage输出的Partition拆分后,分配给直接下游stage来进行Join操作的实现方案进行
介绍。
战之外,不同路数据在Join算子上发生交互,也衍生了更多数据处理的场景。然而,数据分
配不均匀等情况,将导致Join操作出现数据倾斜和长尾问题,这成为分布式系统中比较常
见且一直没有被系统化解决的问题。
和M2的两路输入数据可按照Partition写出,并且按照分区编号被shuffle到J3不同的工作
节点上去实现数据Join。其中,中间数据会按照分区编排,被保存在物理介质上。
了分布式SQL的Join过程的另一示例图。如图5B所示,M1提供给J3的输入数据中的
Partition1(分区编号为1的Partition)存在严重的数据倾斜,同时,M2提供给J3的输入数
据中Partition1存在轻微的数据倾斜,在这种情况下,J3对Partition1进行Join操作时,J3
的工作节点将存在严重的长尾,甚至由于内存超限而导致作业失败。
stage提供多路输入数据;其中,一路输入数据包括多个Partition。
Join stage的多个工作节点。
Partition(为便于说明,存在数据倾斜的Partition可称为目标Partition),则本申请实施
例可将目标Parititio拆分为多个Partition,并将多个Partition分配给Join stage的多
个工作节点。
值,则确定Partition为存在数据倾斜的目标Partition。
量均匀分布并且每个子Partition的数据量不大于第二数据量阈值。在进一步的一些实施
例中,第二数据量阈值小于第一数据量阈值,第一数据量阈值和第二数据量阈值的具体数
值可根据实际情况定义,本申请实施例并不设限。在一种实现示例中,第二数据量阈值可以
例如前文描述的理想数据量。
后,针对目标Partition拆分后的各个子Partition,本申请实施例可确定其他路输入数据
(多路输入数据中与该某路输入数据不同路的输入数据)中,与子Partition属于相同分区
编号的Partition,从而将所确定的Partition广播到子Partition分配的工作节点,以使得
子Partition与其他路输入数据中属于相同分区编号的Partition能够在Join stage进行
正确Join。
两路输入数据,多路输入数据的路数可以大于二,而不限于只有两路输入数据。在此情况
下,多路输入数据中存在目标Partition可能存在如下几种情况。
第一路输入数据中的一个目标Partition拆分为多个子Partition,并将各子Partition分
配给Join stage的工作节点;而其他路输入数据(多路输入数据中除第一路输入数据外的
输入数据)中与目标Partition属于相同分区编号的Partition,将被广播到各子Partition
分配的工作节点。
执行Join的工作节点上不需要额外维护Partition、Sort等特性的算子,则主节点在统计了
M1和M2输出的各个Partition的数据量后,可判断M1输出的Partition1为存在数据倾斜的
目标Partition,从而将M1输出的Partition1拆分成多个子Partition。图5D中,M1的
Partition1所在方框中的一个小方框可表示拆分的一个子Partition。M1的Partition1拆
分成的多个子Partition,可分别分配到Join阶段的多个工作节点上进行Join处理。同时,
为了在Join阶段,能够对于拆分后的子Partition进行正确的Join操作,主节点需将M2中与
目标Partition相同分区编号的Partition,分别广播给各子Partition所分配的工作节点;
例如,主节点需将M2所产生的Partition1广播给M1的各子Partition分配的工作节点。通过
向Join阶段的多个工作节点广播M1和M2相同分区编号的Partition,能够在目标Partition
拆分为多个子Partition的情况下,保障相同分区编号的数据可以被正确的Join并产出结
果。
入数据中每个目标Partition的处理与第一种情况同理。在一些实施例中,针对第一路输入
数据中的每个目标Partition,主节点可将目标Partition拆分为多个子Partition,并将目
标Partition的子Partition分配给Join stage的工作节点,同时,其他路输入数据中与目
标Partition属于相同分区编号的Partition,将被广播到各子Partition分配的工作节点。
只不过在第一路输入数据存在多个目标Partition的情况下,每个目标Partition均需要按
照上述方式进行处理。
Partition,并将该目标Partition的各子Partition分配给Join stage的工作节点,同时,
将其他路输入数据中与该目标Partition属于相同分区编号的Partition,广播到该目标
Partition的各子Partition分配的工作节点。
分区编号的Parition,以实现在Join阶段将拆分后的子Parition与另一路相同分区编号的
数据进行正确Join。例如,假设图5D中,M1输出的Partition3也存在数据倾斜,则M1输出的
Partition3也可拆分成多个子Partition,并分别分配到Join阶段的多个工作节点,同时,
M2中的Partition3会被分别广播到M1的Partition3对应的子Partition分配的工作节点。
点可将第一路输入数据中的一个目标Partition拆分为多个子Partition,并将拆分的各子
Partition分配给Join stage的工作节点;同时,主节点可将第二路输入数据中的一个目标
Partition拆分为多个子Partition,并将拆分的各子Partition分配给Join stage的工作
节点。针对于第一路输入数据和第二路输出中相同分区编号的子Partition,需要分别将子
Partition广播到相同分区编号的其他路子Partition所分配的工作节点。例如,将第一路
输入数据的子Partition,广播到第二路输入数据的相同分区编号的子Partition所分配的
工作节点,将第二路输入数据的子Partition,广播到第一路输入数据的相同分区编号的子
Partition所分配的工作节点。
Partition1需要被拆分成多个子Partition,则M1的Partition1拆分的多个子Partition会
被分配到Join阶段的多个工作节点,M2的Partition1拆分的多个子Partition也会被分配
到Join阶段的多个工作节点,且一个子Partition分配到一个工作节点。图5E中M1、M2的
Partition1所在方框中的一个小方框可表示拆分的一个子Partition。为保障能够对M1和
M2相同分区编号的子Partition进行正确Join,主节点可将M1的Partition1拆分的多个子
Partition,广播到M2的相同分区编号的子Partition所分配的工作节点,同理,主节点可将
M2的Partition1拆分的多个子Partition,广播到M1的相同分区编号的子Partition所分配
的工作节点。
第一路输入数据和第二路输入数据中一个相同的目标Partition的处理过程与第三种情况
同理。只不过在第一路输入数据和第二路输入数据均存在多个目标Partition的情况下,第
一路输入数据和第二路输入数据每个相同的目标Partition均需要按照上述方式进行处
理。
和灵活的Join操作,其工作节点是有可能包含各种各样的算子。在这种情况下,对于输入
Join的Partition发生数据倾斜时,除需要利用上述示例的方式将Partition进行拆分外,
还需在完成数据Join之后加入一个union(联合)操作,从而实现将拆分后的子Partition在
Join之后重新收拢起来。在数据的特殊属性需要得到保留(比如数据不是直接落盘,而是还
有下游操作/stage)的时候,需要通过union操作来保证后续执行的正确性。
Parition1对应的子Parition,和M2的Parition1进行Join操作外,还需将各子Parition的
Join操作结果进行联合,从而生成一个新的Parition1。
Parition,和M2的Parition1的相同分区编号的子Parition进行Join操作外,还需将各子
Parition的Join操作结果进行联合,从而生成一个新的Parition1。
同分区编号的数据分区的Join操作结果,或者,一路输入数据的子数据分区与其他路输入
数据中相同分区编号的子数据分区的Join操作结果。
并保障新的shuffle模式下,Join操作的正确性。相比通过人工调整作业数据处理逻辑的方
式,以及对数据的预处理等需要大量终端用户介入的操作,本申请实施例能够使得Join操
作的输入数据的倾斜处理具有自适应性以及普适性。尤其是在主节点收集各stage的输出
数据的统计信息的情况下,本申请实施例使得数据倾斜的调整能够自动化执行,而不需要
用户手动调整。本申请实施例提供方案的自适应性使得其能够在作业运行期间,根据实际
Join输入的数据特点作出动态决策,无需终端用户感知和参与,并且能够使得数据在多个
分布式的工作节点间均匀分布,对于存在数据倾斜的作业能起到显著的加速作用,实现分
布式系统性能的显著提升。
据分布和特性相关,而这些数据特性只能在作业执行过程中才能准确获得。不同的数据特
性,可能需要配置不同逻辑的执行计划才能有效、准确的实现。因此针对静态执行计划而
言,如果执行计划一旦确定且无法在作业的执行过程中动态调整,那么无疑无法对执行计
划的逻辑实现合理、准确的配置。
以实现Table1与Table2这两个源表的Join为例,下面对Sort Merge Join和Broadcast
Join的实现过程进行说明。
数据按照shuffle/Join key进行分区;M1和M2的输出数据可在下游工作节点按照相同的
key进行merge Join操作。在上述过程中,实现Merge Join需要对M1与M2的输出数据进行全
量的shuffle 和sort(排序)操作,通过保证相同分区的数据都能被分配到同一个下游工作
节点。然而在分布式系统中,Sort Merge Join依赖外排等具体实现能够做到对于任意数据
量的处理,但是过程中涉及的大量的shuffle以及sort操作,需要消耗较多的计算及网络资
源;并且在数据分布不均匀时,shuffle后的数据可能导致严重的长尾,影响执行效率。
小能被载入单个工作节点内存,那么分布式shuffle和sort的计算/网络消耗能够被降低。
结合图6B所示,图6B中的Table1为大表,Table2为小表,则小表的数据可以Broadcast到所
有大表的工作节点,并根据小表数据建立全量的hash table(哈希表),从而大表数据读取
后可通过hash table lookup(哈希表查表)来进行Join。这样大表(Table1)的数据只需要
读取一次,而且不需要进行任何的数据shuffle和sort。同时,Broadcast Join除能避免
shuffle和sort的计算/网络消耗以外,shuffle可能带来的数据倾斜以及长尾也能得以避
免,这是因为在Broadcast Join的Join pattern(模式)中,Join的逻辑实际上发生在大表
的读取逻辑中(例如,Map stage)。基于此,Broadcast Join也被称为Broadcast Map Join。
当然,Broadcast Join在带来的资源/性能等众多好处时,也存在其特定的适用范围:用于
建立hash table的小表数据,必须能全量被单个工作节点载入,如果优化器选择了
Broadcast Join执行作业,但是执行过程中发现小表数据量超过了内存限制,那么整个作
业就会失败。
法,那么就需要更为准确、合理的判断:在保证作业能成功完成的前提下,尽可能的配置高
效的执行计划逻辑。但在实际线上场景中,优化器要在作业执行之前,就做出上述判断是非
常困难的,主要原因如下:
如一张表刚刚导入统计信息还未产生,或者,表内容的更新刚刚触及废弃统计数据的阈值
等)。总之,统计信息的缺失以及不准确,使得优化器无法准确预估Join的上游输入的大小,
Join的上游输入可能是源表,或者源表经过一定逻辑转换后的输出。
作节点,源表的数据随着上游的selection(选择)/filter/ aggregation(聚集)等复杂转
换,可能穿插各种用户自定义代码逻辑(UDF),这些都对优化器事先预估Join的输入数据量
造成了困难。
的选择只能尽量保守, 例如,将小表的门限设置的尽可能低,从而丧失了大量的优化机会。
而即便小表门限已经配置得很低,由于数据预估错误,数据膨胀等原因,依然会因误判
Broadcast Join导致作业执行失败的情况。由于这些极端情况下会导致作业执行失败的后
果,因此反馈到优化器的策略上,优化器会进一步选择更加保守的策略,从而造成负面循
环。另外,Broadcast Join的触发很大程度上基于人工手动添加的Map Join hint(Map
Join提示),即将Broadcast Join计划的产生交由用户来决定;这种优化器职能的外放,给
用户逻辑维护带来了额外的困难,而实际上,用户只能比较准确的感知源表数据体量,而无
法准确得知非源表数据在经过数据变化后的输出大小,因此用户指定Map Join hint同样
无法避免数据及上游处理逻辑变化而导致的作业执行失败。
布等)需要在作业执行过程中,由上游工作节点完成之后才能获得,因此要做出Join算法的
准确判断,则需要在分布式作业的执行过程中进行判断,而不是在作业执行之前进行判断。
然而,在作业执行过程中进行Join算法的判断和选择对于执行引擎的DAG动态能力提出了
挑战:选择Sort Merge Join以及Broadcast Join等不同的Join算法时,所产生的执行计划
不仅在物理属性(并发度,shuffle模式等)会有区别,而且在DAG的拓扑逻辑结构上,也会有
较大的变化,因此如果要在作业执行过程中来进行动态调整,就需要提供动态逻辑图的能
力(也就是说,执行计划的逻辑能够被动态调整)。需要说明的是,逻辑图的调整往往也伴随
着执行计划的物理属性调整,因此这实际是对DAG动态逻辑图以及动态物理图能力均提出
了要求。
径,一条执行路径包括所述上游stage的一个或多个下游stage;所述控制节点用于从所述
多条执行路径中选择所述上游stage在下游实际执行的目标执行路径。
路径可以包括该上游stage的直接下游stage,或者,直接下游stage和间接下游stage。在一
些实施例中,执行计划中的一个或多个上游stage可以在其下游携带候选的多条执行路径。
径中选择一条执行路径,作为该上游stage在下游实际执行的目标执行路径。需要说明的
是,该控制节点并不属于执行计划中的执行阶段,并不实际调度工作节点等计算资源,而是
标识执行计划在上游stage的下游需要加入控制逻辑,以便从多条执行路径中选择实际执
行的目标执行路径。
多条执行路径的执行计划的示意图。如图6D所示,M1的下游存在候选的两条执行路径:
Path0和Path1。并且M1的下游设置有控制节点C8_1,控制节点C8_1与Path0和Path1相联通,
可用于从Path0和Path1中,选择M1的下游实际执行的目标执行路径。
示,M1在作业执行过程中执行完成后,M1的输出数据的统计信息可被主节点采集。基于主节
点采集的上游stage的输出数据的统计信息,主节点可通过控制节点,基于该统计信息,从
上游stage在下游的多条执行路径中选择实际执行的目标执行路径。例如,结合图6D所示,
控制节点C8_1可基于M1的输出数据的统计信息,从Path0和Path1中,选择M1的下游实际执
行的目标执行路径。
来从多条候选的执行路径中选择最终执行的目标执行路径。通过这样的方式可以在作业的
执行过程中,根据上游stage的输出数据的实际情况来选取最终的执行路径,使得执行路径
的选择更为合理、准确。基于合理、准确的执行路径来执行作业,能够显著提升分布式系统
的性能。
执行路径的执行计划的流程图。该方法流程可由主节点执行实现。参照图7A,该方法流程可
以包括如下步骤。
候选信息包括第一候选信息以及第二候选信息。
plan(有条件执行计划),基于源数据在作业执行过程中可能使用的任务,优化器可生成该
源数据的多个candidates(候选信息),以及具有该多个candidates的物理计划。在一些实
施例中,任务可以包括一个或多个算子(operator),例如通过算子的处理过程来实现具体
任务的执行。
用的Broadcast Join和Sort Merge Join,生成该源表的两个candidates,其中一个
candidate表示Broadcast Join,另一个candidate表示Sort Merge Join。在一些实施例
中,物理计划可以认为是节点树的数据结构。
相关算子进行data pipe(数据管道)连接,以在算子树中转化出第一执行路径。
进行多次遍历,并且预先设置进行执行路径转化的预设算子,当优化器第一次遍历到预设
算子时,则可对多个候选信息中第一候选信息对应的第一执行路径进行转化。在一些实施
例中,多个候选信息至少包括第一候选信息以及第二候选信息,其中,第一候选信息与第一
执行路径相对应,可记录第一执行路径对应的任务,第二候选信息可与第二执行路径相对
应,可记录第二执行路径对应的任务。
路径的任务能够与算子树的原始任务进行联通,优化器可将新增任务的算子与原始任务中
的相关算子进行管道连接,原始任务中的相关算子可以认为是原始任务中与新增任务的算
子相关的算子(例如原始任务中处于新增任务的算子的上游、下游的算子)。通过上述处理,
优化器可以在算法树中转化出第一执行路径。
相关算子进行data pipe连接,以在算子树中转化出第二执行路径。
的任务,新增的任务可以包括一个或多个算子,为使得第二执行路径的任务能够与算子树
中的已有任务进行联通,优化器可将新增任务的算子与已有任务中的相关算子进行data
pipe连接,已有任务中的相关算子可以认为是已有任务中与新增任务的算子相关的算子
(例如原始任务中处于新增任务的算子的上游、下游的算子)。通过上述处理,优化器可以在
算法树中转化出第二执行路径。
可选实现。
和第二执行路径中选择目标选择路径,优化器还可在算子树的第一执行路径和第二执行路
径的上游设置控制节点,以通过控制节点实现作业执行过程中第一执行路径和第二执行路
径的动态选择。通过上述过程,本申请实施例可生成携带多条执行路径的执行计划。
多个candidates(候选信息)产生具有多条执行路径的执行计划。最终哪个candidate的执
行路径被选择使用,则是由主节点在作业执行过程中,根据上游stage产生的输出数据的统
计信息,来进行动态选择。
的执行计划。
小表的RowCount(行数)* AverageRowSize(平均行大小)估算得到小表在内存中的数据大
小。进而,优化器可判断小表在内存中的数据大小是否小于预设的第一门限值(第一门限值
可以预先设置并定义为threshold1)。如果小表在内存中的数据大小,小于预设的第一门限
值,则优化器会触发数据库的conditional Map Join(有条件的映射连接),例如产生小表
数据进行Map Join的多个candidates,一个candidate可以表示小表使用的一个Join算子
(例如Broadcast Join或Sort Merge Join)。基于conditional Map Join提供的多个
candidates来实现后继的动态决策,优化器可以将Cost‑based Optimization环节的第一
门限值threshold1配置的比较大(默认值为512M),也就是只要按照比较宽松的概率,判断
出一个作业可能利用Broadcast Join,就产生conditional Map Join的计划,而最终选择
执行路径的选择,则交给执行引擎在作业执行过程中来进行动态选择。
RelNode表示relational expression(关系表达式)。在Cost‑based Optimization环节,
physical plan中包含了conditional Map Join这样的控制节点,而该控制节点内部基于
多个candidates可以包含多条Path(执行路径),以表达进行Join算法可能选择的路径,例
如Broadcast Join和Sort Merge Join对应的执行路径。而无论最终选择哪条Path,小表的
计算和数据会在该多条Path共享。conditional Map Join在优化器的cost model中定义的
cost可以介于Broadcast Join和Sort Merge Join两者的cost之间,并且根据最终选择的
Path的概率来决定两者的cost比例。
runtime(运行时组件)理解的physical operator tree(物理算子树)以及构造出DAG
topology(拓扑结构)。因此最终的执行计划包括了由task和edge组成的DAG ,以及每个工
作节点内部的operator tree(即物理算子树)。总体而言,上述过程是一个physical plan
(物理计划)的后序遍历过程,并在遍历过程中动态切分task以及添加Edge。不同于普通
query,conditional Map Join要实现运行时的动态决策和选择Path,那么在execution
plan产生过程中需要添加控制任务和dependent edge(从属连接边),从而准确地通过
dependent edge来描述上下游工作节点之间的调度依赖关系(无数据流动)以及一个单纯
的控制节点。例如,将包含正常执行operator的普通工作节点通过dependent edge和控制
节点相连。通过上述设置可以在DAG运行过程中,由控制节点通过判断小表实际运行后的大
小,来选择后续的执行路径。
中,图示中的粗箭头指代data pipeline(数据管道连接)、细虚线箭头表示control
pipeline(控制管道连接)、细实线箭头表示runtime operator data flow(运行期间算子
的数据流)。在一个实现示例中,当主节点在遍历physical plan的过程中,如果遍历到
conditional Map Join,并准备转化conditional Map Join包含的多个Path时,主节点可
将conditional Map Join的输入relNode tree(控制节点树)转换为operator tree(算子
树),operator tree中的operator可以形成图7B所示的M1和M2这两个task,例如M1使用
TableScan1(表扫描1)和Filter1(过滤1)这两个operator,M2使用TableScan2(表扫描2)、
Filter2(过滤2)和StreamlineWrite1(流线写1)这三个operator。
Join的Join实现,Path1可以对应Sort Merge Join的Join实现。结合图7C所示,主节点在遍
历operator tree的过程中,如果第一次遍历到M1的filter1,则可对Path0进行转化。具体
来说,基于Path0的candidate记录的task,主节点可在算子树中新增Path0的task,以及将
新增的task中的算子与M1和M2进行data pipe连接。结合图7C所示,R3为新增的task并且具
有StreamlineRead1(流线读取1),StreamlineWrite2(流线写2)这两个新增的算子,R1中新
增了StreamlineRead2(流线读取2)以及ConditionalMapJoin1。其中,M1中新增的
ConditionalMapJoin1是Broadcast Hash Join的operator id(算子标识),其operator类
型是HashJoin。
原本的operator进行记录以便后续进行copy(复制)。从当前新增加的四个operator可以推
断出目前正在处理的是R3和M1,此时,将其记录下来作为一条候选的Path0。
遍历到Filter1时,可对Path1进行转化。具体来说,基于Path1的candidate记录的task,主
节点可在已转化Path0的算子树上新增Path1的task,以及将新增的task中的算子与当前任
务中的相关算子进行data pipe连接。结合图7D所示,M5和J6为新增的task,在新增M5的
task时,可以将前面记录的M1的operator(例如图7D中的TableScan1和Filter1),copy到M5
中,同时在M5中新增StreamlineWrite3(流线写3)算子。在新增J6的task时,可以在J6中新
增StreamlineRead3(流线读取3)、StreamlineRead4(流线读取4)和ConditionalMapJoin1
这三个算子,并且实现J6与M2和M5之间在算子层面的data pipe。
基于表执行过程中的大小,来从Path0和Path1中选择进行Join的执行路径。
第二门限值,若是,则选择第一执行路径作为目标执行路径,若否,则选择第二执行路径作
为目标执行路径。假设上游stage处理的数据为小表,则在作业执行过程时,如果小表对应
的task执行完毕,则控制节点可根据小表实际执行的大小,判断小表的数据输出结果是否
小于预设的第二门限值(定义为threshold2),若小表的执行输出结果小于第二门限值,则
选中Broadcast Join的执行路径(即第一执行路径为执行Broadcast Join的执行路径);若
小表的执行输出结果大于第二门限值,则选中Sort Merge Join的执行路径(即第二执行路
径为执行Sort Merge Join的执行路径)。需要说明的是,threshold2考虑的是在实际运行
时,小表数据在内存加载成hash table所占用的内存大小,其默认值可以和threshold1一
样(例如512M),当然本申请实施例也可设置threshold2与threshold1的数值不同。另外需
要说明的是,只有在小表估算的大小满足threshold1,才能触发conditional Join plan,
而在作业运行中,主节点收集到小表真正的大小时,是通过threhold2来决策小表进行Join
最终的执行路径;所以threshold1是在优化器阶段做决策用,threshold2是在DAG运行阶段
做决策用,通过threshold1和threshold2这两个数值的调整,以及优化器和DAG的互相配合
来完成整个conditional (条件)的生成以及最终执行计划的选择。
执行路径Path0和Path1)。同时DAG中新增加了控制节点(C8_1),这个控制节点只有逻辑上
的控制意义不会拉起任何工作节点,控制节点根据M1的输出来选择执行Path0还是Path1。
例如,在作业执行过程中,M1在读取小表输出数据的candidate并做处理后,其实际输出的
数据量会被主节点收集,并在控制节点基于小表实际输出的数据量进行执行路径的决策选
择。如果选择执行Path0,则完整执行计划可以例如图7F所示,如果选择执行Path1,则完整
执行计划可以例如图7G所示。
出数据的统计信息,来从多条候选的执行路径中选择最终执行的目标执行路径。通过这样
的方式可以在作业的执行过程中,根据上游stage的输出数据的实际情况来选取最终的执
行路径,使得执行路径的选择更为合理、准确,从而基于合理、准确的执行路径执行作业,能
够显著提升分布式系统的性能。
度学习作业,分布式系统在调度与执行上仍然存在各种缺陷。例如对于深度学习系统(比如
Tensorflow)的原生逻辑而言,原生逻辑的调度与执行完全依赖外部系统而并没有配置在
分布式系统的执行引擎内。以Parameter Server(参数服务器,PS)架构为例,分布式系统的
工作节点可以分为两类:PS节点和Worker(工作器)节点,其中PS节点存放深度学习参数(例
如深度学习模型的参数),而Worker节点用于计算深度学习参数的梯度。在深度学习的每个
迭代过程,Worker节点从PS节点中获得深度学习参数,然后将计算的梯度返回给PS节点,PS
节点聚合从Worker节点传回的梯度并更新深度学习参数,PS节点将更新的深度学习参数再
广播给Worker节点,以此方式不断执行,从而实现深度学习参数迭代调整。在分布式系统的
PS架构中,PS节点与Worker节点在运行过程中存在如下特点:
可称为Worker stage(工作器执行阶段);
依赖关系无法映射到上游节点运行完毕再调度下游节点的逻辑中。基于此,在许多外部系
统中,PS节点与Worker节点只能在执行计划中对应两个孤立无联系的stage来分开调度和
运行,而这完全有可能导致Worker节点在PS节点调度起来之前就已经在空转,从而造成
Worker节点的资源浪费。除此之外,由于缺失PS节点、Worker节点在不同stage之间的关系
描述,许多基本功能和动态功能都无法实现。
用的资源大小(比如一个工作节点使用的GPU数量等)。然而让用户自己选择合适的资源是
比较困难的,用户为了保证深度学习作业能够有足够使用的资源,往往会过量的申请资源,
导致资源的浪费情况。例如,用户为了保证深度学习作业的GPU使用得到保障,会在对实际
使用的GPU资源无法准确预测的情况下,为每个工作节点申请多张GPU卡,然而深度学习作
业实际执行过程中,可能只利用了其中的25%,这导致剩余的GPU资源出现闲置而产生浪费。
这种情况导致的一个累积效应就是分布式系统上大量的GPU资源由于用户的申请而被预
留,甚至用户申请的GPU资源超过分布式系统的GPU资源总量,出现分布式系统的实际GPU资
源利用率较低,而其他作业则需要排队使用GPU资源的现象。另一方面,许多深度学习作业
对于资源(尤其是GPU资源)的使用是特别敏感的,如果盲目降低允许用户申请的资源上限,
可能会导致深度学习作业的执行性能出现下降。
响,成为了亟待解决的问题。
(并行边)的物理属性,并使得顺序边和并行边与数据传输进行解耦。并行边描述的是并行
边连接的上、下游stage的工作节点可以同时处于运行状态,但是调度上依然有先后,而且
调度的时机可以自定义。例如并行边连接的上、下游stage的工作节点可以同步调度,也可
以是下游stage的工作节点在上游stage的工作节点运行后进行调度,也可以下游stage的
工作节点在上游stage的工作节点运行到一定阶段后再由事件触发调度等。而顺序边描述
的是顺序边连接的下游stage的工作节点,需要等到上游stage的工作节点全部或者部分执
行结束后才能调度运行。
(工作器执行节点)的示例图。如图8A所示,PS stage为PS节点在执行计划中对应的stage,
Worker stage为Worker节点在执行计划对应的stage,在执行计划中PS stage通过并行边
与Worker stage连接,且并行边由PS stage输入Worker stage。从而,Worker节点和Worker
节点可以同时处于运行状态,并且调度时机可以进行自定义。在一些实施例中,下表1示例
性的示出了并行边连接的下游节点的调度时机的类型(称为调度类型),可进行参照,其中,
上游节点可以认为是并行边连接的上游stage对应的工作节点(例如图8A所示的PS stage
对应的PS节点),下游节点可以认为是并行边连接的下游stage对应的工作节点(例如图8A
所示的Worker stage对应的Worker节点)。
SOURCE_TASK_STARTED 当上游节点实例已经启动后,开始调度下游节点
SOURCE_TASK_PROGRESS 当上游节点实例的进度达到一定阈值时开始调度下游节点
SOURCE_TASK_OUTPUT_READY 当上游节点输出数据准备好后开始调度下游节点
SOURCE_VERTEX_CONFIGURED 当上游节点配置完成后开始调度下游节点
SOURCE_VERTEX_STARTED 当上游节点启动后开始调度下游节点
MIXED 下游节点的调度时机由上游节点多种类型或事件综合影响
作业,近实时/准实时作业,深度学习作业等。
Worker stage之间的连接边为并行边,且并行边由PS stage输入Worker stage;也就是说,
在执行计划中,Worker stage作为PS stage的直接下游stage,并且通过并行边连接。由于
Worker只能在PS开始运行后运行才有意义,因此下游的Worker节点的调度类型为SOURCE_
TASK_STARTED,也就是说,上游的PS节点的实例启动后,开始调度下游的Worker节点。在数
据生成方面,由于PS节点上的数据仅在其处于运行状态时有效,且上游数据源不感知下游
状态,因此数据源类型为EPHEMERAL_STATELESS(短暂无状态)的类型;而在数据传输类型方
面,由于PS节点与Worker节点的数据传输可以不由执行框架感知,因此数据传输类型为
NONE(空)。因此连接PS stage和Worker stage的连接边可以描述为{CONCURRENT,
EPHEMERAL_STATELESS,NONE,SOURCE_TASK_PROGRESS}。
理。这种针对深度学习作业的执行计划的描述,使得作业运行过程中的动态调整成为可能。
具体来说,深度学习的执行引擎需要终端用户为执行计划提供大量的配置参数,例如stage
的并发度、需求的资源大小与类型、分布策略等,而这些配置参数由终端用户提供的难度极
大。基于此,本申请实施例在分布式系统的执行引擎引入Resource Optimization(资源优
化)节点,资源优化节点作为一个作业执行的控制节点,可以对资源相关请求进行协调和动
态调整。
资源。在此基础上,除了在执行计划中设置由PS stage输入Worker stage的并行边外,还需
在执行计划中增加设置与Resource Optimization节点对应的Resource Optimization
stage(资源优化执行阶段),以及由Resource Optimization stage输入Worker stage的并
行边。连接Resource Optimization stage和Worker stage的连接边可以描述为
{CONCURRENT,EPHEMERAL_STATELESS,NONE,SOURCE_TASK_PROGRESS}。与连接PS stage和
Worker stage的连接边的描述不同的是,连接Resource Optimization stage和Worker
stage的连接边采用了下游节点是SOURCE_TASK_PROGRESS的调度类型。也就是说,Worker节
点需要在上游的Resource Optimization节点的执行进度达到一定阈值后进行调度。
度达到阈值时通知下游Worker节点进行调度。而Worker节点收到上述两个通知后,可基于
Resource Optimization节点通知,更新调度的资源后启动相应实例(资源基于Resource
Optimization节点的控制而动态变化)。在一些实施例中,Resource Optimization节点可
以动态调整Worker节点的以下资源:节点并发度,节点的GPU、CPU、MEMORY等资源的使用需
求。
可由DAG描述),执行计划中可以包括PS stage、Worker stage以及Resource Optimization
stage。并且在DAG图中,PS stage由并行边输入Worker stage,Resource Optimization
stage也由并行边输入Worker stage。在进一步的一些实施例中,PS stage和Worker stage
的并行边描述,以及Resource Optimization stage和Worker stage的并行边描述,可参照
前文相应的描述,此处不再展开。
应于Worker stage的资源信息。
一些实施例中,Resource Optimization节点调度之后,Resource Optimization节点可确
定当前适应于Worker stage的资源信息。在一些实施例中,Resource Optimization节点可
确定与深度学习作业的当前执行状态相匹配的历史使用的资源信息,将所述当前执行状态
相匹配的历史使用的资源信息,作为当前适应于Worker stage的资源信息。
度学习作业在各个历史执行状态实际使用的资源信息,也就是说,历史数据库可以记录历
史执行结束的深度学习作业在各个执行状态实际使用的资源信息。从而Resource
Optimization节点可基于历史数据库中的记录,确定出适应于深度学习作业的当前执行状
态的资源信息,来为Worker节点进行资源配置。
历史执行状态实际使用的资源信息,确定为当前适应于Worker stage的资源信息。作为可
选实现,Resource Optimization节点可先从历史数据库中查找与当前执行状态相同的历
史执行状态,若查找到,则将历史数据库中记录的该相同的历史执行状态相应的资源信息,
作为当前适应于Worker stage的资源信息;若未查找到,则查找与当前执行状态相似的历
史执行状态(例如查找与当前执行状态的差异最小的历史执行状态),将历史数据库中记录
的该相似的历史执行状态相应的资源信息,作为当前适应于Worker stage的资源信息。在
一些实施例中,深度学习作业的当前执行状态例如深度学习作业的当前学习模式、当前输
入数据的特点(例如当前训练数据的数据量)、当前剩余的参数迭代次数等。
Optimization节点为执行计划中的Worker stage配置该资源信息,以使得Worker stage配
置的资源信息适应于深度学习作业的当前执行状态。进而,Worker节点调度时,Worker节点
可基于所述资源信息调度使用的资源,使得Worker节点以合理的资源执行task,提升
Worker节点的资源利用率。
信息进行调整,避免用户由于过量指定Worker stage的资源而导致资源闲置的情况。例如,
用户指定了Worker节点使用2个CPU core(核)和1个GPU core(核)的资源单位,如果
Resource Optimization节点基于深度学习作业的当前执行状态确定Worker节点实际不需
要使用这么多的资源,则Resource Optimization节点可将用户事先指定的CPU core和GPU
core的数量进行调整,例如调整为Worker节点使用为1个CPU core 和半个GPU core,避免
由于用户为Worker节点指定过多数量的CPU core和GPU core,而导致CPU和GPU资源的闲置
浪费。
指定),在深度学习作业的执行过程中,Resource Optimization节点可基于作业的当前执
行状态,从历史数据库中查找相似或者相同的历史执行状态实际使用的资源信息,例如与
作业的当前执行状态相似或者相同的历史执行状态实际使用100个CPU核和50个GPU核,则
Resource Optimization节点可将Worker节点配置的CPU核数量调整为100个、GPU核数量调
整为50个,使得Worker节点能够通过配置较低的资源,来保障深度学习作业的具体执行。
行状态,从多个资源需求方案中选择与当前执行状态相匹配的资源需求方案,并以选择的
资源需求方案为Worker stage配置资源信息。与当前执行状态相匹配的资源需求方案可以
认为是能够满足当前执行状态的作业执行,且资源需求量最少的方案。
布式系统计算资源,提升分布式系统的资源利用率。本申请实施例可以在大规模生产集群
上线使用后,能够在不影响用户针对深度学习的实际训练耗时的前提下,大幅提升深度学
习作业的资源(尤其是GPU资源)的利用率,同时带来了作业的吞吐率大幅度提升,用户作业
排队的情况能够得到极大缓解。
够避免工作节点的资源空转等待,并能够提供更准确的作业执行控制流程。并且,本申请实
施例能够在作业运行期间,对深度学习作业所需要实际使用的GPU等资源进行动态选择和
调整,确保资源的使用能根据作业实际运行需求和算法特点进行适配,保障高效的资源使
用,从而避免了大规模多用户的分布式系统中,超量资源申请以及实际资源使用率较低的
矛盾。
一个处理器1,至少一个通信接口2,至少一个存储器3和至少一个通信总线4。在本申请实施
例中,处理器1、通信接口2、存储器3、通信总线4的数量为至少一个,且处理器1、通信接口2、
存储器3通过通信总线4完成相互间的通信。可选的,通信接口2可以为用于进行网络通信的
通信模块的接口。可选的,处理器1可能是CPU(中央处理器),GPU(Graphics Processing
Unit,图形处理器),NPU(嵌入式神经网络处理器),FPGA(Field Programmable Gate
Array,现场可编程逻辑门阵列),TPU(张量处理单元),AI芯片,特定集成电路ASIC
(Application Specific Integrated Circuit),或者是被配置成实施本申请实施例的一
个或多个集成电路等。存储器3可能包含高速RAM存储器,也可能还包括非易失性存储器
(non‑volatile memory),例如至少一个磁盘存储器。其中,存储器3存储一条或多条计算机
可执行指令,处理器1调用所述一条或多条计算机可执行指令,以执行本申请实施例提供的
分布式作业调整方法。
业调整方法。
认为是本申请实施例披露、公开的实施例方案。
要求所限定的范围为准。