智慧社区微服务架构MQTT异步和同步通信方法和系统转让专利

申请号 : CN202410038662.1

文献号 : CN117560415B

文献日 :

基本信息:

PDF:

法律信息:

相似专利:

发明人 : 李勇温平陈科顺

申请人 : 德阳城市智慧之心信息技术有限公司

摘要 :

本发明提供一种智慧社区微服务架构MQTT异步和同步通信方法和系统,涉及物联网技术领域,系统包括:多个微服务节点、MQTT Broker、设备端;多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间基于MQTT协议建立双向通讯连接;多个微服务节点被配置为发布异步请求消息或同步请求消息并处理返回的响应消息;MQTT Broker被配置为对异步请求消息、同步请求消息,或响应消息进行转发,设备端被配置为对异步请求消息或同步请求消息进行处理并返回响应消息。本发明实现了微服务多节点场景下的MQTT异步和同步通信功能,解决了现有方案中异步消息重复消费和同步消息丢失的问题。

权利要求 :

1.一种智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,包括:

基于MQTT协议建立多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间的双向通讯连接;其中,多个微服务节点采用微服务场景下的分布式集群部署;

各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题;

响应于一个微服务节点发布的异步请求消息,通过MQTT Broker将该异步请求消息转发至对应的设备端,并收到设备端处理该异步请求消息后返回的响应消息,通过MQTT Broker将该响应消息根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理;

响应于一个微服务节点发布的同步请求消息,对该同步请求消息生成全局唯一MsgId,通过MQTT Broker将该同步请求消息及生成的全局唯一MsgId共同转发至对应的设备端,并收到设备端处理该同步请求消息后返回的响应消息及对应的MsgId,通过MQTT Broker将该响应消息及对应的MsgId根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,若是,则对所述同步响应消息进行处理。

2.根据权利要求1所述的智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,还包括:

各个微服务节点创建MsgId存储库后,启动MsgId超时检查线程,定时检测MsgId存储库中各MsgId是否超时,若任一MsgId超时,则唤醒等待该MsgId响应的线程进行请求超时处理,并从MsgId存储库中移除该MsgId;其中,所述MsgId存储库中包括MsgId、MsgId的生成时间戳、超时阈值。

3.根据权利要求2所述的智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,定时检测MsgId存储库中各MsgId是否超时,若任一MsgId超时,则唤醒等待该MsgId响应的线程进行超时处理,包括:MsgId超时检查线程定时遍历MsgId存储库,根据各MsgId的生成时间戳和超时阈值判断当前MsgId的请求是否超时;其中,在超时之前,所述超时阈值可根据当前业务系统或MQTT Broker负载进行动态调整。

4.根据权利要求1所述的智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理,包括:该微服务节点通过判断所述异步响应消息是否有MsgId,或是否有设定的特殊格式的MsgId,确定该响应消息是否为异步响应消息;其中,若所述异步响应消息无MsgId,或无设定的特殊格式的MsgId,则该响应消息为异步响应消息。

5.根据权利要求2所述的智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,响应于一个微服务节点发布的同步请求消息,对该同步请求消息生成全局唯一MsgId,还包括:将该全局唯一MsgId与该MsgId的生成时间戳,以及超时阈值一起存入本地的MsgId存储库,同时将该同步请求消息的请求线程阻塞,等待响应消息返回;

若返回的响应消息属于当前节点的实例域,则唤醒等待该MsgId响应消息的请求线程进行消息处理。

6.根据权利要求5所述的智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,还包括:若该响应消息不属于本节点的实例域,则通过同步响应转发话题将该响应消息发布至MQTT Broker,通过所述MQTT Broker将该响应消息广播至所有普通订阅了同步响应转发话题的微服务节点;

各微服务节点通过该响应消息对应的MsgId判断该响应消息是否属于当前节点的实例域,若是,则唤醒等待该MsgId响应消息的请求线程进行消息处理,若否,则丢弃该响应消息。

