一种基于分布式消息的批量入库方法转让专利

申请号 : CN202110514709.3

文献号 : CN113312386B

文献日 :

基本信息:

PDF:

法律信息:

相似专利:

发明人 : 唐振华

申请人 : 四川新网银行股份有限公司

摘要 :

本发明公开了一种基于分布式消息的批量入库方法,属于互联网金融技术领域,解决同一时间段内进行高并发大量数据同时写入带来的数据库读写的I/O性能瓶颈的问题。包括异步消息处理服务和定时调度服务;所述异步消息处理服务负责根据不同的业务场景,在对业务数据写入之前先进行防重处理,处理完成之后,再按业务场景分组写入分布式内存高速缓存数据库中;所述定时调度服务负责每间隔数分钟启动调度任务,查询布分式内存高速缓存服务器中登记的业务数据,如果有则进行业务数据读取后,进行批量入库操作。其目的在于,大幅度的降低了数据库交互,解决了数据库的CPU、内存、磁盘IO持续满载的情况,保证分布式消费信贷核心稳定运行。

权利要求 :

1.一种基于分布式消息的批量入库方法,其特征在于,包括异步消息处理服务和定时调度服务;

所述异步消息处理服务负责根据不同的业务场景,在对业务数据写入之前先进行防重处理,处理完成之后,再按业务场景分组写入分布式内存高速缓存数据库中;

所述定时调度服务负责每间隔数分钟启动调度任务,查询布分式内存高速缓存服务器中登记的业务数据,如果有则进行业务数据读取后,进行批量入库操作;

所述异步消息服务包括:

步骤1:在消息处理服务中,消费端接收到服务端发送的贷款业务消息后,按业务处理的场景进行分组,并对分组后的消息按方式组装为本次请求的消息体的内容;

步骤2:通过步骤1对原始的数据报文按业务场景进行封装,形成新的请求报文,所述新的请求报文对同一类型的业务数据按相同的业务场景增加前缀后形成新的业务场景分组数据;

步骤3:进行高并发场景下的幂等校验,检查业务场景分组数据是否在分布式内存高速缓存数据库中,如果该业务场景分组数据已经存在,即跳过结束该步骤;

步骤4:如果该业务场景分组数据不在分布式内存高速缓存数据库中,则检查业务场景分组数据是否已经登记到mysql数据库中,如果存在,说明该数据已经被进行处理,即跳过结束该步骤;

步骤5:通过步骤3和步骤4的检查之后,确认业务场景分组数据不存在于分布式内存高速缓存数据库和mysql数据库中,将该业务场景分组数据写入到分布式内存高速缓存数据库进行中,将分布式内存高速缓存数据库作为临时缓存区用于存放业务场景分组之后的数据;

步骤6:启动异步线程,按业务场景所属分组前缀以模糊匹配方式按固定长度去遍历分布式内存高速缓存数据库,从分布式内存高速缓存数据库中批量读取带有业务场景分组数据,在数据库交互方式上极大降低对数据库读写请求;

步骤7:应用程序接收到从分布式内存高速缓存数据库获取的数据后,应用程序根据业务数据前缀遍历分布式内存高速缓存数据库中的数据,遍历到指定前缀的数据之后返回本次遍历之后游标,对接收遍历游标信息进行判断,本次遍历游标信息是否为 0 ,如果为0则等待下一步轮询,不为0则执行步骤6的操作;

步骤8:接收到游标信息后通过游标信息找到本次返回的业务主建集合,再通过业务主建集合取得该集合所对应每一条业务数据集合;

步骤9:在消息体集合中结果中以循环方式获取每一条消息,对消息通过反序列化处理,转化为实体对像,把转化之后实体对像放入数据库插入清单List中;

步骤10:接收数据库插入清单List,采用批量入库方法生成批量插入语句,调用数据库批量执行的方式进行数据入库处理;

步骤11:待数据库批量入库成功后调用操作分布式内存高速缓存的删除方法,删除分布式内存高速缓存数据库中本次拉取下来的业务主建集合;

步骤12:重复轮询分布式内存高速缓存数据库的登记的业务场景分组信息,只要有消息重复执行步骤6到步骤11。

2.根据权利要求1所述的一种基于分布式消息的批量入库方法,其特征在于,所述定时调度服务只做数据入库,不处理业务逻辑。

