基于消息队列的存量数据与增量数据融合的数据同步方法转让专利

申请号 : CN202211609848.5

文献号 : CN115599870B

文献日 :

基本信息:

PDF:

法律信息:

相似专利:

发明人 : 向才锋

申请人 : 云筑信息科技(成都)有限公司

摘要 :

本发明公开了基于消息队列的存量数据与增量数据融合的数据同步方法,属于大数据技术领域,包括创建数据源信息并存放至数据库中;拉取数据源中待同步的数据表作为待同步表列表,获取数据表和数据表对应的数据源信息,建立数据表与目标数据表的映射关系;创建增量同步任务,获取增量数据并根据映射关系发送至消息队列中;创建存量同步任务,异步获取存量数据并根据映射关系发送至消息队列中;创建数据实时接入任务,将消息队列中的增量数据或存量数据写入目标数据库对应的目标数据表中。本发明采用消息队列融合存量同步和增量同步数据,仅使用一条链路即同时满足存量、增量同步的需求,减轻数据同步的操作难度和实施成本。

权利要求 :

1.基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,包括以下步骤:

步骤1、创建数据源信息并存放至数据库中;

步骤2、拉取数据源中待同步的数据表作为待同步表列表,获取数据表和数据表对应的数据源信息,建立数据表与目标数据表的映射关系;

步骤3、创建增量同步任务,获取增量数据并根据映射关系发送至消息队列中;

步骤4、创建存量同步任务,异步获取存量数据并根据映射关系发送至消息队列中;

步骤5、创建数据实时接入任务,将消息队列中的增量数据或存量数据写入目标数据库对应的目标数据表中;

所述步骤3包括:步骤31、选取待同步表列表中待同步的数据表,构建增量同步列表,将增量同步列表中的数据表与目标数据库中的目标数据表对比,新建新的目标数据表或修改已存在的目标数据表;步骤32、校验是否存在与待同步的数据表对应的cdc任务,若不存在则新建对应的cdc任务;步骤33、由cdc任务监控待同步的数据表的变更,然后生成、解析变更日志并生成变更记录、建立第一json文件;步骤34、判断待同步的数据表是否发生变更,若发生变更,生成最新schem信息并进行注册获得当前schem版本,将当前schem版本存放至集合map中并生成数据表对应的schem总信息;步骤35、从集合map中获取当前schem版本并写入第一json文件中形成增量数据;步骤36、将增量数据及对应的数据表的名称发送至消息队列发送模块,消息队列发送模块根据映射关系获取对应的主题,并将增量数据发送至消息队列对应的主题中;

所述步骤4包括:步骤41、选取待同步表列表中待同步的数据表,构建存量同步列表,初始化存量同步列表为待执行状态;步骤42、异步获取待执行状态的存量同步列表,获取所述存量同步列表中数据表的对应数据源信息的源数据库信息;步骤43、根据源数据库信息的源数据库类型选择对应的存量同步方式,通过数据表的主键分批查询和同步存量数据。

2.根据权利要求1所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,所述步骤1中,数据源信息至少包括数据源、以及各数据源对应的源数据库信息、消息队列信息、主题前缀和目标数据库信息,目标数据库信息至少包括数据库类型、数据库链接信息和数据库库名。

3.根据权利要求2所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,数据表与目标数据表的映射关系为:获取数据库中数据表和数据表对应的主题前缀,建立目标数据库中的目标数据表和数据表、主题的映射关系,将所述映射关系存放至集合map中。

4.根据权利要求3所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,所述步骤32中,新建cdc任务时,根据待同步的数据表对应数据库类型加载对应的任务模板,并在任务模板中至少填充数据表对应的源数据库信息、待同步的数据表信息和消息队列信息;所述步骤34中,schem总信息至少包括schem版本号、以及对应的schem信息和数据表。

5.根据权利要求4所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,所述查询和同步存量数据的方法包括:步骤431、查询源数据库的数据表并获取schem信息,注册当前的schem信息并返回至对应的schema版本号;步骤432、建立第二json文件,将返回的schema版本号写入第二json文件中形成存量数据;步骤433、将存量数据及对应数据表的名称发送至消息队列发送模块,消息队列发送模块根据映射关系获取对应的主题,并将存量数据发送至消息队列对应的主题中。