7.根据权利要求6所述的智慧社区微服务架构MQTT异步和同步通信方法,其特征在于,各微服务节点通过该响应消息对应的MsgId判断该响应消息是否属于当前节点的实例域,包括:各微服务节点在本节点的MsgId存储库中进行查找,若本节点的MsgId存储库中有该响应消息对应的MsgId,则判断该响应消息属于当前节点的实例域。

8.一种智慧社区微服务架构MQTT异步和同步通信系统,其特征在于,包括:业务系统服务节点、MQTT Broker、设备端;所述业务系统服务节点包括采用微服务场景下分布式集群部署的多个微服务节点,所述多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间基于MQTT协议建立双向通讯连接;

各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题;

所述多个微服务节点被配置为发布异步请求消息或同步请求消息并对返回的响应消息进行处理;所述MQTT Broker被配置为对所述异步请求消息、同步请求消息,或响应消息进行转发,所述设备端被配置为对所述异步请求消息或同步请求消息进行处理并返回响应消息;其中,当一个微服务节点发布一个异步请求消息,通过MQTT Broker将该异步请求消息转发至对应的设备端,并收到设备端处理该异步请求消息后返回的响应消息,通过MQTT Broker将该响应消息根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理;

当一个微服务节点发布一个同步请求消息,对该同步请求消息生成全局唯一MsgId,通过MQTT Broker将该同步请求消息及生成的全局唯一MsgId共同转发至对应的设备端,并收到设备端处理该同步请求消息后返回的响应消息及对应的MsgId,通过MQTT Broker将该响应消息及对应的MsgId根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,若是,则对所述同步响应消息进行处理。

9.根据权利要求8所述的智慧社区微服务架构MQTT异步和同步通信系统,其特征在于,各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题。

10.根据权利要求9所述的智慧社区微服务架构MQTT异步和同步通信系统,其特征在于,所述MsgId存储库中包括MsgId、MsgId的生成时间戳、超时阈值,用于启动MsgId超时检查线程,根据各MsgId的生成时间戳和超时阈值定时检测所述MsgId存储库中各MsgId是否超时。

说明书 :

智慧社区微服务架构MQTT异步和同步通信方法和系统

技术领域

[0001] 本发明涉及物联网技术领域,具体涉及一种智慧社区微服务架构MQTT异步和同步通信方法和系统。

背景技术

[0002] 当下用于智慧社区的物联网设备多通过MQTT(消息队列遥测传输)协议接入业务系统。传统的MQTT通信是基于订阅/发布模式的异步消息模型,但某些场景下也需要与设备进行基于请求/响应模式的同步通信,此时就需要在发起请求的节点实例保存请求的状态,然后将收到的响应消息与请求进行正确匹配后再进行后续消息处理。
[0003] 针对业务系统为微服务架构的情况,后端服务可能存在多个节点实例且会进行动态增减。各后端服务节点若采用普通订阅的方式来订阅响应消息则存在消息重复消费的问题,若采用共享订阅则有可能某个同步请求的响应消息被MQTT Broker(消息代理服务器)的负载均衡规则分发至其他节点而导致消息丢失。
[0004] 目前现有的MQTT同步转异步方案,有的未保留异步模式,有的需要特殊定制的IoT中间件(IoT‑Middleware物联网中间件),有的需要更改MQTT标准协议,有的需要借助一些第三方组件实现,有的不适用于微服务场景,无法满足使用需求。

发明内容