3.根据权利要求1所述的一种基于分布式消息的批量入库方法,其特征在于,所述key的生成方式为:业务场景前缀加上从消息体中取得的唯一标识码,Value的生成方式为:消费端接收的消息体,消息生成方式结构如下:业务主键key, 消息体value。

4.根据权利要求1所述的一种基于分布式消息的批量入库方法,其特征在于,所述分布式内存高速缓存数据库是基于内存来进行数据临时存储,用于解决接收到每一条数据,同时通过依赖数据库的批量写入的方式一次性向数据写入固定笔数的贷款业务数据,减少应用服务器与数据库间频繁的读写操作。

5.根据权利要求1所述的一种基于分布式消息的批量入库方法,其特征在于,所述步骤

10解决了从分布式内存高速缓存数据库中取得数据后,对分布式内存高速缓存数据库数据进行反序列化生成了JAVA应用开发中的实体对像,通过调用数据库批量入库方法,进行JAVA实体对像批量保存到数据库。

说明书 :

一种基于分布式消息的批量入库方法

技术领域

[0001] 本发明属于互联网金融技术领域,具体涉及一种基于分布式消息的批量入库方法。

背景技术

[0002] 在消费信贷业务中,随着业务的快速发展和软件架构的分布式和微服务化演进带来了大量的业务数据,基于海量业务数据和分布式系统的处理架构上,消费信贷晚间批量业务处理的消费端在高并发的场景下会接收到大量并发的异步业务消息数据并进行数据库入库的操作。伴随着业务量的陡增,在分布式消费信贷系统的应用架构层,可通过基于微服务化的横向扩容方案进行解决,但是应用服务器的横向扩容给也将并发访问的瓶颈问题传递给了数据库。
[0003] 分布式消费信贷进行业务系统的消息接收和入库,该处理方式将业务流程的处理和单一的数据入库处理进行强绑定,并且在晚间利息计提批量的时间段中,每条消息请求都需要跟数据库进行更新和查询操作,业务信息和数据处理信息耦合程度较高,不符合微服务设计原则。
[0004] 同时数据库在同一时间段内会产生大量的读写请求,给数据库带来了巨大的压力,甚至在持续的高并发读取下会导致数据库响应缓慢、处理不及时、请求响应下降,高并发的请求相当于在内网进行了一次大流量攻击且这种攻击是针对数据库的,会造成数据库的响应缓慢,在进行磁盘IO操作时,数据库进程无法及响应造成数据库宕机的风险。由于批量处理必须在规定的时间段内完成,该处理方式对数据库的压力较大,无法在时间窗口内完成批量的处理,影响下游大数据抽取及贷后业务系统业务开展。

发明内容

