一种高效并行分布式数据处理系统转让专利

申请号 : CN201210315625.8

文献号 : CN102821164B

文献日 :

基本信息:

PDF:

法律信息:

相似专利:

发明人 : 彭建华徐立中李臣明石爱业李昌利

申请人 : 河海大学

摘要 :

本发明公开了一种高效并行分布式数据处理系统。该系统包括一组一级数据处理子系统,以及与一级数据处理子系统级联的一组二级数据子处理系统,每个一级/二级数据处理子系统单独安装于一台主机;每个一级/二级数据处理子系统在启动时均创建一个包含多个数据处理线程的线程池;当数据处理系统接收到新数据包时,将新数据包分配给其中一个一级数据处理子系统;收到新数据包的一级数据处理子系统判断自身各数据处理线程的当前数据处理效率是否均大于或等于预设的效率阈值,如否,则将数据包分配给当前数据处理效率最小的数据处理线程处理;如是,则在二级数据处理子系统中为其分配一个数据处理线程。本发明能实现海量数据实时、高效、并发性处理。

权利要求 :

1.一种并行分布式数据处理系统,其特征在于,包括一组一级数据处理子系统,以及与一级数据处理子系统级联的一组二级数据处理子系统,每个一级/二级数据处理子系统单独安装于一台主机;每个一级/二级数据处理子系统在启动时均创建一个包含多个数据处理线程的线程池;当数据处理系统接收到新数据包时,将新数据包分配给其中一个一级数据处理子系统;收到新数据包的一级数据处理子系统判断自身线程池中的各数据处理线程的当前数据处理效率是否均大于或等于预设的效率阈值,如否,则将数据包分配给当前数据处理效率最小的数据处理线程进行处理,所述数据处理线程的当前数据处理效率 按照下式计算:其中,n 为该数据处理线程当前已处理数据包个数, 为该数据处理线程处理第n 个数据包所耗费时间;

如是,则在二级数据处理子系统中为该数据包分配一个数据处理线程。

2.如权利要求1所述并行分布式数据处理系统,其特征在于,所述一级数据处理子系统在启动时,开启一个数据接收线程、一个数据分发线程,一个数据转发线程,以及一个状态采集器,并创建包含多个数据处理线程的线程池,同时开辟一段接收数据动态存储区RBM与一段动态数据存储区ABM,并为线程池中的每一个数据处理线程开辟一个动态存储区SM;数据接收线程用于接收数据包并将接收到的数据包存入接收数据动态存储区RBM;

数据分发线程对接收数据动态存储区RBM的数据包进行循环读取,循环检测线程池中各数据处理线程的当前数据处理效率,并根据检测结果对读取的数据包进行分配:如线程池中各数据处理线程的当前数据处理效率均大于或等于预设的效率阈值,则将数据包发送至数据转发线程,否则,将数据包存入当前数据处理效率最小的数据处理线程的动态存储区SM;

状态采集器负责定时采集各二级数据处理子系统的性能状态数据;数据转发线程根据状态采集器所采集的数据在二级数据处理子系统中为接收到的数据包分配一个数据处理线程;

二级数据处理子系统完成所分配数据包的处理后,将处理结果反馈至发送该数据包的一级数据处理子系统的动态数据存储区ABM中。

3.如权利要求2所述并行分布式数据处理系统,其特征在于,所述状态采集器定时采集的性能状态数据是指各二级数据处理子系统中各数据处理线程的当前数据处理效率;数据转发线程根据状态采集器所采集的数据,将接收到的数据包分配给二级数据处理子系统中当前数据处理效率最小的数据处理线程。

4.如权利要求2所述并行分布式数据处理系统,其特征在于,所述状态采集器定时采集的性能状态数据是指各二级数据处理子系统中各数据处理线程的当前待处理数据包个数、当前已处理数据包个数以及当前数据处理效率;数据转发线程根据状态采集器所采集的数据在二级数据处理子系统中为接收到的数据包分配一个数据处理线程,具体按照以下方法:判断是否存在当前待处理数据包个数C为0或者当前已处理数据包个数n为0的数据处理线程,如是,则将数据包分配给其中任意一个数据处理线程;如否,则将数据包分配给相对空闲率最高的数据处理线程,所述数据处理线程的相对空闲率按照以下公式计算:式中, 为相对空闲率; 为该数据处理线程当前待处理数据包个数; 为该数据处理线程的当前数据处理效率。

说明书 :

一种高效并行分布式数据处理系统

技术领域

[0001] 本发明涉及一种高效并行分布式数据处理系统,属于计算机及通信技术领域。

背景技术