[0005] 有鉴于此,本申请实施例提供一种智慧社区微服务架构MQTT异步和同步通信方法和系统,以实现微服务多节点场景下的MQTT异步和同步通信的功能,解决现有方案中异步消息重复消费和同步消息丢失的问题。
[0006] 本申请实施例提供以下技术方案:一种智慧社区微服务架构MQTT异步和同步通信方法,包括:
[0007] 基于MQTT协议建立多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间的双向通讯连接;其中,多个微服务节点采用微服务场景下的分布式集群部署;
[0008] 各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题;
[0009] 响应于一个微服务节点发布的异步请求消息,通过MQTT Broker将该异步请求消息转发至对应的设备端,并收到设备端处理该异步请求消息后返回的响应消息,通过MQTT Broker将该响应消息根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理;
[0010] 响应于一个微服务节点发布的同步请求消息,对该同步请求消息生成全局唯一MsgId,通过MQTT Broker将该同步请求消息及生成的全局唯一MsgId共同转发至对应的设备端,并收到设备端处理该同步请求消息后返回的响应消息及对应的MsgId,通过MQTT Broker将该响应消息及对应的MsgId根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,若是,则对所述同步响应消息进行处理。
[0011] 根据本申请一种实施例,还包括:各个微服务节点创建MsgId存储库后,启动MsgId超时检查线程,定时检测MsgId存储库中各MsgId是否超时,若任一MsgId超时,则唤醒等待该MsgId响应的线程进行请求超时处理,并从MsgId存储库中移除该MsgId;其中,所述MsgId存储库中包括MsgId、MsgId的生成时间戳、超时阈值。
[0012] 根据本申请一种实施例,定时检测MsgId存储库中各MsgId是否超时,若任一MsgId超时,则唤醒等待该MsgId响应的线程进行超时处理,包括:
[0013] MsgId超时检查线程定时遍历MsgId存储库,根据各MsgId的生成时间戳和超时阈值判断当前MsgId的请求是否超时;其中,在超时之前,所述超时阈值可根据当前业务系统或MQTT Broker负载进行动态调整。
[0014] 根据本申请一种实施例,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理,包括:
[0015] 该微服务节点通过判断所述异步响应消息是否有MsgId,或是否有设定的特殊格式的MsgId,确定该响应消息是否为异步响应消息;其中,若所述异步响应消息无MsgId,或无设定的特殊格式的MsgId,则该响应消息为异步响应消息。
[0016] 根据本申请一种实施例,响应于一个微服务节点发布的同步请求消息,对该同步请求消息生成全局唯一MsgId,还包括:
[0017] 将该全局唯一MsgId与该MsgId的生成时间戳,以及超时阈值一起存入本地的MsgId存储库,同时将该同步请求消息的请求线程阻塞,等待响应消息返回;
[0018] 若返回的响应消息属于当前节点的实例域,则唤醒等待该MsgId响应消息的请求线程进行消息处理。
[0019] 根据本申请一种实施例,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,还包括:
[0020] 若该响应消息不属于本节点的实例域,则通过同步响应转发话题将该响应消息发布至MQTT Broker,通过所述MQTT Broker将该响应消息广播至所有普通订阅了同步响应转发话题的微服务节点;
[0021] 各微服务节点通过该响应消息对应的MsgId判断该响应消息是否属于当前节点的实例域,若是,则唤醒等待该MsgId响应消息的请求线程进行消息处理,若否,则丢弃该响应消息。
[0022] 根据本申请一种实施例,各微服务节点通过该响应消息对应的MsgId判断该响应消息是否属于当前节点的实例域,包括:
[0023] 各微服务节点在本节点的MsgId存储库中进行查找,若本节点的MsgId存储库中有该响应消息对应的MsgId,则判断该响应消息属于当前节点的实例域。
[0024] 本申请还提供一种智慧社区微服务架构MQTT异步和同步通信系统,包括:业务系统服务节点、MQTT Broker、设备端;所述业务系统服务节点包括采用微服务场景下分布式集群部署的多个微服务节点,所述多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间基于MQTT协议建立双向通讯连接;
[0025] 所述多个微服务节点被配置为发布异步请求消息或同步请求消息并对返回的响应消息进行处理;所述MQTT Broker被配置为对所述异步请求消息、同步请求消息,或响应消息进行转发,所述设备端被配置为对所述异步请求消息或同步请求消息进行处理并返回响应消息;其中,
[0026] 当一个微服务节点发布一个异步请求消息,通过MQTT Broker将该异步请求消息转发至对应的设备端,并收到设备端处理该异步请求消息后返回的响应消息,通过MQTT Broker将该响应消息根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理;
[0027] 当一个微服务节点发布一个同步请求消息,对该同步请求消息生成全局唯一MsgId,通过MQTT Broker将该同步请求消息及生成的全局唯一MsgId共同转发至对应的设备端,并收到设备端处理该同步请求消息后返回的响应消息及对应的MsgId,通过MQTT Broker将该响应消息及对应的MsgId根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,若是,则对所述同步响应消息进行处理。
[0028] 根据本申请一种实施例,各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题。
[0029] 根据本申请一种实施例,所述MsgId存储库中包括MsgId、MsgId的生成时间戳、超时阈值,用于启动MsgId超时检查线程,根据各MsgId的生成时间戳和超时阈值定时检测所述MsgId存储库中各MsgId是否超时。
[0030] 与现有技术相比,本说明书实施例采用的上述至少一个技术方案能够达到的有益效果至少包括:
[0031] 1.本申请实施例仅使用标准的MQTT协议实现了微服务场景下的MQTT异步和同步通信,未引入HTTP等其他协议,实现轻量简洁;
[0032] 2.本申请实施例仅需MQTT Broker这一MQTT通信必备的基本组件,不依赖于任何其他第三方服务或组件,如Redis/Kafka/RabbitMQ/ZooKeeper等,系统复杂度低,开发难度、运维成本低;
[0033] 3.本申请实施例通用性强,所使用的MQTT Broker可以是任何支持MQTT v5.0协议的程序软件或IoT云平台,无需特殊定制,可随意更换;
[0034] 4.本申请实施例使用独立的超时检查线程来处理同步请求超时,可对同步请求的超时时间进行动态更改,可以对单个同步请求进行续期或立即超时等操作,从而更方便地进行系统资源调度;
[0035] 5.本申请实施例中各微服务节点均为无状态节点,各服务节点无需知晓其他节点的任何信息,无需额外同步其他节点的状态和数据,利于微服务场景下节点的动态扩容或缩减。
[0036] 综上,本申请实施例具有轻量、低成本、通用性强、灵活性高、支持多线程并发、适合微服务场景等特点,实现了微服务多节点场景下的MQTT异步和同步通信的功能,解决了异步消息重复消费和同步消息丢失的问题。