[0005] 针对现有技术存在的问题,本发明提供了一种基于分布式消息的批量入库方法,其目的在于,对原有的方式进行业务异步消息请求改造,在同等并发量的情况下,大幅度的降低了数据库交互,解决了数据库的CPU、内存、磁盘IO持续满载的情况,保证了分布式消费信贷核心稳定运行,解决同一时间段内进行高并发大量数据同时写入带来的数据库读写的I/O性能瓶颈。
[0006] 本发明采用的技术方案如下:
[0007] 一种基于分布式消息的批量入库方法,包括异步消息处理服务和定时调度服务;
[0008] 所述异步消息处理服务负责根据不同的业务场景,在对业务数据写入之前先进行防重处理,处理完成之后,再按业务场景分组写入分布式内存高速缓存数据库中;
[0009] 所述定时调度服务负责每间隔数分钟启动调度任务,查询布分式内存高速缓存服务器中登记的业务数据,如果有则进行业务数据读取后,进行批量入库操作。
[0010] 进一步的,所述定时调度服务只做数据入库,不处理业务逻辑。
[0011] 进一步的,所述异步消息服务包括:
[0012] 步骤1:在消息处理服务中,消费端接收到服务端发送的贷款业务消息后,按业务处理的场景进行分组,并对分组后的消息按方式组装为本次请求的消息体的内容;
[0013] 步骤2:通过步骤1对原始的数据报文按业务场景进行封装,形成新的请求报文,所述新的请求报文对同一类型的业务数据按相同的业务场景增加前缀后形成新的业务场景分组的数据;
[0014] 步骤3:进行高并发场景下的幂等校验,检查业务场景分组数据是否在分布式内存高速缓存数据库中,如果该业务场景分组数据已经存在,即跳过结束该步骤;
[0015] 步骤4:如果该业务场景分组数据不在分布式内存高速缓存数据库中,则检查业务场景分组数据是否已经登记到mysql数据库中,如果存在,说明该数据已经被进行处理,即跳过结束该步骤;
[0016] 步骤5:通过步骤3和步骤4的检查之后,确认业务场景分组数据不存在于分布式内存高速缓存数据库和mysql数据库中,将该业务场景分组数据写入到分布式内存高速缓存数据库进行中,将分布式内存高速缓存数据库作为临时缓存区用于存放业务场景分组之后的数据。
[0017] 所述key的生成方式为:业务场景前缀加上从消息体中取得的唯一标识码,Value的生成方式为:消费端接收的消息体,消息生成方式结构如下:业务主键key, 消息体value。
[0018] 所述分布式内存高速缓存数据库是基内存来进行数据临时存储,用于解决接收到每一条数据,同时通过依赖数据库的批量写入的方式一次性向数据写入固定笔数的贷款业务数据,减少应用服务器与数据库间频繁的读写操作。
[0019] 进一步的,所述定时调度服务包括:
[0020] 步骤6:启动异步线程,按业务场景所属分组前缀以模糊匹配方式按固定长度去遍历分布式内存高速缓存数据库,从分布式内存高速缓存数据库中批量读取带有业务场景分组数据,在数据库交互方式上极大降低对数据库读写请求;
[0021] 步骤7:应用程序接收到从分布式内存高速缓存数据库获取的数据后,应用程序根据业务数据前缀遍历分布式内存高速缓存数据库中的数据,遍历到指定前缀的数据之后返回本次遍历之后游标,对接收遍历游标信息进行判断,本次遍历游标信息是否为 0 ,如果为0则等待下一步轮询,不为0则执行步骤6的操作;
[0022] 步骤8:接收到游标信息后通过游标信息找到本次返回的业务主建集合,再通过业务主建集合取得该集合所对应每一条业务数据集合;
[0023] 步骤9:在消息体集合中结果中以循环方式获取每一条消息,对消息通过反序列化处理,转化为实体对像,把转化之后实体对像放入数据库插入清单List中;
[0024] 步骤10:接收数据库插入清单List,采用批量入库方法生成批量插入语句,调用数据库批量执行的方式进行的数据入库处理;
[0025] 步骤11:待数据库批量入库成功后调用操作分布式内存高速缓存的删除方法,删除分布式内存高速缓存数据库中本次拉取下来的业务主建集合;
[0026] 步骤12:重复轮询分布式内存高速缓存数据库的登记的业务场景分组信息,只要有消息重复执行步骤6到步骤11。
[0027] 进一步的,所述步骤10解决了从分布式内存高速缓存数据库中取得数据后,对分布式内存高速缓存数据库数据进行反序列化生成了JAVA应用开发中的实体对像,通过调用数据库批量入库方法,进行JAVA实体对像批量保存到数据库。
[0028] 综上,本发明技术方案所带来的有益效果是:
[0029] 1.定时调度服务通过设置的固定的启动的时间节点,每隔2分钟启动定时调度任务去查询布分式内存高速缓存中登记的业务数据,如果有则进行业务数据读取后,进行批量写入数据库操作,定时调度服务只做数据入库,不处理业务逻辑,该设计模式下对业务和数据处理完美解耦,符合低耦合、高内聚的设计原则。
[0030] 2.待数据库批量入库成功后调用操作分布式内存高速缓存的删除方法,删除分布式内存高速缓存数据库中本次拉取下来的主建集合,该操作避免从分布式内存高速缓存数据库拉到重复数据,减少对业务数据进行重复处理风险。
[0031] 4.通过增加定时调度服务,在定时调度服务中启动异步线程,按业务场景所属分组前缀以模糊匹配方式按固定长度去遍历分布式内存高速缓存,该步骤可批量从分布式内存高速缓存数据库中读取带有业务场景数据,较之前的接收到单笔就进行数据库入库操作,在数据库交互方式上极大降低对数据库读写请求,从而降低了数据库并发量,保证了数据库在一个安全水位线前提下进行数据读写的I/O处理能力。
[0032] 5. 本方案在原有方案基础上,在接收到异步业务消息后按业务场景对业务异步消息进行分组,对分组的数据缓存入分布式内存高速缓存数据库中,将分布式内存高速缓存数据数据库作为分组数据的缓冲池。按组进行数据的批量拉取及批量入库,对原有的方式进行业务异步消息请求改造,在同等并发量的情况下,大幅度的降低了数据库交互,解决了数据库的CPU、内存、磁盘IO持续满载的情况,该方案在线上运行,CPU和内存都在低于预警值的范围内运行,保证了分布式消费信贷核心稳定运行。

