消息处理方法和装置转让专利

申请号 : CN201810551831.6

文献号 : CN108833510B

文献日 :

基本信息:

PDF:

法律信息:

相似专利:

发明人 : 王猛涛李乐丁陆丹峰董炫辰

申请人 : 北京百度网讯科技有限公司

摘要 :

本申请实施例公开了消息处理方法和装置。该方法的一具体实施方式包括:响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据;通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果;生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息;向订阅待发布消息的主题的设备发送待发布消息。该实施方式提升了物联网中数据分析计算的效率和安全性。

权利要求 :

1.一种消息处理方法,包括:

响应于边缘计算设备在其所处的局域网内监测到以目标函数订阅的主题发布的待处理消息,接收所述待处理消息,所述待处理消息包括由物联网中的数据采集设备采集得到的待分析数据,且所述待处理消息通过与所述边缘计算设备处于同一局域网的用户端设备发布;

启动本地消息处理引擎,通过所述目标函数的运行实例对所述待处理消息中的待分析数据进行分析计算,得到计算结果;

生成包含所述计算结果的待发布消息,为所述待发布消息配置主题并发布所述待发布消息;

向订阅所述待发布消息的主题的设备发送所述待发布消息。

2.根据权利要求1所述的方法,其中,所述方法还包括:读取所述目标函数的配置信息;

其中,所述配置信息包括:主题路由规则信息、执行函数功能的脚本、函数的类型。

3.根据权利要求2所述的方法,其中,所述通过所述目标函数的运行实例对所述待处理消息中的待分析数据进行分析计算,得到计算结果,包括:在已部署的与所述目标函数的类型对应的运行环境中,以所述待分析数据为目标函数的待执行参数,运行所述目标函数的运行实例,得到所述计算结果。

4.根据权利要求3所述的方法,其中,所述目标函数的配置信息还包括函数的运行实例的资源配置信息;

所述在已部署的与所述目标函数的类型对应的运行环境中,以所述待分析数据为目标函数的待执行参数,运行所述目标函数的运行实例,得到所述计算结果,包括:在已部署的与所述目标函数的类型对应的运行环境中,以所述待分析数据为目标函数的待执行参数,调用所述运行实例的资源配置信息所指示的可用资源运行所述目标函数的运行实例,得到所述计算结果。

5.根据权利要求3或4所述的方法,其中,所述在已部署的与所述目标函数的类型对应的运行环境中,以所述待分析数据为目标函数的待执行参数,运行所述目标函数的运行实例,得到所述计算结果,包括:在已部署的与所述目标函数的类型对应的运行环境中,启动本地消息处理引擎以将所述待分析数据传输至所述目标函数的运行实例;

利用所述目标函数的运行实例对所述待分析数据进行预处理,将预处理后的消息传递至所述目标函数的执行脚本或所述目标函数的二进制程序进行计算,得到结果数据并传回至所述目标函数的运行实例;

通过所述目标函数的运行实例将所述结果数据转换为所述消息处理引擎支持传输的数据格式,得到所述计算结果。

6.根据权利要求5所述的方法,其中,所述生成包含所述计算结果的待发布消息,包括:通过所述目标函数的运行实例将所述计算结果作为消息的有效载荷传输至所述本地消息处理引擎;

通过所述本地消息处理引擎检测所述目标函数的运行实例返回的消息的有效载荷是否为空;

若所述本地消息处理引擎检测到所述目标函数的运行实例返回的消息的有效载荷不为空,将所述目标函数的运行实例返回的消息作为所述待发布消息。

7.一种消息处理装置,包括:

接收单元,被配置成响应于边缘计算设备在其所处的局域网内监测到以目标函数订阅的主题发布的待处理消息,接收所述待处理消息,所述待处理消息包括由物联网中的数据采集设备采集得到的待分析数据,且所述待处理消息通过与所述边缘计算设备处于同一局域网的用户端设备发布;

计算单元,被配置成启动本次消息处理引擎,通过所述目标函数的运行实例对所述待处理消息中的待分析数据进行分析计算,得到计算结果;

发布单元,被配置成生成包含所述计算结果的待发布消息,为所述待发布消息配置主题并发布所述待发布消息;

发送单元,被配置成向订阅所述待发布消息的主题的设备发送所述待发布消息。

8.根据权利要求7所述的装置,其中,所述装置还包括:读取单元,被配置成读取所述目标函数的配置信息;

其中,所述配置信息包括:主题路由规则信息、执行函数功能的脚本、函数的类型。

9.根据权利要求8所述的装置,其中,所述计算单元进一步被配置成:在已部署的与所述目标函数的类型对应的运行环境中,以所述待分析数据为目标函数的待执行参数,运行所述目标函数的运行实例,得到所述计算结果。

10.根据权利要求9所述的装置,其中,所述目标函数的配置信息还包括函数的运行实例的资源配置信息;

所述计算单元进一步被配置成:

在已部署的与所述目标函数的类型对应的运行环境中,以所述待分析数据为目标函数的待执行参数,调用所述运行实例的资源配置信息所指示的可用资源运行所述目标函数的运行实例,得到所述计算结果。

11.根据权利要求9或10所述的装置,其中,所述计算单元进一步被配置成:在已部署的与所述目标函数的类型对应的运行环境中,启动本地消息处理引擎以将所述待分析数据传输至所述目标函数的运行实例;