附图说明

[0037] 为了更清楚地说明本申请实施例的技术方案,下面将对实施例中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本申请的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其它的附图。
[0038] 图1是本发明实施例的MQTT异步和同步通信系统架构示意图;
[0039] 图2是本发明实施例的MQTT异步和同步通信方法的初始阶段流程示意图;
[0040] 图3是本发明实施例的MQTT异步和同步通信方法的异步请求流程示意图;
[0041] 图4是本发明实施例的MQTT异步和同步通信方法的同步请求流程示意图。

具体实施方式

[0042] 下面结合附图对本申请实施例进行详细描述。
[0043] 以下通过特定的具体实例说明本申请的实施方式,本领域技术人员可由本说明书所揭露的内容轻易地了解本申请的其他优点与功效。显然,所描述的实施例仅仅是本申请一部分实施例,而不是全部的实施例。本申请还可以通过另外不同的具体实施方式加以实施或应用,本说明书中的各项细节也可以基于不同观点与应用,在没有背离本申请的精神下进行各种修饰或改变。需说明的是,在不冲突的情况下,以下实施例及实施例中的特征可以相互组合。基于本申请中的实施例,本领域普通技术人员在没有作出创造性劳动前提下所获得的所有其他实施例,都属于本申请保护的范围。
[0044] 本发明实施例提供了一种智慧社区微服务架构MQTT异步和同步通信方法,包括:
[0045] 基于MQTT协议建立多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间的双向通讯连接;其中,多个微服务节点采用微服务场景下的分布式集群部署;
[0046] 各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题;
[0047] 响应于一个微服务节点发布的异步请求消息,通过MQTT Broker将该异步请求消息转发至对应的设备端,并收到设备端处理该异步请求消息后返回的响应消息,通过MQTT Broker将该响应消息根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理;
[0048] 响应于一个微服务节点发布的同步请求消息,对该同步请求消息生成全局唯一MsgId(Message Identification 消息标识),通过MQTT Broker将该同步请求消息及生成的全局唯一MsgId共同转发至对应的设备端,并收到设备端处理该同步请求消息后返回的响应消息及对应的MsgId,通过MQTT Broker将该响应消息及对应的MsgId根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,若是,则对所述同步响应消息进行处理。
[0049] 针对现有方案中存在异步消息重复消费和同步消息丢失的问题,本发明实施例利用MQTT v5.0提供的共享订阅方式来保证原生的异步通信模式,由MQTT Broker集群基于预设的负载均衡规则来进行响应消息的路由分发,避免消息重复消费的问题;利用带特殊前缀的唯一ID进行同步请求和响应消息的一一匹配,并对分发到本节点的但不属于本实例域的同步响应消息进行二次广播转发,保证该同步响应消息到达其所属的节点实例,避免同步响应丢失的问题;并且,使用独立的超时检查线程来处理同步请求超时,可提供超时时间动态更新功能。
[0050] 本实施例仅使用标准的MQTT协议实现了微服务场景下的MQTT异步和同步通信,未引入HTTP等其他协议,开发、使用、维护成本都很低,实现轻量简洁。本实施例仅需MQTT Broker这一MQTT通信必备的基本组件,不依赖于任何其他第三方服务或组件,如Redis/Kafka/RabbitMQ/ZooKeeper等,系统复杂度低,开发难度、运维成本低;另外,本实施例通用性强,所使用的MQTT Broker可以是任何支持MQTT v5.0协议的程序软件或IoT云平台等第三方平台,无需特殊定制,可随意更换;且本实施例中各微服务节点均为无状态节点,无需额外维护节点的状态和数据以及进行缓存生命周期同步等操作,利于微服务场景下节点的动态扩容或缩减。在此基础上,本实施例仅将各节点生成的全局唯一的msgId存储在所属节点实例,无需同步至其他节点或第三方缓存,无空间浪费,无带宽浪费,无同步问题。
[0051] 因此,本发明实施例具有轻量、低成本、通用性强、灵活性高、支持多线程并发、适合微服务场景等特点,同时支持微服务多节点场景下的负载均衡的MQTT异步和同步通信的功能,解决了异步消息重复消费和同步消息丢失的问题。
[0052] 具体实施时,如图2所示,本实施例的MQTT异步和同步通信方法,主要包括以下几个阶段:
[0053] 1.初始化阶段:
[0054] (1)各Service实例节点(上述的微服务节点)共享订阅设备响应话题(Topic)“$share/g/resp/#”,其中“$share/g/”为共享话题分组前缀,“resp/#”表示订阅所有以“resp/”开头的话题,即订阅所有设备的响应消息。
[0055] MQTT Broker会将设备响应消息按负载均衡规则分配到某一个共享订阅该话题的Service实例,避免了异步响应被广播至所有Service实例导致重复消费的问题。
[0056] 上述的MQTT Broker的负载均衡规则,常见的规则有:轮询、加权轮询、随机、加权随机、最快响应、Hash算法等。
[0057] (2)各Service实例普通订阅同步响应转发话题“sync/resp”;
[0058] MQTT Broker会将发布到该话题的消息广播至每一个订阅了该话题的Service实例;
[0059] (3)各Service实例创建MsgId存储库,用于存储MsgId、MsgId的生成时间戳及超时阈值等相关信息;
[0060] (4)各Service实例启动MsgId超时检查线程,定时检测存储库中各MsgId是否超时,若某一MsgId超时,则唤醒等待该Id响应的线程进行请求超时处理,并从MsgId存储库中移除该MsgId。
[0061] 具体地,MsgId超时检查线程定时遍历MsgId存储库,根据各MsgId的生成时间戳和超时阈值来判断当前MsgId的请求是否超时,各MsgId的超时阈值可以在超时之前根据当前业务系统或MQTT Broker负载进行动态调整,避免大量请求长时间阻塞或过快超时导致响应消息无效,从而造成系统资源浪费或请求成功率低等问题,也可以对某个请求进行等待续期或立即超时等操作。
[0062] (5)各设备订阅含有设备唯一ID的请求话题(如Device 1订阅话题“req/dev1”),当某设备如Device1接收到请求消息时,经过处理将响应消息通过话题“resp/dev1”发布到Broker。
[0063] 2.异步请求流程,如图3所示。
[0064] 本方案保留了MQTT原生的异步通信模式。
[0065] 当某个Service实例节点向MQTT Broker发布一个异步请求消息后,MQTT Broker将该请求转发至对应的设备,设备处理请求并发布相应的响应消息,MQTT Broker将响应消息根据负载均衡规则分配至任意一个Service实例节点,Service实例节点判断该响应为异步响应消息,则直接对消息进行后续处理。
[0066] 在一种实施例中,该微服务节点通过判断所述异步响应消息是否有MsgId,或是否有设定的特殊格式的MsgId,确定该响应消息是否为异步响应消息;其中,若所述异步响应消息无MsgId,或无设定的特殊格式的MsgId,则该响应消息为异步响应消息。
[0067] 本实施例利用MQTT v5.0提供的共享订阅方式来保证原生的异步通信模式,由MQTT Broker集群基于预设的负载均衡规则来进行响应消息的路由分发,避免消息重复消费的问题。
[0068] 同步请求流程,如图4所示。(1)当某个Service实例发起一个同步请求时,通过分布式ID生成算法生成一个全局唯一的MsgId放入请求消息负载中然后发布该请求,同时将该MsgId与MsgId的发送时间戳、超时阈值一起存入本地的MsgId存储库,然后该请求线程阻塞,等待响应消息返回。其中,MsgId存储库采用线程安全的设计和编码,使其支持多线程并发,单个Service实例节点可同时发起多个同步请求并等待返回。
[0069] 上述的分布式ID生成算法中,常见的算法包括:数据库自增、redis递增、号段模式、UUID算法、snowflake算法、uid‑generator算法等。
[0070] 本实施例使用独立的超时检查线程来处理同步请求超时,可对同步请求的超时时间进行动态更改,可以对单个同步请求进行续期或立即超时等操作,从而更方便地进行系统资源调度。
[0071] (2)设备端收到MQTT Broker转发的请求消息后进行处理,并将请求携带的MsgId放入响应消息负载中进行返回。
[0072] (3)该响应消息经过MQTT Broker的负载均衡算法,可能被转发至任意一个Service实例节点;收到此消息的Service实例节点判断该响应为同步响应,然后判断该响应中的MsgId是否属于本节点的实例域:若是,则唤醒等待该MsgId响应消息的请求线程进行消息处理,请求结束;若不是,则通过同步响应转发Topic(sync/resp)将该消息发布至MQTT Broker。
[0073] (4)若该响应消息不属于本节点的实例域,MQTT Broker会将转发的响应消息广播至所有普通订阅了同步响应转发Topic的Service实例节点;各实例收到消息后判断:通过该响应消息对应的MsgId判断该响应消息是否属于当前节点的实例域,若是,则唤醒等待该MsgId响应消息的请求线程进行消息处理,请求结束;若不是,则丢弃消息,请求结束。
[0074] 具体地,各微服务节点在本节点的MsgId存储库中进行查找,若本节点的MsgId存储库中有该响应消息对应的MsgId,则判断该响应消息属于当前节点的实例域。
[0075] 本实施例利用带特殊前缀的唯一ID进行同步请求和响应消息的一一匹配,并对分发到本节点的但不属于本实例域的同步响应消息进行二次广播转发,保证该同步响应消息到达其所属的节点实例,避免同步响应丢失的问题。并且,本实施例采用MQTT Broker重新广播发布的方式,各节点无需知道其他节点的任何信息,无数据同步问题,实现更为简洁。
[0076] (5)若因为设备不在线、网络故障、设备故障等各种原因导致某Service实例一直未收到某个MsgId的响应消息,该实例的MsgId超时检查线程会根据MsgId存储库中该MsgId的生成时间,在预设或指定的时间后,唤醒等待该Id响应的线程进行请求超时处理,并从MsgId存储库中移除该MsgId。
[0077] 如图1所示,本申请还提供一种智慧社区微服务架构MQTT异步和同步通信系统,包括:业务系统服务节点、MQTT Broker、设备端;所述业务系统服务节点包括采用微服务场景下分布式集群部署的多个微服务节点,所述多个微服务节点与MQTT Broker,以及MQTT Broker与多个设备端之间基于MQTT协议建立双向通讯连接;
[0078] 所述多个微服务节点被配置为发布异步请求消息或同步请求消息并对返回的响应消息进行处理;所述MQTT Broker被配置为对所述异步请求消息、同步请求消息,或响应消息进行转发,所述设备端被配置为对所述异步请求消息或同步请求消息进行处理并返回响应消息;其中,
[0079] 当一个微服务节点发布一个异步请求消息,通过MQTT Broker将该异步请求消息转发至对应的设备端,并收到设备端处理该异步请求消息后返回的响应消息,通过MQTT Broker将该响应消息根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为异步响应消息后对所述异步响应消息进行处理;
[0080] 当一个微服务节点发布一个同步请求消息,对该同步请求消息生成全局唯一MsgId,通过MQTT Broker将该同步请求消息及生成的全局唯一MsgId共同转发至对应的设备端,并收到设备端处理该同步请求消息后返回的响应消息及对应的MsgId,通过MQTT Broker将该响应消息及对应的MsgId根据负载均衡规则分配至任意一个微服务节点,通过该微服务节点判断该响应消息为同步响应消息后,再通过该响应消息对应的MsgId判断该响应消息是否属于本节点的实例域,若是,则对所述同步响应消息进行处理。
[0081] 本实施例的系统架构中,Service服务为具体的后端业务服务节点,采用微服务场景下的分布式集群部署,可根据实时负载动态增减运行实例节点的数量,各实例节点通过MQTT协议与MQTT Broker集群或单个MQTT Broker程序连接;各设备通过MQTT协议连接到MQTT Broker集群或单个MQTT Broker程序。
[0082] 根据本申请一种实施例,各个微服务节点共享订阅所有设备端的响应话题,各个微服务节点普通订阅同步响应转发话题,各个微服务节点创建MsgId存储库,各个设备端订阅含有设备唯一ID的请求话题。
[0083] 根据本申请一种实施例,所述MsgId存储库中包括MsgId、MsgId的生成时间戳、超时阈值,用于启动MsgId超时检查线程,根据各MsgId的生成时间戳和超时阈值定时检测所述MsgId存储库中各MsgId是否超时。
[0084] 以上所述,仅为本申请的具体实施方式,但本申请的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本申请揭露的技术范围内,可轻易想到的变化或替换,都应涵盖在本申请的保护范围之内。因此,本申请的保护范围应以权利要求的保护范围为准。