附图说明

[0033] 本发明将通过例子并参照附图的方式说明,其中:
[0034] 图1是本发明中消息处理服务的示意图;
[0035] 图2是本发明中定时调度服务的流程示意图。

具体实施方式

[0036] 为使本申请实施例的目的、技术方案和优点更加清楚,下面将结合本申请实施例中附图,对本申请实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅是本申请一部分实施例,而不是全部的实施例。通常在此处附图中描述和示出的本申请实施例的组件可以各种不同的配置来布置和设计。因此,以下对在附图中提供的本申请的实施例的详细描述并非旨在限制要求保护的本申请的范围,而是仅仅表示本申请的选定实施例。基于本申请的实施例,本领域技术人员在没有做出创造性劳动的前提下所获得的所有其他实施例,都属于本申请保护的范围。
[0037] 在本申请实施例的描述中,需要说明的是,术语“上”、“下”、“左”、“右”、“竖直”、“水平”、“内”、“外”等指示的方位或位置关系为基于附图所示的方位或位置关系,或者是该发明产品使用时惯常摆放的方位或位置关系,仅是为了便于描述本申请和简化描述,而不是指示或暗示所指的装置或元件必须具有特定的方位、以特定的方位构造和操作,因此不能理解为对本申请的限制。此外,术语“第一”、“第二”、“第三”等仅用于区分描述,而不能理解为指示或暗示相对重要性。
[0038] 下面结合图1~图2对本发明作详细说明。
[0039] 一种基于分布式消息的批量入库方法,包括异步消息处理服务和定时调度服务;
[0040] 所述异步消息处理服务负责根据不同的业务场景,在对业务数据写入之前先进行防重处理,处理完成之后,再按业务场景分组写入分布式内存高速缓存数据库中;
[0041] 所述定时调度服务通过设置的固定的启动时间节点,每隔2分钟启动定时调度任务去查询布分式内存高速缓存数据库中登记的业务数据,如果有则进行业务数据读取后,进行批量写入数据库操作,其中定时调度服务只做数据入库,不处理业务逻辑,该设计模式下对业务和数据处理完美解耦,符合低耦合、高内聚的设计原则。
[0042] 进一步的,所述异步消息服务包括:
[0043] 步骤1:在消息处理服务中,消费端接收到服务端发送的贷款业务消息后,按业务处理的场景进行分组,并对分组后的消息按方式组装为本次请求的消息体的内容;
[0044] key的生成方式为:业务场景前缀加上从消息体中取得的唯一标识码。
[0045] Value的生成方式:消费端接收的消息体
[0046] 消息生成方式结构如下:
[0047] 业务主键key, 消息体value
[0048] A001_1,{ID:1,NO:001 }
[0049] A001_2,{ID:2,NO:002 }
[0050] A001_3,{ID:3,NO:003 }
[0051] B001_1,{ID:4,NO:003 }
[0052] B001_2,{ID:5,NO:004 }
[0053] 步骤2:通过步骤1对原始的数据报文按业务场景进行封装,形成新的请求报文,该类请求报文对同一类型的业务数据按相同的业务场景增加前缀后形成新的按业务场景分组的数据,该业务场景中,打破了原有接收到数据之后立刻进行数据库插入方式,采用按业务场景进行数据分组方式进行数据处理模式。
[0054] 步骤3:进行高并发场景下的幂等校验,检查业务场景分组数据是否在分布式内存高速缓存数据库中,如果该业务场景分组数据已经存在,即跳过结束该步骤;
[0055] 步骤4:如果该业务场景分组数据不在分布式内存高速缓存数据库中,则检查该笔业务场景分组数据是否已经登记到mysql数据库中,如果存在,说明该数据已经被进行处理,即跳过结束该步骤;
[0056] 步骤5:通过步骤3和步骤4的检查之后,确认业务场景分组数据不存在于分布式内存高速缓存数据库和mysql数据库中,该步骤利用分布式内存高速缓存数据库读写速度快、性能高的特点,将该业务场景分组数据写入到分布式内存高速缓存数据库进行中,将分布式内存高速缓存数据库作为临时缓存区用于存放业务场景分组之后的数据。
[0057] 该实施例中,所述key的生成方式为:业务场景前缀加上从消息体中取得的唯一标识码,Value的生成方式为:消费端接收的消息体,消息生成方式结构如下:业务主键key, 消息体value。
[0058] 该实施例中,所述分布式内存高速缓存数据库是基内存来进行数据临时存储,用于解决接收到每一条数据,同时通过依赖数据库的批量写入的方式一次性向数据写入固定笔数的贷款业务数据,减少应用服务器与数据库间频繁的读写操作。
[0059] 进一步的,所述定时调度服务包括:
[0060] 步骤6:启动异步线程,按业务场景所属分组前缀以模糊匹配方式按固定长度去遍历分布式内存高速缓存数据库,如步骤1中所示消息,只需模糊匹配前缀A001固定长度记录数,获取属于A001前缀数据请求信息,可从分布式内存高速缓存数据库中批量读取带有业务场景分组数据,较之前的接收到单笔就进行数据库入库操作,在数据库交互方式上极大降低对数据库读写请求,从而降低了数据库并发量,保证了数据库在一个安全水位线前提下进行数据读写的I/O处理能力。
[0061] 步骤7:应用程序接收到从分布式内存高速缓存数据库获取的数据后,应用程序根据业务数据前缀遍历分布式内存高速缓存数据库中的数据,遍历到指定前缀的数据之后返回本次遍历之后游标,对接收遍历游标信息进行判断,本次遍历游标信息是否为 0 ,如果为0则等待下一步轮询,不为0则执行步骤6的操作;
[0062] 该操作为检查在分布式内存高速缓存数据库是否有该业务分组(如:A001)数据信息,如果有就从分布式内存高速缓存数据库中取出该业务分组(如:A001)的数据清单信息。
[0063] 例:取A001 开头的数据,可取到以下清单的数据实体:
[0064] A001_1,{ID:1,NO:001 }
[0065] A001_2,{ID:2,NO:002 }
[0066] A001_3,{ID:3,NO:003 }
[0067] 步骤8:接收到游标信息后通过游标信息找到本次返回的业务主建集合,再通过业务主建集合取得该集合所对应每一条业务数据集合;
[0068] 例:取A001 开头的数据,游标包括 { A001_1,A001_2,A001_3}
[0069] 步骤9:在消息体集合中结果中以循环方式获取每一条消息,对消息通过反序列化处理,转化为实体对像,把转化之后实体对像放入数据库插入清单List中;
[0070] 步骤10:接收数据库插入清单List,采用批量入库方法生成批量插入语句,调用数据库批量执行的方式进行该批次的数据入库处理;通过第5步到第8步的操作,解决了从分布式内存高速缓存数据库中取得数据后,对分布式内存高速缓存数据数据库进行反序列化生成了JAVA应用开发中的实体对像,通过调用数据库批量入库方法,进行JAVA实体对像批量保存到数据库。
[0071] 步骤11:待数据库批量入库成功后调用操作分布式内存高速缓存的删除方法,删除分布式内存高速缓存数据库中本次拉取下来的业务主建集合;该操作目的是避免从分布式内存高速缓存拉到重复数据。减少对业务数据进行重复处理风险。
[0072] 步骤12:重复轮询分布式内存高速缓存数据库的登记的业务场景分组信息,只要有消息重复执行步骤6到步骤11。
[0073] 进一步的,所述步骤10解决了从分布式内存高速缓存数据库中取得数据后,对分布式内存高速缓存数据库数据进行反序列化生成了JAVA应用开发中的实体对像,通过调用数据库批量入库方法,进行JAVA实体对像批量保存到数据库。
[0074] 本方案在原有方案基础上,在接收到异步业务消息后按业务场景对业务异步消息进行分组,对分组的数据缓存入分布式内存高速缓存数据库中,将分布式内存高速缓存数据数据库作为分组数据的缓冲池。按组进行数据的批量拉取及批量入库,对原有的方式进行业务异步消息请求改造,在同等并发量的情况下,大幅度的降低了数据库交互,解决了数据库的CPU、内存、磁盘IO持续满载的情况,该方案在线上运行,CPU和内存都在低于预警值的范围内运行。保证了分布式消费信贷核心稳定运行。
[0075] 以上所述实施例仅表达了本申请的具体实施方式,其描述较为具体和详细,但并不能因此而理解为对本申请保护范围的限制。应当指出的是,对于本领域的普通技术人员来说,在不脱离本申请技术方案构思的前提下,还可以做出若干变形和改进,这些都属于本申请的保护范围。