6.根据权利要求5所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,所述步骤5包括:步骤51、设置数据实时接入任务的数据接入模块,消费消息队列信息;步骤52、设置数据实时接入任务的数据处理模块,处理数据源信息。

7.根据权利要求6所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,所述步骤52包括:步骤521、设置目标数据库信息;步骤522、由目标数据库信息的数据库类型,选择对应的数据处理模板,解析消息队列中的增量数据或存量数据,并根据映射关系将增量数据或存量数据写入对应的目标数据表中。

8.根据权利要求7所述的基于消息队列的存量数据与增量数据融合的数据同步方法,其特征在于,所述步骤522包括以下过程:步骤A、基于schema总信息加载schema版本及对应schema信息到集合map中;步骤B、解析增量数据或存量数据,获取对应的schema版本号,再从集合map中获取对应的schema信息;步骤C、校验获取的schema信息与目标数据表的schema信息是否一致或兼容;步骤D、校验结果一致或兼容时,根据目标数据表的schema信息和数据库类型,调用数据写入模块将增量数据或存量数据写入目标数据表中。

说明书 :

基于消息队列的存量数据与增量数据融合的数据同步方法

技术领域

[0001] 本发明属于大数据技术领域,具体涉及基于消息队列的存量数据与增量数据融合的数据同步方法。

背景技术

[0002] 为了解决数据同步,业内出现了类似sqoop、datax、kettle这类离线同步工具,也有类似canal、debezium这类增量同步工具。这两类同步工具在数据同步中都扮演者非常重要的角色,但是它们各自也有其缺陷。
[0003] 离线同步工具用于离线存量数据的同步,如果一定要用于增量数据同步,那么需要借助调度工具定时调度小时间粒度的同步任务,这种方式数据延迟较大,并且无法适用于业务数据表不存在类似数据变更时间的字段的数据。增量同步工具主要是通过监控数据库变更日志如mysql‑binlog来实现准实时数据同步,但是对于这类工具又无法应用于存量数据同步。一般情况下,增量同步工具会与离线同步工具搭配使用,首先初始化时在数据库建立一个快照,开启增量同步将快照后面的变更通过增量同步的方式写入目标库,然后使用离线同步工具将快照对应存量数据同步到目标库,但是这种方式存在存量和增量两条同步线路处理数据,对于存量和增量数据无缝链接等方面有较大的处理成本,成为所属技术领域技术人员亟待解决的技术问题。

发明内容