[0002] 海量数据是指数据集的大小已经超过传统软件工具在一定时间范围内可以获取、管理、处理的能力。其具有如下特征:
[0003] ①数据量大:通常达到TB级的数据;
[0004] ②数据类型复杂:海量数据通常不是单一类型数据,而是多种类型数据甚至是远远不断数据流;
[0005] ③多模态数据:对于不同的对象,得到的数据可能是通过不同的方式或角度收集。
[0006] 现在海量数据处理方法趋于成熟,处理海量数据的方法包含了Bloom filter、Hashing、bit-map、堆、双层桶划分、倒排索引、外排序、trie树等等技术,但这些技术只是从单数据层面给出了处理方法,并不能在实际应用中有效解决海量数据的实际处理情况。例如,MapReduce是google提出的一种软件架构,用于处理大规模数据集的并行运算,是当今最先进的大数据并行处理架构。MapReduce把大数据进行拆分成小块数据,然后通过Map方法进行并行处理,处理结果放入中间文件,reduce函数对Map处理结果的中间文件进行合并,这种分解、读写文件以及合并会消耗一定的计算机资源,占用一定的时间;同时,在数据分配方面,MapReduce通过调度方式进行函数分配,其调度计算的时间较长。

发明内容

[0007] 本发明所要解决的技术问题在于克服现有技术不足,提供一种能够对海量数据进行更高效处理的高效并行分布式数据处理系统。
[0008] 本发明具体采用以下技术方案解决上述技术问题。
[0009] 一种高效并行分布式数据处理系统,包括一组一级数据处理子系统,以及与一级数据处理子系统级联的一组二级数据子处理系统,每个一级/二级数据处理子系统单独安装于一台主机;每个一级/二级数据处理子系统在启动时均创建一个包含多个数据处理线程的线程池;当数据处理系统接收到新数据包时,将新数据包分配给其中一个一级数据处理子系统;收到新数据包的一级数据处理子系统判断自身线程池中的各数据处理线程的当前数据处理效率是否均大于或等于预设的效率阈值,
[0010] 如否,则将数据包分配给当前数据处理效率最小的数据处理线程进行处理,所述数据处理线程的当前数据处理效率 按照下式计算:
[0011]
[0012] 其中,n 为该数据处理线程当前已处理数据包个数, 为该数据处理线程处理第n个数据包所耗费时间;
[0013] 如是,则在二级数据处理子系统中为该数据包分配一个数据处理线程。
[0014] 优选地,所述一级数据处理子系统在启动时,开启一个数据接收线程、一个数据分发线程,一个数据转发线程,以及一个状态采集器,并创建包含多个数据处理线程的线程池,同时开辟一段接收数据动态存储区RBM与一段动态数据存储区ABM,并为线程池中的每一个数据处理线程开辟一个动态存储区SM;数据接收线程用于接收数据包并将接收到的数据包存入接收数据动态存储区RBM;数据分发线程对接收数据动态存储区RBM的数据包进行循环读取,循环检测线程池中各数据处理线程的当前数据处理效率,并根据检测结果对读取的数据包进行分配:如线程池中各数据处理线程的当前数据处理效率均大于或等于预设的效率阈值,则将数据包发送至数据转发线程,否则,将数据包存入当前数据处理效率最小的数据处理线程的动态存储区SM;状态采集器负责定时采集各二级数据处理子系统的性能状态数据;数据转发线程根据状态采集器所采集的数据在二级数据处理子系统中为接收到的数据包分配一个数据处理线程;二级数据处理子系统完成所分配数据包的处理后,将处理结果反馈至发送该数据包的一级数据处理子系统的动态数据存储区ABM中。
[0015] 采用上述技术方案时,一级数据处理子系统在为超过自身负荷而无法处理的数据包分配二级数据处理子系统时,可以采用最简单的随机选取的方式,也可根据现有的各种性能评价指标选择性能最好的二级数据处理子系统,本发明优选以下两种方案:
[0016] 优选方案一:
[0017] 所述状态采集器定时采集的性能状态数据是指各二级数据处理子系统中各数据处理线程的当前数据处理效率;数据转发线程根据状态采集器所采集的数据,将接收到的数据包分配给二级数据处理子系统中当前数据处理效率最小的数据处理线程。
[0018] 优选方案二:
[0019] 所述状态采集器定时采集的性能状态数据是指各二级数据处理子系统中各数据处理线程的当前待处理数据包个数、当前已处理数据包个数以及当前数据处理效率;数据转发线程根据状态采集器所采集的数据在二级数据处理子系统中为接收到的数据包分配一个数据处理线程,具体按照以下方法:
[0020] 判断是否存在当前待处理数据包个数C为0或者当前已处理数据包个数n为0的数据处理线程,如是,则将数据包分配给其中任意一个数据处理线程;如否,则将数据包分配给相对空闲率最高的数据处理线程,所述数据处理线程的相对空闲率按照以下公式计算:
[0021]
[0022] 式中, 为相对空闲率; 为该数据处理线程当前待处理数据包个数; 为该数据处理线程的当前数据处理效率。
[0023] 本发明采用两层子系统级联的系统结构,对数据进行多线程并行处理,并以动态变化的当前数据处理效率作为数据处理线程的负荷评价指标,选择负荷较低的数据处理线程;当一级数据处理子系统的各数据处理线程负荷均达到一定程度时,则从级联的二级数据处理子系统中选择当前数据处理效率最低或者相对空闲率最高的数据处理线程来对数据进行处理。本发明对于海量数据的分析处理,具有极高的性能与效率,真正实现了海量数据实时、高效、并发性处理。