利用所述目标函数的运行实例对所述待分析数据进行预处理,将预处理后的消息传递至所述目标函数的执行脚本或所述目标函数的二进制程序进行计算,得到结果数据并传回至所述目标函数的运行实例;

通过所述目标函数的运行实例将所述结果数据转换为所述消息处理引擎支持传输的数据格式,得到所述计算结果。

12.根据权利要求11所述的装置,其中,所述发布单元进一步被配置成按照如下方式生成包含所述计算结果的待发布消息:通过所述目标函数的运行实例将所述计算结果作为消息的有效载荷传输至所述本地消息处理引擎;

通过所述本地消息处理引擎检测所述目标函数的运行实例返回的消息的有效载荷是否为空;

若所述本地消息处理引擎检测到所述目标函数的运行实例返回的消息的有效载荷不为空,将所述目标函数的运行实例返回的消息作为所述待发布消息。

13.一种电子设备,包括:

一个或多个处理器;

存储装置,其上存储有一个或多个程序,

当所述一个或多个程序被所述一个或多个处理器执行,使得所述一个或多个处理器实现如权利要求1-6中任一所述的方法。

14.一种计算机可读存储介质,其上存储有计算机程序,其中,所述计算机程序被处理器执行时实现如权利要求1-6中任一所述的方法。

15.一种物联网系统,包括:

数据采集设备,被配置成采集数据;

用户端设备,与所述数据采集设备连接,被配置成以待处理消息的主题发布包含所述数据采集设备所采集的数据的待处理消息;

边缘计算设备,包括一个或多个处理器以及存储装置,存储装置上存储有一个或多个程序,当所述一个或多个程序被所述一个或多个处理器执行,使得所述一个或多个处理器实现如权利要求1-6中任一所述的方法。

说明书 :

消息处理方法和装置

技术领域

[0001] 本申请实施例涉及计算机技术领域,具体涉及物联网技术领域,尤其涉及消息处理方法和装置。

背景技术

[0002] 物联网,是指通过各种信息传感设备,实时采集任何需要监控、连接、互动的物体或过程等各种需要的信息,与互联网结合形成的一个实现物与物、物与人,所有的物品与网络的连接的网络。
[0003] 在物联网的各类型应用中,需要根据业务需求对设备采集到的数据进行针对性的处理。目前对大规模海量数据的处理通常采用本地设备将数据上传至具有较强的计算能力的云端,触发云端的处理功能来对数据进行分析计算的方式。

发明内容

[0004] 本申请实施例提出了消息处理方法和装置。
[0005] 第一方面,本申请实施例提供了一种消息处理方法,包括:响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据;通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果;生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息;向订阅待发布消息的主题的设备发送待发布消息。
[0006] 在一些实施例中,上述方法还包括:读取目标函数的配置信息;其中,配置信息包括:主题路由规则信息、执行函数功能的脚本或程序、函数的类型。
[0007] 在一些实施例中,上述通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果,包括:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,运行目标函数的运行实例,得到计算结果。
[0008] 在一些实施例中,上述目标函数的配置信息还包括函数的运行实例的资源配置信息;上述在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,运行目标函数的运行实例,得到计算结果,包括:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,调用运行实例的资源配置信息所指示的可用资源运行目标函数的运行实例,得到计算结果。
[0009] 在一些实施例中,上述在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,运行目标函数的运行实例,得到计算结果,包括:在已部署的与目标函数的类型对应的运行环境中,启动本地消息处理引擎以将待分析数据传输至目标函数的运行实例;利用目标函数的运行实例对待分析数据进行预处理,将预处理后的消息传递至目标函数的执行脚本或目标函数的二进制程序进行计算,得到结果数据并传回至目标函数的运行实例;通过目标函数的运行实例将结果数据转换为消息处理引擎支持传输的数据格式,得到计算结果。
[0010] 在一些实施例中,上述生成包含计算结果的待发布消息,包括:通过目标函数的运行实例将计算结果作为消息的有效载荷传输至本地消息处理引擎;通过本地消息处理引擎检测目标函数的运行实例返回的消息的有效载荷是否为空;若本地消息处理引擎检测到目标函数的运行实例返回的消息的有效载荷不为空,将目标函数的运行实例返回的消息作为待发布消息。
[0011] 第二方面,本申请实施例提供了一种消息处理装置,包括:接收单元,被配置成响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据;计算单元,被配置成通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果;发布单元,被配置成生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息;发送单元,被配置成向订阅待发布消息的主题的设备发送待发布消息。
[0012] 在一些实施例中,上述装置还包括:读取单元,被配置成读取目标函数的配置信息;其中,配置信息包括:主题路由规则信息、执行函数功能的脚本或程序、函数的类型。
[0013] 在一些实施例中,上述计算单元进一步被配置成:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,运行目标函数的运行实例,得到计算结果。
[0014] 在一些实施例中,上述目标函数的配置信息还包括函数的运行实例的资源配置信息;上述计算单元进一步被配置成:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,调用运行实例的资源配置信息所指示的可用资源运行目标函数的运行实例,得到计算结果。
[0015] 在一些实施例中,上述计算单元进一步被配置成:在已部署的与目标函数的类型对应的运行环境中,启动本地消息处理引擎以将待分析数据传输至目标函数的运行实例;利用目标函数的运行实例对待分析数据进行预处理,将预处理后的消息传递至目标函数的执行脚本或目标函数的二进制程序进行计算,得到结果数据并传回至目标函数的运行实例;通过目标函数的运行实例将结果数据转换为消息处理引擎支持传输的数据格式,得到计算结果。
[0016] 在一些实施例中,上述发布单元进一步被配置成按照如下方式生成包含计算结果的待发布消息:通过目标函数的运行实例将计算结果作为消息的有效载荷传输至本地消息处理引擎;通过本地消息处理引擎检测目标函数的运行实例返回的消息的有效载荷是否为空;若本地消息处理引擎检测到目标函数的运行实例返回的消息的有效载荷不为空,将目标函数的运行实例返回的消息作为待发布消息。
[0017] 第三方面,本申请实施例提供了一种电子设备,包括:一个或多个处理器;存储装置,用于存储一个或多个程序,当一个或多个程序被一个或多个处理器执行,使得一个或多个处理器实现如第一方面提供的消息处理方法。
[0018] 第四方面,本申请实施例提供了一种计算机可读存储介质,其上存储有计算机程序,其中,程序被处理器执行时实现第一方面提供的消息处理方法。
[0019] 第五方面,本申请实施例提供了一种物联网系统,包括:数据采集设备,被配置成采集数据;用户端设备,与数据采集设备连接,被配置成以待处理消息的主题发布包含数据采集设备所采集的数据的待处理消息;边缘计算设备,包括一个或多个处理器以及存储装置,存储装置上存储有一个或多个程序,当一个或多个程序被一个或多个处理器执行,使得一个或多个处理器实现第一方面提供的消息处理方法。
[0020] 本申请上述实施例的消息处理方法和装置,通过响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据,随后通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果,然后生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息,最后向订阅待发布消息的主题的设备发送待发布消息,实现了靠近数据采集设备端的数据的快速处理,提升了物联网数据处理的安全性。