[0004] 本发明要解决的技术问题是:提供基于消息队列的存量数据与增量数据融合的数据同步方法,以至少解决上述部分技术问题。
[0005] 为实现上述目的,本发明采用的技术方案如下:
[0006] 基于消息队列的存量数据与增量数据融合的数据同步方法,包括以下步骤:
[0007] 步骤1、创建数据源信息并存放至数据库中;
[0008] 步骤2、拉取数据源中待同步的数据表作为待同步表列表,获取数据表和数据表对应的数据源信息,建立数据表与目标数据表的映射关系;
[0009] 步骤3、创建增量同步任务,获取增量数据并根据映射关系发送至消息队列中;
[0010] 步骤4、创建存量同步任务,异步获取存量数据并根据映射关系发送至消息队列中;
[0011] 步骤5、创建数据实时接入任务,将消息队列中的增量数据或存量数据写入目标数据库对应的目标数据表中。
[0012] 进一步地,所述步骤1中,数据源信息至少包括数据源、以及各数据源对应的源数据库信息、消息队列信息、主题前缀和目标数据库信息,目标数据库信息至少包括数据库类型、数据库链接信息和数据库库名。
[0013] 进一步地,所述步骤2中,数据表与目标数据表的映射关系为:获取数据库中数据表和数据表对应的主题前缀,建立目标数据库中的目标数据表和数据表、主题的映射关系,将所述映射关系存放至集合map中。
[0014] 进一步地,所述步骤3包括:步骤31、选取待同步表列表中待同步的数据表,构建增量同步列表,将增量同步列表中的数据表与目标数据库中的目标数据表对比,新建新的目标数据表或修改已存在的目标数据表;步骤32、校验是否存在与待同步的数据表对应的cdc任务,若不存在则新建对应的cdc任务;步骤33、由cdc任务监控待同步的数据表的变更,然后生成、解析变更日志并生成变更记录、建立第一json文件;步骤34、判断待同步的数据表是否发生变更,若发生变更,生成最新schem信息并进行注册获得当前schem版本,将当前schem版本存放至集合map中并生成数据表对应的schem总信息;步骤35、从集合map中获取当前schem版本并写入第一json文件中形成增量数据;步骤36、将增量数据及对应的数据表的名称发送至消息队列发送模块,消息队列发送模块根据映射关系获取对应的主题,并将增量数据发送至消息队列对应的主题中。
[0015] 进一步地,所述步骤32中,新建cdc任务时,根据待同步的数据表对应数据库类型加载对应的任务模板,并在任务模板中至少填充数据表对应的源数据库信息、待同步的数据表信息和消息队列信息;所述步骤34中,schem总信息至少包括schem版本号、以及对应的schem信息和数据表。
[0016] 进一步地,所述步骤4包括:步骤41、选取待同步表列表中待同步的数据表,构建存量同步列表,初始化存量同步列表为待执行状态;步骤42、异步获取待执行状态的存量同步列表,获取所述存量同步列表中数据表的对应数据源信息的源数据库信息;步骤43、根据源数据库信息的源数据库类型选择对应的存量同步方式,通过数据表的主键分批查询和同步存量数据。
[0017] 进一步地,所述查询和同步存量数据的方法包括:步骤431、查询源数据库的数据表并获取schem信息,注册当前的schem信息并返回至对应的schema版本号;步骤432、建立第二json文件,将返回的schema版本号写入第二json文件中形成存量数据;步骤433、将存量数据及对应数据表的名称发送至消息队列发送模块,消息队列发送模块根据映射关系获取对应的主题,并将存量数据发送至消息队列对应的主题中。
[0018] 进一步地,所述步骤5包括:步骤51、设置数据实时接入任务的数据接入模块,消费消息队列信息;步骤52、设置数据实时接入任务的数据处理模块,处理数据源信息。
[0019] 进一步地,所述步骤52包括:步骤521、设置目标数据库信息;步骤522、由目标数据库信息的数据库类型,选择对应的数据处理模板,解析消息队列中的增量数据或存量数据,并根据映射关系将增量数据或存量数据写入对应的目标数据表中。
[0020] 进一步地,所述步骤522包括以下过程:步骤A、基于schema总信息加载schema版本及对应schema信息到集合map中;步骤B、解析增量数据或存量数据,获取对应的schema版本号,再从集合map中获取对应的schema信息;步骤C、校验获取的schema信息与目标数据表的schema信息是否一致或兼容;步骤D、校验结果一致或兼容时,根据目标数据表的schema信息和数据库类型,调用数据写入模块将增量数据或存量数据写入目标数据表中。
[0021] 与现有技术相比,本发明具有以下有益效果:
[0022] 本发明采用消息队列融合存量同步和增量同步数据,仅使用一条链路即同时满足存量、增量同步的需求,减轻数据同步的操作难度和实施成本。本发明增量数据和存量数据使用同一套schema信息,在源数据表的结构发生变化时,及时感知变化以应对数据结构变更,从而调整目标数据表以达到兼容的目的。并且存量数据可随时同步,满足实时数据因各种故障发生丢失或错误时需同步存量数据而补全目标数据表,以达到数据一致的需求。

附图说明

[0023] 图1为本发明的方法流程图。
[0024] 图2为本发明的映射关系生成图。

具体实施方式