附图说明

[0024] 图1为本发明数据处理系统的结构示意图;
[0025] 图2为本发明数据处理系统的一级数据处理子系统的工作流程示意图;
[0026] 图3为本发明数据处理系统的二级数据处理子系统的工作流程示意图。

具体实施方式

[0027] 下面结合附图对本发明的技术方案进行详细说明:
[0028] 本发明的高效并行分布式数据处理系统如图1所示,包括分别部署在N台主机上的N个一级数据处理子系统,以及与一级数据处理子系统级联的分别部署在M台主机上的M个二级数据处理子系统(N、M均为大于1的自然数)。
[0029] 一级数据处理子系统启动时,开启一个数据接收线程TRR、创建包含多个数据处理线程D的线程池、一个数据分发线程TA、一个数据转发线程TS以及一个状态采集器,同时开辟一段接收数据动态存储区RBM与动态数据存储区ABM,并且为每一个数据处理线程D开辟一段动态存储区SM。数据接收线程TRR只负责接收数据,并将数据存入接收数据动态存储区RBM;数据处理线程D负责处理数据;状态采集器定时采集各个级联的二级数据处理子系统的性能状态数据,然后把性能状态数据存储在系统内存中。数据分发线程TA对接收数据动态存储区RBM的数据包进行循环读取,循环检测线程池中各数据处理线程D的当前数据处理效率,并根据检测结果对读取的数据包进行分配:如线程池中各数据处理线程的当前数据处理效率均大于或等于预设的效率阈值M,说明本台机器上部署的一级数据处理子系统的系统处理性能达到饱和,则将数据包发送至数据转发线程TS,否则,将数据包存入当前数据处理效率最小的数据处理线程的动态存储区SM。其中数据处理线程的当前数据处理效率 按照下式计算:
[0030]
[0031] 其中,n 为该数据处理线程当前已处理数据包个数, 为该数据处理线程处理第n个数据包所耗费时间。
[0032] 数据转发线程TS根据状态采集器所采集的数据在二级数据处理子系统中为接收到的数据包分配一个数据处理线程;二级数据处理子系统完成所分配数据包的处理后,将处理结果反馈至发送该数据包的一级数据处理子系统的动态数据存储区ABM中。
[0033] 二级数据处理子系统启动时,开启一个数据接收代理Agent、创建包含多个数据处理线程的线程池、一个数据反馈线程TAR、一个数据分发线程TA′,以及一个性能状态计算器,同时开辟一段接收数据动态存储区RBM′与一段动态数据存储区ABM′,并且为每一个数据处理线程D开辟一段动态存储区SM。数据接收代理Agent只负责接收一级数据处理子系统转发的数据包,并放数据包存入接收数据动态存储区RBM′;数据分发线程TA′对接收数据动态存储区RBM′中的数据包进行读取,并将读取的数据包存入当前数据处理效率最小或者相对空闲率最大的数据处理线程的动态存储区SM中。当前数据处理效率的计算方法上文已给出,此处不再赘述。数据处理线程的相对空闲率按照以下公式计算:
[0034]
[0035] 式中, 为相对空闲率; 为该数据处理线程当前待处理数据包个数(亦即该数据处理线程的动态存储区SM中当前存储的数据包个数); 为该数据处理线程的当前数据处理效率。
[0036] 上式中,当前待处理数据包个数 与当前数据处理效率 不能为0,否则上式无意义,而实际上这两者均可能为0,为此,可规定:当前待处理数据包个数C为0或者当前已处理数据包个数n 为0(即当前数据处理效率 为0)时,其相对空闲率 最大。这样,在根据相对空闲率为数据包分配数据处理线程时,首先判断是否存在当前待处理数据包个数C 为0或者当前已处理数据包个数n 为0的数据处理线程,如是,则将数据包分配给其中任意一个数据处理线程;如否,则将数据包分配给相对空闲率最高的数据处理线程。数据处理线程处理数据后,把处理结果放入动态数据存储区ABM′中;数据反馈线程TAR循环检测动态数据存储区ABM′,把其中的处理结果反馈到对应的一级数据处理子系统的动态数据存储区ABM中。
[0037] 性能状态计算器定时计算并存储其所在的二级数据处理子系统的各数据处理线程的当前数据处理效率,或者当前待处理数据包个数、当前已处理数据包个数以及当前数据处理效率;一级数据处理子系统的状态采集器通过轮询各二级数据处理子系统的性能状态计算器,获得所级联的各二级数据处理子系统性能状态数据,数据转发线程TS即可将数据包发送给性能状态最优的二级数据处理子系统。
[0038] 图2、图3分别显示了本发明数据处理系统中一级、二级数据处理子系统的数据处理流程。