附图说明

[0021] 通过阅读参照以下附图所作的对非限制性实施例所作的详细描述,本申请的其它特征、目的和优点将会变得更明显:
[0022] 图1是本申请的一个实施例可以应用于其中的示例性系统架构图;
[0023] 图2是根据本申请的消息处理方法的一个实施例的流程图;
[0024] 图3是根据本申请的消息处理方法的另一个实施例的流程图;
[0025] 图4是根据本申请的消息处理方法中函数计算的一种具体实现方式的流程示意图;
[0026] 图5是根据本申请的消息处理装置的一个结构示意图;
[0027] 图6是适于用来实现本申请实施例的电子设备的计算机系统的结构示意图;
[0028] 图7是适于用来实现本申请实施例的物联网系统的一个结构示意图。

具体实施方式

[0029] 下面结合附图和实施例对本申请作进一步的详细说明。可以理解的是,此处所描述的具体实施例仅仅用于解释相关发明,而非对该发明的限定。另外还需要说明的是,为了便于描述,附图中仅示出了与有关发明相关的部分。
[0030] 需要说明的是,在不冲突的情况下,本申请中的实施例及实施例中的特征可以相互组合。下面将参考附图并结合实施例来详细说明本申请。
[0031] 图1示出了可以应用本申请的消息处理方法或消息处理装置的示例性系统架构100。在这里,系统架构100可以是基于物联网的系统架构。
[0032] 如图1所示,系统架构100可以包括设备101、102、103。设备101、102、103可以通过网络通信连接,例如设备101、102、103可以位于同一个局域网内,设备101、102、103之间可以通过有线、无线链路或者光线电缆等方式连接。
[0033] 设备101可以为数据采集设备,例如物联网场景中的温度、湿度、速度、时间等用于表征设备所处环境的各种物理信息的采集设备,数据采集设备可以采集包括图像、文字、语音等形式的数据。
[0034] 设备102可以为用户端设备,用户端设备可以与数据采集设备连接。用户端设备获取数据采集设备采集的数据,并向物联网用户提供数据分析处理的接口。用户端设备还可以与边缘计算设备(例如设备103)和/或云服务器(例如服务器104)通信连接,用户端设备在接收到用户的数据分析处理请求时,可以将数据采集设备采集的数据发送至边缘计算设备或云服务器进行处理。用户端设备还可以接收边缘计算设备或云服务器对数据的处理结果。设备102上可以安装有用于与设备103通信的客户端,用户可以通过该客户端发起数据处理请求。
[0035] 设备103可以为边缘计算设备,边缘计算设备可以为用户端设备提供计算服务。边缘计算设备上可以部署数据分析函数的运行环境。边缘计算设备可以执行数据分析函数的功能,将用户端设备发送的数据作为函数的计算对象,生成数据分析计算的结果,并将结果返回给用户端设备。
[0036] 在本申请的一些场景中,上述系统架构还可以包括与设备101、102、103中的至少一个通过广域网通信连接的服务器104。设备101、102、103中的至少一个可以通过有线、无线链路或者光线电缆等方式与服务器104连接。
[0037] 服务器104可以是物联网中的云服务器。云服务器可以为用户端设备提供云计算服务。云服务器可以部署数据计算功能,在接收到用户端请求处理的数据时对数据进行分析计算,得到计算结果。
[0038] 需要说明的是,本申请实施例所提供的消息处理方法可以由边缘计算设备103执行,相应地,消息处理装置可以设置于边缘计算设备103中。
[0039] 应该理解,图1中的设备、网络、服务器的数目仅仅是示意性的。根据实现需要,可以具有任意数目的设备、网络、服务器。在具体的场景中,可以具有任意数目的数据采集设备、用户端设备、边缘计算设备、网络、云服务器。
[0040] 继续参考图2,其示出了根据本申请的消息处理方法的一个实施例的流程200。该消息处理方法,包括以下步骤:
[0041] 步骤201,响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息。
[0042] 在本实施例中,上述消息处理方法的执行主体(例如图1所示的边缘计算设备103)可以监测其所处的局域网中是否存在该执行主体上部署的目标函数已订阅的主题的消息被发布,若监测到,则上述执行主体可以接收以该目标函数订阅的主题发布的消息作为待处理消息。
[0043] 在这里,待处理消息可以包括由物联网中的数据采集设备采集得到的待分析数据。目标函数可以是用于实现对待分析数据进行特定的分析计算功能的函数,该特定的分析计算功能可以根据用户的需求选择,例如用户希望可以对数据进行统计分析,则该目标函数为统计分析函数。目标函数的执行脚本或用于执行目标函数功能的二进制程序可以由用户预先编写或编译完成,并保存在上述执行主体中,并且目标函数的运行环境也可以预先部署在上述执行主体上。
[0044] 待分析数据可以是由诸如温度采集器、湿度采集器、压力传感器、光线传感器、摄像头、语音信号接收器等传感器采集到的传感器数据,也可以是由包含这些传感器及数据处理装置的硬件设备采集到的数据,例如可以是由具有摄像头和图像处理芯片的电子设备对摄像头采集到的图像进行预处理后得到的图像数据。
[0045] 用户通过用户端设备上提交待分析数据之后,用户端设备可以将待分析数据进行打包,生成待处理消息。之后可以在物联网内发布待处理消息,在发布待处理消息时,可以为待处理消息配置一个预设的主题。上述执行主体可以监测是否有部署于其上的目标函数订阅的主题被发布,若监测到有目标函数订阅的主题被发布,则可以从用户端设备接收该主题的待处理消息。
[0046] 在本实施例的一些可选的实现方式中,上述执行主体上可以存储有其可连接访问的主题权限列表。该主题权限列表包括函数与订阅的主题的对应关系,则在监测到已发布的主题之后,可以在列表中查找出订阅该主题的函数作为目标函数。
[0047] 步骤202,通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果。
[0048] 上述执行主体可以启动本地消息处理引擎目标函数的运行实例,本地消息处理引擎可以将待分析数据传输至目标函数的运行实例,通过目标函数的运行实例对待分析数据进行分析计算。在这里,本地消息处理引擎可以是执行消息处理方法的主程序。具体地,本地消息处理引擎可以在目标函数的运行实例运行时,将待分析数据传递至目标函数的执行脚本或二进制程序,执行目标函数的执行脚本或运行目标函数的二进制程序进行相应的计算和处理,执行脚本或二进制程序计算完毕后向目标函数的运行实例返回一条消息作为计算结果。
[0049] 步骤203,生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息。
[0050] 在函数的执行脚本或二进制程序完成计算之后,可以将计算结果传回至本地消息处理引擎,本地消息处理引擎可以对计算结果进行数据打包,生成包含计算结果的待发布消息。具体可以按照上述执行主体与发布待处理消息的设备之间的数据传输协议,对计算结果进行封装后,生成待发布消息。而后可以为待发布消息配置预先定义的主题,并根据执行主体中预先设置的主题路由规则在物联网中发布生成的消息,以便订阅该主题的设备可以接收到待发布消息。
[0051] 步骤204,向订阅待发布消息的主题的设备发送待发布消息。
[0052] 上述执行主体和订阅待发布消息的主题的设备之间可以通过订阅-发布模式进行数据传输。订阅待发布消息的主题的设备可以是发布待处理消息的用户端设备,也可以是物联网中与上述执行主体在同一局域网内的其他边缘设备,例如其他用户端设备或其他边缘计算设备。这些设备可以监测是否存在其订阅的主题被发布,若监测到订阅的主题,可以接收发布的消息,由此可以接收到包含计算结果的待发布消息,也就是说,在上述执行主体的局域网中,已订阅待发布消息的各设备可以得到对发布待处理消息的设备所发送的待分析数据进行分析计算的结果。
[0053] 在实际场景中,用户可以通过用户端设备将需要分析的数据通过安装在用户端设备的客户端发布,同一局域网中如果部署有边缘计算设备,且边缘计算设备中应用了目标函数的相关配置,并部署了目标函数的运行环境,则可以通过目标函数订阅的主题接收客户端发布的消息。在边缘计算设备完成计算之后,可以发布包含计算结果的消息,用户端设备可以订阅该消息,从而得到提交的待分析数据的分析计算结果。
[0054] 本申请上述实施例的消息处理方法,首先响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据;随后通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果;之后生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息;最后向订阅待发布消息的主题的设备发送待发布消息,实现了在近数据采集设备端的数据分析处理。由于数据处理过程中无需将数据上传云端,避免了数据泄露或丢失等安全问题,同时使得数据处理速度不依赖于网络性能,提升了数据处理效率。
[0055] 继续参考图3,其示出了根据本申请的消息处理方法的另一个实施例的流程图。如图3所示,本实施例的消息处理方法的流程300,包括以下步骤:
[0056] 步骤301,读取目标函数的配置信息。
[0057] 在本实施例中,上述消息处理方法的执行主体可以预先读取存储的目标函数的配置信息。目标函数的配置信息可以是由用户按照需求预先配置并存储于上述执行主体中的。目标函数的配置信息可以是用于描述目标函数的相关配置的信息。在这里,目标函数的配置信息可以包括但不限于:目标函数的主题路由规则信息、函数计算配置信息、日志处理信息。主题路由规则信息可以用于表示目标函数的消息收发规则,其中包含目标函数订阅的主题。函数计算配置信息可以包括但不限于:函数名称、函数的类型、函数执行器。函数类型可以表示实现函数功能的程序的编程语言类型,例如Python、Node.js、Java、C#等,函数执行器可以是执行函数功能的脚本或程序,函数执行器与函数类型对应,例如函数类型是Python,则函数执行器可以是实现函数功能的Python脚本。日志处理信息可以是用于指示函数执行日志的处理方式的信息。
[0058] 上述函数计算配置信息还可以包括与函数的类型对应的运行环境配置信息。上述执行主体可以根据读取到的运行环境配置信息安装、部署目标函数的运行环境。
[0059] 步骤302,响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息。
[0060] 待处理消息包括由物联网中的数据采集设备采集得到的待分析数据。在本实施例中,如果检测到消息处理方法的执行主体所在的局域网内有目标函数订阅的主题被发布,则可以接收该主题中的待处理消息。该待处理消息可以是与上述执行主体在同一局域网内的用户端设备发布的,包含由数据采集设备采集的待分析数据。
[0061] 步骤303,在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的参数,运行目标函数的运行实例,得到计算结果。
[0062] 在读取目标函数的配置信息之后,上述执行主体可以安装部署目标函数的类型对应的运行环境。这样,可以将接收到的待处理消息中的待分析数据作为目标函数的计算对象,将待分析数据输入目标函数进行计算。在计算时具体可以启动目标函数的运行实例,将待分析数据传输至运行实例进行计算,得到计算结果。
[0063] 在本实施例的一些可选的实现方式中,上述函数计算配置信息还可以包括函数的运行实例的资源配置信息。函数的运行实例的资源配置信息用于表征为函数的运行实例配置的可用资源,可以包括但不限于:可支持的运行实例的配置数目、运行实例消息处理限时、运行实例的内存和/或CPU资源配置。
[0064] 在读取目标函数的配置信息中的用于指示运行实例的可用资源的资源配置信息之后,可以按照该资源配置信息为目标函数开启相应数量的运行实例。例如资源配置信息中可支持的运行实例的配置数目为3,表明允许目标函数最多开启3个运行实例,则可以为目标函数开启最多3个运行实例。当需要处理的消息数量增加时,可以判断当前目标函数是否有空闲的运行实例,如果无空闲的运行实例,并且当前开启的运行实例数量已达到上限3,则可以等待已运行的实例处理完毕后立即复用;如果无空闲的运行实例,并且当前开启的运行实例未达到上限3,则可以启动一个新的运行实例来处理增加的消息。
[0065] 进一步可选地,如果资源配置信息中未指示可支持的运行实例的配置数据,则在读取配置信息之后,可以按照默认的运行实例数量配置为目标函数启动运行实例。例如可以默认设置每个函数至少启动一个运行实例,以保证消息传来时及时处理。或者可以默认设置每个函数不创建运行实例,即默认函数可允许运行实例书的最小值为0,则读取配置信息后可以进保证函数启动时所需的最少资源,仅在有需要处理的消息传来时,创建运行实例。
[0066] 在进一步的实现方式中,可以按照如下方式执行步骤303:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,调用运行实例的资源配置信息所指示的可用资源运行目标函数的运行实例,得到计算结果。在这里,资源配置信息还可以包括函数运行实例的内存配置信息、CPU配置信息,即每个运行实例可利用的内存资源配置信息和CPU资源配置信息。
[0067] 具体地,函数运行实例的内存资源配置信息可以包括内存的最大软额度和最大限定额度。当系统内存不足时,可以优先回收超过最大软额度的运行实例占用的内存,使运行实例占用的内存靠近最大软额度。最大限定额度可以是系统可用的最大内存,当系统内存不足时,占用的内存超过最大限定额度的运行实例可以被重置。
[0068] 函数运行实例的CPU资源配置信息可以表示运行实例可使用的CPU资源的上限,包括可使用的CPU时间周期长度和可用的CPU时间周期长度内所能使用的CPU数量。
[0069] 为了实现对上述函数的可支持的运行实例的配置数目、运行实例消息处理限时、运行实例的内存和/或CPU资源的配置,需要上述执行主体的操作系统为Linux系统,支持Cgroup(control groups,控制进程组)。Cgroup是Linux内核提供的一种可以限制、记录、隔离进程组所使用的物理资源(如:CPU,内存,输入输出接口等等)的机制。
[0070] 可以按照上述资源配置信息为目标函数的运行实例配置资源,即控制目标函数的运行实例对待分析数据执行分析计算期间所使用的资源,从而提升上述执行主体的资源的利用效率。
[0071] 可选地,上述资源配置信息还可以包括运行实例的超时时间信息、运行实例可创建的最大进程数等。这样可以在运行实例超出所配置的时间的时候清理该运行实例,以保证资源的有效利用,并且避免运行实例创建过多的进程而造成资源过多的消耗。
[0072] 请参考图4,其示出了根据本申请的消息处理方法中对待处理消息中的待分析数据进行分析计算并生成待发布消息的一种具体实现方式的流程示意图,也即示出了上述消息处理方法的流程300中步骤303和步骤304的一种可选实现方式的流程示意图。其中步骤401至步骤403为步骤303的一种可选实现方式的流程。
[0073] 如图4所示,在本实施例的一些可选的实现方式中,对待处理消息中的待分析数据进行分析计算生成包含计算结果的待发布消息的流程400可以包括:
[0074] 步骤401,在已部署的与目标函数的类型对应的运行环境中,启动本地消息处理引擎以将待分析数据传输至目标函数的运行实例。
[0075] 上述执行主体可以启动本地消息处理引擎,本地消息处理引擎的具体实现可以为目标函数的主程序。本次消息处理引擎可以向目标函数的一个运行实例发送一个消息,这个消息中可以包含从待处理消息中解析出的待分析数据。
[0076] 步骤402,利用目标函数的运行实例对待分析数据进行预处理,将预处理后的消息传递至目标函数的执行脚本进行计算,得到结果数据并传回至目标函数的运行实例。
[0077] 目标函数的运行实例在收到主程序的消息之后,可以对消息进行预处理,具体可以将消息中的待分析数据转换为Json类或字典类,之后运行实例可以将消息传入函数执行器。函数执行器可以为目标函数的执行脚本或目标函数的二进制程序,在接收到运行实例预处理后的消息之后,可以利用执行脚本或二进制程序,以待分析数据为待执行参数来执行函数的功能,执行完毕后可以向运行实例返回一条包括函数执行的结果数据的消息。
[0078] 具体地,在目标函数的执行脚本或二进制程序计算时,可以从字典类或Json类的待分析数据中获取具体字段的值,执行分析计算,计算完毕后,可以向目标函数的运行实例返回一个Json类或字典类的结果数据。
[0079] 步骤403,通过目标函数的运行实例将结果数据转换为消息处理引擎支持传输的数据格式,得到计算结果。
[0080] 目标函数的运行实例收到指数执行器返回的新消息之后,可以将其转换为本地消息处理引擎所支持传输的消息格式,将转换格式后的消息作为计算结果。
[0081] 在这里,本地消息处理引擎与函数运行实例之间的通信可以采用进程通信方式。举例来说,若上述执行主体的操作系统为Linux系统或MacOS系统,则进程间的通信方式可以包括但不限于socketpair(套接字对)方式,若上述执行主体的操作系统为Windows系统,则进程间通信方式可以包括但不限于Namedpipe(有名管道)方式。
[0082] 返回图3,上述消息处理方法的流程300还包括:
[0083] 步骤304,生成包含计算结果的待发布消息,并为待发布消息配置主题并发布待发布消息。
[0084] 在得到计算结果之后,上述执行主体可以按照预设的主题在物联网内发布包含计算结果的待发布消息。
[0085] 继续参考图4,步骤404至步骤406为步骤304的一种可选实现方式的实现流程。在步骤403之后,上述流程400还可以包括:,
[0086] 步骤404,通过目标函数的运行实例将计算结果作为消息的有效载荷传输至本地消息处理引擎。
[0087] 目标函数的运行实例可以将计算结果进行数据打包,将计算结果按照与本地消息处理引擎之间约定的数据格式封装为消息,该消息的有效载荷为上述目标函数的运行实例得到的计算结果。之后目标函数的运行实例可以将该消息传输至本地消息处理引擎。
[0088] 步骤405,通过本地消息处理引擎检测目标函数的运行实例返回的消息的有效载荷是否为空。
[0089] 本地消息处理引擎可以接收运行实例返回的消息,并判断运行实例返回的消息的有效载荷是否为空。
[0090] 步骤406,若本地消息处理引擎检测到目标函数的运行实例返回的消息的有效载荷不为空,将目标函数的运行实例返回的消息作为待发布消息。
[0091] 如果本地消息处理引擎检测到运行实例返回的消息的有效载荷不为空,可以将收到的包含计算结果的消息作为待发布消息,进而可以为待发布消息配置主题并发布。
[0092] 可选地,当本地消息处理引擎检测到运行实例返回的消息的有效载荷为空时,可以不发布运行实例返回的消息。这样可以避免函数计算失败或发生其他错误时发布不包含用户期望的计算结果的消息。
[0093] 从图4可以看出,对待处理消息中的待分析数据进行分析计算生成包含计算结果的待发布消息的流程400,通过本地消息处理引擎、目标函数的运行实例、以及目标函数的执行脚本或二进制程序之间的消息传输,实现了在边缘计算设备上的函数计算。
[0094] 返回图3,在步骤304之后,消息处理方法的流程300还包括:
[0095] 步骤305,向订阅待发布消息的主题的设备发送待发布消息。
[0096] 上述执行主体和订阅待发布消息的主题的设备之间可以通过订阅-发布模式进行数据传输。订阅待发布消息的主题的设备可以是物联网中与上述执行主体在同一局域网内的其他设备,例如用户端设备或其他边缘计算设备。这些设备在监测到订阅的主题时,可以接收发布主题内的消息,由此可以获取计算结果。
[0097] 上述实施例中的步骤302和步骤305分别与前述实施例的步骤201和步骤204一致,此处不再赘述。
[0098] 从图3可以看出,本实施例的消息处理方法,通过读取目标函数的配置信息,为目标函数的功能的实现提供可用的运行环境。使得用户可以根据需求对目标函数进行配置,从而拓展了本地函数计算的可应用平台(包括Linux、Windows、Darwin等系统,arm、x86、mips、powerpc等CPU)的范围。此外,启动运行函数功能所需要的最小内存空间较小,降低了对本地边缘计算设备的性能要求。
[0099] 进一步参考图5,作为对上述各图所示方法的实现,本申请提供了一种消息处理装置的一个实施例,该装置实施例与图2和图3所示的方法实施例相对应,该装置具体可以应用于各种电子设备中。
[0100] 如图5所示,本实施例的消息处理装置500包括:接收单元501、计算单元502、发布单元503以及发送单元504。其中,接收单元501可以被配置成响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据;计算单元502可以被配置成通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果;发布单元503可以被配置成生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息;发送单元504可以被配置成向订阅待发布消息的主题的设备发送待发布消息。
[0101] 在一些实例中,上述装置500还可以包括:读取单元,被配置成读取目标函数的配置信息;其中,配置信息包括:主题路由规则信息、执行函数功能的脚本或程序、函数的类型。
[0102] 在上述实例的一些可选的实现方式中,上述计算单元502可以进一步被配置成:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,运行目标函数的运行实例,得到计算结果。
[0103] 在上述实例的一些可选的实现方式中,上述目标函数的配置信息还可以包括函数的运行实例的资源配置信息;上述计算单元502可以进一步被配置成:在已部署的与目标函数的类型对应的运行环境中,以待分析数据为目标函数的待执行参数,调用运行实例的资源配置信息所指示的可用资源运行目标函数的运行实例,得到计算结果。
[0104] 在上述实例的一些可选的实现方式中,上述计算单元502可以进一步被配置成:在已部署的与目标函数的类型对应的运行环境中,启动本地消息处理引擎以将待分析数据传输至目标函数的运行实例;利用目标函数的运行实例对待分析数据进行预处理,将预处理后的消息传递至目标函数的执行脚本或目标函数的二进制程序进行计算,得到结果数据并传回至目标函数的运行实例;通过目标函数的运行实例将结果数据转换为消息处理引擎支持传输的数据格式,得到计算结果。
[0105] 在上述实例的一些可选的实现方式中,上述发布单元503可以进一步被配置成按照如下方式生成包含计算结果的待发布消息:通过目标函数的运行实例将计算结果作为消息的有效载荷传输至本地消息处理引擎;通过本地消息处理引擎检测目标函数的运行实例返回的消息的有效载荷是否为空;若本地消息处理引擎检测到目标函数的运行实例返回的消息的有效载荷不为空,将目标函数的运行实例返回的消息作为待发布消息。
[0106] 装置500中记载的诸单元与参考图2和图3描述的方法中的各个步骤相对应。由此,上文针对方法描述的操作和特征同样适用于装置500及其中包含的单元,在此不再赘述。
[0107] 本申请上述实施例的消息处理装置,通过接收单元响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据,随后计算单元通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果,然后发布单元生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息,最后发送单元向订阅待发布消息的主题的设备发送待发布消息,实现了靠近数据采集设备端的数据的快速、处理,提升了数据处理的安全性。
[0108] 下面参考图6,其示出了适于用来实现本申请实施例的电子设备的计算机系统600的结构示意图。图6示出的电子设备仅仅是一个示例,不应对本申请实施例的功能和使用范围带来任何限制。
[0109] 如图6所示,计算机系统600包括中央生成单元(CPU)601,其可以根据存储在只读存储器(ROM)602中的程序或者从存储部分608加载到随机访问存储器(RAM)603中的程序而执行各种适当的动作和处理。在RAM 603中,还存储有系统600操作所需的各种程序和数据。CPU 601、ROM 602以及RAM 603通过总线604彼此相连。输入/输出(I/O)接口605也连接至总线604。
[0110] 以下部件连接至I/O接口605:包括键盘、鼠标等的输入部分606;包括诸如阴极射线管(CRT)、液晶显示器(LCD)等以及扬声器等的输出部分607;包括硬盘等的存储部分608;以及包括诸如LAN卡、调制解调器等的网络接口卡的通信部分609。通信部分609经由诸如因特网的网络执行通信处理。驱动器610也根据需要连接至I/O接口605。可拆卸介质611,诸如磁盘、光盘、磁光盘、半导体存储器等等,根据需要安装在驱动器610上,以便于从其上读出的计算机程序根据需要被安装入存储部分608。
[0111] 特别地,根据本公开的实施例,上文参考流程图描述的过程可以被实现为计算机软件程序。例如,本公开的实施例包括一种计算机程序产品,其包括承载在计算机可读介质上的计算机程序,该计算机程序包含用于执行流程图所示的方法的程序代码。在这样的实施例中,该计算机程序可以通过通信部分609从网络上被下载和安装,和/或从可拆卸介质611被安装。在该计算机程序被中央生成单元(CPU)601执行时,执行本申请的方法中限定的上述功能。需要说明的是,本申请的计算机可读介质可以是计算机可读信号介质或者计算机可读存储介质或者是上述两者的任意组合。计算机可读存储介质例如可以是——但不限于——电、磁、光、电磁、红外线、或半导体的系统、装置或器件,或者任意以上的组合。计算机可读存储介质的更具体的例子可以包括但不限于:具有一个或多个导线的电连接、便携式计算机磁盘、硬盘、随机访问存储器(RAM)、只读存储器(ROM)、可擦式可编程只读存储器(EPROM或闪存)、光纤、便携式紧凑磁盘只读存储器(CD-ROM)、光存储器件、磁存储器件、或者上述的任意合适的组合。在本申请中,计算机可读存储介质可以是任何包含或存储程序的有形介质,该程序可以被指令执行系统、装置或者器件使用或者与其结合使用。而在本申请中,计算机可读的信号介质可以包括在基带中或者作为载波一部分传播的数据信号,其中承载了计算机可读的程序代码。这种传播的数据信号可以采用多种形式,包括但不限于电磁信号、光信号或上述的任意合适的组合。计算机可读的信号介质还可以是计算机可读存储介质以外的任何计算机可读介质,该计算机可读介质可以发送、传播或者传输用于由指令执行系统、装置或者器件使用或者与其结合使用的程序。计算机可读介质上包含的程序代码可以用任何适当的介质传输,包括但不限于:无线、电线、光缆、RF等等,或者上述的任意合适的组合。
[0112] 可以以一种或多种程序设计语言或其组合来编写用于执行本申请的操作的计算机程序代码,程序设计语言包括面向对象的程序设计语言—诸如Java、Smalltalk、C++,还包括常规的过程式程序设计语言—诸如“C”语言或类似的程序设计语言。程序代码可以完全地在用户计算机上执行、部分地在用户计算机上执行、作为一个独立的软件包执行、部分在用户计算机上部分在远程计算机上执行、或者完全在远程计算机或服务器上执行。在涉及远程计算机的情形中,远程计算机可以通过任意种类的网络——包括局域网(LAN)或广域网(WAN)—连接到用户计算机,或者,可以连接到外部计算机(例如利用因特网服务提供商来通过因特网连接)。
[0113] 本申请实施例还提供了一种物联网系统。如图7所示,本申请实施例提供的物联网系统可以包括数据采集设备701、用户端设备702以及边缘计算设备703。数据采集设备701、用户端设备702以及边缘计算设备可以分别与图1所示系统架构中的设备101、102、103相对应。数据采集设备701可以被配置成采集数据,可以包括各种传感器设备;用户端设备702可以与数据采集设备连接,并被配置成以待处理消息的主题发布包含数据采集设备所采集的数据的待处理消息.用户端设备702上可以安装有控制函数计算的客户端,通过该客户端用户端设备702可以与边缘计算设备703进行交互。边缘计算设备703可以为图6所示的设备,包括一个或多个处理器以及存储装置,存储装置上存储有一个或多个程序,当一个或多个程序被一个或多个处理器执行,使得一个或多个处理器实现前述实施例描述的消息处理方法。
[0114] 附图中的流程图和框图,图示了按照本申请各种实施例的系统、方法和计算机程序产品的可能实现的体系架构、功能和操作。在这点上,流程图或框图中的每个方框可以代表一个模块、程序段、或代码的一部分,该模块、程序段、或代码的一部分包含一个或多个用于实现规定的逻辑功能的可执行指令。也应当注意,在有些作为替换的实现中,方框中所标注的功能也可以以不同于附图中所标注的顺序发生。例如,两个接连地表示的方框实际上可以基本并行地执行,它们有时也可以按相反的顺序执行,这依所涉及的功能而定。也要注意的是,框图和/或流程图中的每个方框、以及框图和/或流程图中的方框的组合,可以用执行规定的功能或操作的专用的基于硬件的系统来实现,或者可以用专用硬件与计算机指令的组合来实现。
[0115] 描述于本申请实施例中所涉及到的单元可以通过软件的方式实现,也可以通过硬件的方式来实现。所描述的单元也可以设置在处理器中,例如,可以描述为:一种处理器包括接收单元、计算单元、发布单元和发送单元。其中,这些单元的名称在某种情况下并不构成对该单元本身的限定,例如,接收单元还可以被描述为“响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息”。
[0116] 作为另一方面,本申请还提供了一种计算机可读介质,该计算机可读介质可以是上述实施例中描述的装置中所包含的;也可以是单独存在,而未装配入该装置中。上述计算机可读介质承载有一个或者多个程序,当上述一个或者多个程序被该装置执行时,使得该装置:响应于在局域网内监测到以目标函数订阅的主题发布的待处理消息,接收待处理消息,待处理消息包括由物联网中的数据采集设备采集得到的待分析数据;通过目标函数的运行实例对待处理消息中的待分析数据进行分析计算,得到计算结果;生成包含计算结果的待发布消息,为待发布消息配置主题并发布待发布消息;向订阅待发布消息的主题的设备发送待发布消息。
[0117] 以上描述仅为本申请的较佳实施例以及对所运用技术原理的说明。本领域技术人员应当理解,本申请中所涉及的发明范围,并不限于上述技术特征的特定组合而成的技术方案,同时也应涵盖在不脱离上述发明构思的情况下,由上述技术特征或其等同特征进行任意组合而形成的其它技术方案。例如上述特征与本申请中公开的(但不限于)具有类似功能的技术特征进行互相替换而形成的技术方案。