[0025] 为了使本发明的目的、技术方案及优点更加清楚明白,以下结合附图,对本发明进一步详细说明。显然,所描述的实施例仅仅是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。
[0026] 如图1所示,本发明提供的基于消息队列的存量数据与增量数据融合的数据同步方法,包括以下步骤:
[0027] 步骤1、创建数据源信息并存放至数据库中;
[0028] 步骤2、拉取数据源中待同步的数据表作为待同步表列表,获取数据表和数据表对应的数据源信息,建立数据表与目标数据表的映射关系;
[0029] 步骤3、创建增量同步任务,获取增量数据并根据映射关系发送至消息队列中;
[0030] 步骤4、创建存量同步任务,异步获取存量数据并根据映射关系发送至消息队列中;
[0031] 步骤5、创建数据实时接入任务,将消息队列中的增量数据或存量数据写入目标数据库对应的目标数据表中。
[0032] 本发明采用消息队列融合存量同步和增量同步数据,仅使用一条链路即同时满足存量、增量同步的需求,减轻数据同步的操作难度和实施成本。本发明增量数据和存量数据使用同一套schema信息,在源数据表的结构发生变化时,及时感知变化以应对数据结构变更,从而调整目标数据表以达到兼容的目的。并且存量数据可随时同步,满足实时数据因各种故障发生丢失或错误时需同步存量数据而补全目标数据表,以达到数据一致的需求。
[0033] 本发明所述步骤1为各数据源的信息集合,其创建的数据源信息至少包括数据源、以及各数据源对应的源数据库信息、消息队列信息、主题前缀和目标数据库信息。数据源信息以信息表形式呈现,如表1所示。目标数据库信息至少包括数据库类型、数据库链接信息和数据库库名。
[0034] 数据源信息的信息表如表1所示:
[0035] 表1 数据源信息的信息表
[0036]
[0037] 本发明所述步骤2为待同步的数据表的拉取、以及数据表与目标数据表的映射关系的建立。从数据源中实时拉取待同步的数据表,构建为待同步表列表,待同步表列表可作为增量和存量数据同步的入口,并获取待同步表列表中的数据表和数据表对应的数据源信息。数据表与目标数据表的映射关系的映射方法图2所示,包括获取数据库中数据表和数据表对应的主题前缀,建立目标数据库中的目标数据表和数据表、主题的映射关系,即每个数据表具有一一对应的数据源、题和目标数据表,将所述映射关系存放至集合map中。所述主题的名称为主题前缀的名称加上下横线字符再加上数据表的名称,所述目标数据表的名称为Ods字符加上下横线字符再加上数据表的名称。
[0038] 目标数据表和数据表、主题的映射关系表如表2所示:
[0039] 表2 映射关系表
[0040]
[0041] 本发明所述步骤3为基于消息队列的增量数据生成。步骤3包括:
[0042] 步骤31、选取待同步表列表中待同步的数据表,构建增量同步列表,将增量同步列表中的数据表与目标数据库中的目标数据表对比,新建新的目标数据表或修改已存在的目标数据表,具体为:若目标数据库中无所述数据表对应的目标数据表,则在目标数据库中新建对应的目标数据表,若目标数据库中对应的目标数据表,再次校验两个表的结构,若结构不同对目标数据表进行修改,修改方式包括但不限于增加字段,修改字段类型等;
[0043] 步骤32、校验是否存在与待同步的数据表对应的cdc任务,若不存在则新建对应的cdc任务,新建cdc任务时,根据待同步的数据表对应数据库类型加载对应的任务模板,并在任务模板中至少填充数据表对应的源数据库信息、待同步的数据表信息、消息队列信息和主题前缀;所述源数据库信息至少包括数据库地址、账户和密码;所述数据表信息不仅包括数据表与对应的数据源,还包括数据表的主键;消息队列信息至少包括发送数据间隔;
[0044] 步骤33、由cdc任务监控待同步的数据表的变更,然后生成、解析变更日志并生成变更记录、建立第一json文件;数据表的变更包括数据的增加、删除和修改,对应的变更记录即为增加记录,修改记录,删除记录;
[0045] 步骤34、判断待同步的数据表是否发生变更,若发生变更,生成最新schem信息并进行注册获得当前schem版本,将当前schem版本存放至集合map中并生成数据表对应的schem总信息;集合map用于维护待同步的数据表的表名、schema版本的版本号,schema管理模块将生成的schem总信息持久化到数据库中用以后续数据处理schema版本获取数据记录对应的schema信息;所述schem总信息至少包括schem版本号、以及对应的schem信息和数据表,并且以信息表形式呈现,schem总信息的信息表如表3所示;
[0046] 表3 schem总信息的信息表
[0047]
[0048] 步骤35、从集合map中获取当前schem版本并写入第一json文件中形成增量数据,若集合map不存在schem版本,则执行步骤34注册schem;
[0049] 步骤36、将增量数据及对应的数据表的名称发送至消息队列发送模块,消息队列发送模块根据映射关系获取对应的主题,并将增量数据发送至消息队列对应的主题中;将增量数据发送至对应主题的过程中,增量数据还根据填充数据表的主键,由hash取模分发至对应的分区中。
[0050] 本发明所述步骤4为基于消息队列的存量数据生成。步骤4包括:
[0051] 步骤41、选取待同步表列表中待同步的数据表,构建存量同步列表,初始化存量同步列表为待执行状态;
[0052] 步骤42、异步获取待执行状态的存量同步列表,获取所述存量同步列表中数据表的对应数据源信息的源数据库信息;
[0053] 步骤43、根据源数据库信息的源数据库类型选择对应的存量同步方式,通过数据表的主键分批查询和同步存量数据。所述存量同步方式包括但不限jdbc和REST API。
[0054] 所述查询和同步存量数据的方法包括:步骤431、查询源数据库的数据表并获取schem信息,注册当前的schem信息并返回至对应的schema版本号,若schema信息已存在则返回已存在的schema版本号,若schema信息不存在、则新建schema版本并返回新建的schema版本号;步骤432、建立第二json文件,将返回的schema版本号写入第二json文件中形成存量数据;步骤433、将存量数据及对应数据表的名称发送至消息队列发送模块,消息队列发送模块根据映射关系获取对应的主题,并将存量数据发送至消息队列对应的主题中,存量数据发送至消息队列对应主题的过程中,存量数据还根据填充数据表的主键,由hash取模分发至对应的分区中。
[0055] 本发明所述步骤5为存量数据与增量数据在消息队列中的融合。步骤5包括:
[0056] 步骤51、设置数据实时接入任务的数据接入模块,消费消息队列信息,所述消息队列信息包括消息队列地址、需要消费的主题、以及主题对应的目标数据表,所述需要消费的主题根据映射关系自动生成;
[0057] 步骤52、设置数据实时接入任务的数据处理模块,处理数据源信息。
[0058] 所述步骤52包括:步骤521、设置目标数据库信息;步骤522、由目标数据库信息的数据库类型,选择对应的数据处理模板,解析消息队列中的增量数据或存量数据,并根据映射关系将增量数据或存量数据写入对应的目标数据表中。
[0059] 所述步骤522包括以下过程:步骤A、基于schema总信息加载schema版本及对应schema信息到集合map中;步骤B、解析增量数据或存量数据,获取对应的schema版本号,再从集合map中获取对应的schema信息;步骤C、校验获取的schema信息与目标数据表的schema信息是否一致或兼容;步骤D、校验结果一致或兼容时,根据目标数据表的schema信息和数据库类型,调用数据写入模块将增量数据或存量数据写入目标数据表中。
[0060] 本发明所述的消息队列优选kafka。
[0061] 最后应说明的是:以上各实施例仅仅为本发明的较优实施例用以说明本发明的技术方案,而非对其限制,当然更不是限制本发明的专利范围;尽管参照前述各实施例对本发明进行了详细的说明,本领域的普通技术人员应当理解:其依然可以对前述各实施例所记载的技术方案进行修改,或者对其中部分或者全部技术特征进行等同替换;而这些修改或者替换,并不使相应技术方案的本质脱离本发明各实施例技术方案的范围;也就是说,但凡在本发明的主体设计思想和精神上作出的毫无实质意义的改动或润色,其所解决的技术问题仍然与本发明一致的,均应当包含在本发明的保护范围之内;另外,将本发明的技术方案直接或间接的运用在其他相关的技术领域,均同理包括在本发明的专利保护范围内。