数据处理方法、装置、存储介质和计算设备转让专利

申请号 : CN202311705419.2

文献号 : CN117390106B

文献日 :

基本信息:

PDF:

法律信息:

相似专利:

发明人 : 熊搏周波

申请人 : 杭州网易云音乐科技有限公司

摘要 :

本公开提供了一种数据处理方法、装置、存储介质和计算设备。包括:获取ETL过程中生成的实验实时数据流和指标实时数据流;将实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,键值对中的键为实验数据中包含的用户标识,键值对的值为实验数据中包含的实验桶数据;将指标实时数据流中的指标数据按照指标标识,生成与指标标识关联的实验列表;其中,实验列表中包括具有关联的指标标识的指标数据;遍历实验列表并在实验桶列表中查询实验列表中实验对应的实验桶数据,得到由查询到的实验桶数据构成的交集实验桶列表;将交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。

权利要求 :

1.一种数据处理方法,包括:

获取ETL过程中生成的实验实时数据流和指标实时数据流;

将所述实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,所述键值对中的键为实验数据中包含的用户标识,所述键值对的值为实验数据中包含的实验桶数据;

将所述指标实时数据流中的指标数据按照指标标识,生成与指标标识关联的实验列表;其中,所述实验列表中包括具有关联的指标标识的指标数据;

遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,得到由所述查询到的实验桶数据构成的交集实验桶列表;

将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。

2.根据权利要求1所述的方法,所述键值对中的键还包括实验数据中包含的用户标识和时间分片的组合。

3.根据权利要求2所述的方法,所述实验桶数据包括实验标识、实验桶标识、切流时间戳、打点时间戳;

其中,所述切流时间戳为不同实验生成的实验桶数据之间的分界时刻;所述打点时间戳为用户开始实验的时刻。

4.根据权利要求3所述的方法,所述遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,包括:遍历所述实验列表,查询离最近一次实验的切流时间戳;

以所述最近一次实验的实验标识和所述切流时间戳为查询条件,在所述实验桶列表中查询对应的实验桶数据。

5.根据权利要求1所述的方法,所述将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表,包括:利用分布式处理引擎消费ETL过程中生成的指标实时数据流,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;

所述将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储,包括:将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后写入列式存储系统。

6.根据权利要求5所述的方法,在所述写入列式存储系统之后,还包括:向所述列式存储系统发起查询请求,以使所述列式存储系统按照所述查询请求中指定的用户标识聚合本地具有与所述用户标识关联的实验桶数据,并进一步基于聚合的实验桶数据计算所述查询请求中指定的指标类型的指标值。

7.根据权利要求4所述的方法,还包括:

响应于未从所述实验桶列表中查询到对应的实验桶数据,创建空的交集实验桶列表;

将当前的实验列表作为离线实验列表存储到预设数据仓库,并启动定时的调度任务;

其中,所述调度任务用于周期性拉取存储到所述预设数据仓库中的离线实验列表,并在实时存储的实验桶列表中查询是否存在与所述离线实验列表中实验对应的实验桶数据;

响应于查询到与所述离线实验列表中实验对应的实验桶数据,结束所述调度任务的部署,并将所述查询到的实验桶数据写入所述空的交集实验桶列表,以得到由所述查询到的实验桶数据构成的交集实验桶列表;

将所述交集实验桶列表中的实验桶数据,按照预设时长进行划分后写入列式存储系统。

8.一种数据处理装置,所述装置包括:

获取单元,获取ETL过程中生成的实验实时数据流和指标实时数据流;

第一存储单元,将所述实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,所述键值对中的键为实验数据中包含的用户标识,所述键值对的值为实验数据中包含的实验桶数据;

生成单元,将所述指标实时数据流中的指标数据按照指标标识,生成与指标标识关联的实验列表;其中,所述实验列表中包括具有关联的指标标识的指标数据;

遍历单元,遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,得到由所述查询到的实验桶数据构成的交集实验桶列表;

第二存储单元,将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。

9.根据权利要求8所述的装置,所述键值对中的键还包括实验数据中包含的用户标识和时间分片的组合。

10.根据权利要求9所述的装置,所述实验桶数据包括实验标识、实验桶标识、切流时间戳、打点时间戳;

其中,所述切流时间戳为不同实验生成的实验桶数据之间的分界时刻;所述打点时间戳为用户开始实验的时刻。

11.根据权利要求10所述的装置,所述遍历单元,包括:遍历子单元,遍历所述实验列表,查询离最近一次实验的切流时间戳;

查询子单元,以所述最近一次实验的实验标识和所述切流时间戳为查询条件,在所述实验桶列表中查询对应的实验桶数据。

12.根据权利要求8所述的装置,所述生成单元,进一步用于利用分布式处理引擎消费ETL过程中生成的指标实时数据流,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;

所述第二存储单元,进一步用于将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后写入列式存储系统。

13.根据权利要求12所述的装置,所述装置还包括:查询单元,向所述列式存储系统发起查询请求,以使所述列式存储系统按照所述查询请求中指定的用户标识聚合本地具有与所述用户标识关联的实验桶数据,并进一步基于聚合的实验桶数据计算所述查询请求中指定的指标类型的指标值。

14.根据权利要求11所述的装置,还包括:

创建子单元,响应于未从所述实验桶列表中查询到对应的实验桶数据,创建空的交集实验桶列表;

调度子单元,将当前的实验列表作为离线实验列表存储到预设数据仓库,并启动定时的调度任务;其中,所述调度任务用于周期性拉取存储到所述预设数据仓库中的离线实验列表,并在实时存储的实验桶列表中查询是否存在与所述离线实验列表中实验对应的实验桶数据;

写入子单元,响应于查询到与所述离线实验列表中实验对应的实验桶数据,结束所述调度任务的部署,并将所述查询到的实验桶数据写入所述空的交集实验桶列表,以得到由所述查询到的实验桶数据构成的交集实验桶列表;

所述第二存储单元,还用于将所述交集实验桶列表中的实验桶数据,按照预设时长进行划分后写入列式存储系统。

15.一种计算机可读存储介质,包括:

当所述计算机可读存储介质中的指令由电子设备的处理器执行时,使得电子设备能够执行如权利要求1‑7中任一项所述的数据处理方法。

16.一种计算设备,包括:

处理器;

用于存储所述处理器可执行指令的存储器;

其中,所述处理器被配置为执行所述可执行指令,以实现如权利要求1‑7中任一项所述的数据处理方法。

说明书 :

数据处理方法、装置、存储介质和计算设备

技术领域

[0001] 本公开的实施方式涉及计算机技术领域,更具体地,本公开的实施方式涉及一种数据处理方法、装置、存储介质和计算设备。

背景技术

[0002] 本部分旨在为本公开的实施方式提供背景或上下文。此处的描述不因为包括在本部分中就承认是现有技术。
[0003] ETL(Extract‑Transform‑Load)是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,其目的是将用户、企业或组织中分散、凌乱、标准不统一的数据整合到一起,从而为决策提供分析数据。
[0004] 在一些需要使用ETL的场景中,需要对ETL过程中产生的指标数据和实验数据进行处理,从而方便后续数据查询使用。例如,基于纯日志ETL的数据处理方案、基于双流JOIN的数据处理方案、基于存储类JOIN的数据处理方案等。
[0005] 然而,上述几种方案只适合一些数据量不大的简单业务,一旦涉及到复杂业务,由于复杂业务需要处理更多的数据量,且可以源源不断地产生越来越多的数据,因此会导致上述几种方案出现资源不足的问题。

发明内容

[0006] 在本公开实施方式的第一方面中,提供了一种数据处理方法。所述方法包括:
[0007] 获取ETL过程中生成的实验实时数据流和指标实时数据流;
[0008] 将所述实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,所述键值对中的键为实验数据中包含的用户标识,所述键值对的值为实验数据中包含的实验桶数据;
[0009] 将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;其中,所述实验列表中包括具有关联的指标标识的指标数据;
[0010] 遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0011] 将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。
[0012] 可选的,所述键值对中的键还包括实验数据中包含的用户标识和时间分片的组合。
[0013] 可选的,所述实验桶数据包括实验标识、实验桶标识、切流时间戳、打点时间戳;
[0014] 其中,所述切流时间戳为不同实验生成的实验桶数据之间的分界时刻;所述打点时间戳为用户开始实验的时刻。
[0015] 可选的,所述遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,包括:
[0016] 遍历所述实验列表,查询离最近一次实验的切流时间戳;
[0017] 以所述最近一次实验的实验标识和所述切流时间戳为查询条件,在所述实验桶列表中查询对应的实验桶数据。
[0018] 可选的,所述将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表,包括:
[0019] 利用分布式处理引擎消费ETL过程中生成的指标实时数据流,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;
[0020] 所述将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储,包括:
[0021] 将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后写入列式存储系统。
[0022] 可选的,在所述写入列式存储系统之后,还包括:
[0023] 向所述列式存储系统发起查询请求,以使所述列式存储系统按照所述查询请求中指定的用户标识聚合本地具有与所述用户标识关联的实验桶数据,并进一步基于聚合的实验桶数据计算所述查询请求中指定的指标类型的指标值。
[0024] 可选的,还包括:
[0025] 响应于未从所述实验桶列表中查询到对应的实验桶数据,创建空的交集实验桶列表;
[0026] 将当前的实验列表作为离线实验列表存储到预设数据仓库,并启动定时的调度任务;其中,所述调度任务用于周期性拉取存储到所述预设数据仓库中的离线实验列表,并在实时存储的实验桶列表中查询是否存在与所述离线实验列表中实验对应的实验桶数据;
[0027] 响应于查询到与所述离线实验列表中实验对应的实验桶数据,结束所述调度任务的部署,并将所述查询到的实验桶数据写入所述空的交集实验桶列表,以得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0028] 将所述交集实验桶列表中的实验桶数据,按照预设时长进行划分后写入列式存储系统。
[0029] 在本公开实施方式的第二方面中,提供了一种数据处理装置,所述装置包括:
[0030] 获取单元,获取ETL过程中生成的实验实时数据流和指标实时数据流;
[0031] 第一存储单元,将所述实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,所述键值对中的键为实验数据中包含的用户标识,所述键值对的值为实验数据中包含的实验桶数据;
[0032] 生成单元,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;其中,所述实验列表中包括具有关联的指标标识的指标数据;
[0033] 遍历单元,遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0034] 第二存储单元,将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。
[0035] 可选的,所述键值对中的键还包括实验数据中包含的用户标识和时间分片的组合。
[0036] 可选的,所述实验桶数据包括实验标识、实验桶标识、切流时间戳、打点时间戳;
[0037] 其中,所述切流时间戳为不同实验生成的实验桶数据之间的分界时刻;所述打点时间戳为用户开始实验的时刻。
[0038] 可选的,所述遍历单元,包括:
[0039] 遍历子单元,遍历所述实验列表,查询离最近一次实验的切流时间戳;
[0040] 查询子单元,以所述最近一次实验的实验标识和所述切流时间戳为查询条件,在所述实验桶列表中查询对应的实验桶数据。
[0041] 可选的,所述生成单元,进一步用于利用分布式处理引擎消费ETL过程中生成的指标实时数据流,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;
[0042] 所述第二存储单元,进一步用于将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后写入列式存储系统。
[0043] 可选的,所述装置还包括:
[0044] 查询单元,向所述列式存储系统发起查询请求,以使所述列式存储系统按照所述查询请求中指定的用户标识聚合本地具有与所述用户标识关联的实验桶数据,并进一步基于聚合的实验桶数据计算所述查询请求中指定的指标类型的指标值。
[0045] 可选的,还包括:
[0046] 创建子单元,响应于未从所述实验桶列表中查询到对应的实验桶数据,创建空的交集实验桶列表;
[0047] 调度子单元,将当前的实验列表作为离线实验列表存储到预设数据仓库,并启动定时的调度任务;其中,所述调度任务用于周期性拉取存储到所述预设数据仓库中的离线实验列表,并在实时存储的实验桶列表中查询是否存在与所述离线实验列表中实验对应的实验桶数据;
[0048] 写入子单元,响应于查询到与所述离线实验列表中实验对应的实验桶数据,结束所述调度任务的部署,并将所述查询到的实验桶数据写入所述空的交集实验桶列表,以得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0049] 所述第二存储单元,还用于将所述交集实验桶列表中的实验桶数据,按照预设时长进行划分后写入列式存储系统。
[0050] 在本公开实施方式的第三方面中,提供了一种计算机可读存储介质,包括:
[0051] 当所述计算机可读存储介质中的指令由电子设备的处理器执行时,使得电子设备能够执行如前任一项所述的数据处理方法。
[0052] 在本公开实施方式的第四方面中,提供了一种计算设备,包括:
[0053] 处理器;
[0054] 用于存储所述处理器可执行指令的存储器;
[0055] 其中,所述处理器被配置为执行所述可执行指令,以实现如前任一项所述的数据处理方法。
[0056] 根据本公开实施方式提供的数据处理方案,针对ETL过程中生成的实验实时数据流和指标实时数据流,通过将用户标识关联实验桶列表以实现对实验实时数据流中相同用户的实验桶数据进行划分,以及将指标标识关联实验列表以实现对实时数据流中相同指标的指标数据进行划分。在数据存储时,不再存储全量数据,而是在遍历了实验桶列表和实验列表得出的交集实验桶列表后,按照用户标识分片存储交集实验桶列表中的实验桶数据。如此有效得减少了数据量,避免出现资源不足的问题。此外通过预先划分出与用户对应的实验桶数据和指标对应的指标数据,从而在后续数据查询时利用分片查询能力,加快数据查询速度。

附图说明

[0057] 通过参考附图阅读下文的详细描述,本公开示例性实施方式的上述以及其他目的、特征和优点将变得易于理解。在附图中,以示例性而非限制性的方式示出了本公开的若干实施方式,其中:
[0058] 图1示意性地示出了基于双流JOIN的数据处理方案的示意图;
[0059] 图2示意性地示出了基于存储类JOIN的数据处理方案的示意图;
[0060] 图3示意性地示出了本公开提供的数据处理方案的示意图;
[0061] 图4示意性地示出了本公开提供的数据处理方法示意图;
[0062] 图5示意性地示出了本公开提供的实验数据的存储示意图;
[0063] 图6示意性地示出了本公开提供的指标数据的关联示意图;
[0064] 图7示意性地示出了本公开提供的实验桶数据的存储示意图;
[0065] 图8示意性地示出了本公开提供的介质示意图;
[0066] 图9示意性地示出了本公开提供的数据处理装置示意图;
[0067] 图10示意性地示出了本公开提供的计算设备示意图。
[0068] 在附图中,相同或对应的标号表示相同或对应的部分。

具体实施方式

[0069] 下面将参考若干示例性实施方式来描述本公开的原理和精神。应当理解,给出这些实施方式仅仅是为了使本领域技术人员能够更好地理解进而实现本公开,而并非以任何方式限制本公开的范围。相反,提供这些实施方式是为了使本公开更加透彻和完整,并且能够将本公开的范围完整地传达给本领域的技术人员。
[0070] 本领域技术人员知道,本公开的实施方式可以实现为一种系统、装置、设备、方法或计算机程序产品。因此,本公开可以具体实现为以下形式,即:完全的硬件、完全的软件(包括固件、驻留软件、微代码等),或者硬件和软件结合的形式。
[0071] 根据本公开的实施方式,提出了一种数据处理方法、计算机可读存储介质、装置和计算设备。
[0072] 在本文中,需要理解的是,附图中的任何元素数量均用于示例而非限制,以及任何命名都仅用于区分,而不具有任何限制含义。
[0073] 下面参考本公开的若干代表性实施方式,详细阐释本公开的原理和精神。
[0074] 本公开所涉及的数据可以为经用户授权或者经过各方充分授权的数据,对数据的采集、传播、使用等,均符合国家相关法律法规要求,本公开实施方式/实施例可以互相组合。
[0075] 在介绍本公开提供的实施方式之前,先介绍一些技术概念。
[0076] ETL(Extract‑Transform‑Load)是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,其目的是将用户、企业或组织中分散、数据格式不统一的数据整合到一起,从而为决策提供分析数据。
[0077] Apache Flink(简称Flink)是一个在有界数据流和无界数据流上进行有状态计算的分布式处理引擎。任何类型的数据都是作为数据流产生,常见的网站移动应用程序上的交互,所有数据都可以作为数据流产生。Flink擅长处理无边界和有界的数据集,对事件和状态的精确控制使Flink的运行能够在无限制的流上运行任何类型的应用程序,有界流有专门为固定大小数据集设计的算法和数据结构正在内部处理,从而产生出色性能。
[0078] ClickHouse(Click Stream,Data WareHouse)是一种列式存储的数据库管理系统(Database Management System,DBMS),具备在线分析处理查询(Online Analytical Processing,OLAP)能力,支持使用SQL查询并实时生成分析数据报告。对于列式存储,列的聚合、计数、求和等统计操作优于行式存储,且某一列的数据类型相同时,针对数据存储更容易数据压缩,提高压缩比重,从而更加节省存储空间。另外ClickHouse还可以将数据划分为多个分片(partition),每个分片进一步又可以划分为多个索引粒度(index granularity),然后多个CPU核心可以分别处理其中一部分来实现并行数据处理,从而极大降低了查询延时。
[0079] 在一些需要使用ETL的场景中,需要对ETL过程中产生的指标数据和实验数据进行处理,从而方便后续数据查询使用。例如,基于纯日志ETL的数据处理方案、基于双流JOIN的数据处理方案、基于存储类JOIN的数据处理方案等。
[0080] 其中,基于纯日志ETL的数据处理方案,是在指标日志中就携带了实验数据,这样数据处理不需要额外的关联成本,只用ETL解析数据即可。
[0081] 在图1所示的基于双流JOIN的数据处理方案中,不再由客户端做指标日志的关联处理,而是在分布式处理引擎(以下以Flink为例)中利用其双流JOIN能力,将指标数据和实验数据进行关联。
[0082] 在图2所示的基于存储类JOIN的数据处理方案中,为了Flink中双流JOIN对性能和计算的影响,利用外部存储支持主键upsert更新模式,实现类JOIN。例如实验实时数据流和指标实时数据流都经过Flink ETL关联一些维表后,可以使双方有相同的主键,但是不同的Value值,两个流分别更新自己对应的外部存储中的Value值即可,从而降低Flink内部的处理压力。
[0083] 在理想的环境和数据量不大的情况中,上述几种方案都可以有较好的处理结果。但是一旦加入了复杂场景,由于复杂业务需要处理更多的数据量,且可以源源不断地产生越来越多的数据,因此会导致上述几种方案出现资源不足的问题。
[0084] 基于纯日志ETL的数据处理方案,虽然能够很好的简化数据ETL的开发,但是这种方案在客户端上报指标日志的时候,就已经将指标数据和实验数据的关系绑定,无法改变。如果后期实验需要新加指标,上报的指标日志是没有新加实验的实验数据。因此该方案灵活性、扩展性较低。而且随着实验数量和指标数量增加,指标日志中大量的实验数据,不仅会增加存储成本,也会影响网络传输,从而影响整体数据链路中如计算、查询等服务的性能。
[0085] 基于双流JOIN的数据处理方案,虽然将实验数据和指标数据的关联放入到了数据计算侧,但是对于以天或者小时累计指标计算,必须将一天或者一个小时的指标和实验数据放到Flink状态存储,这样就会导致Flink状态存储和访问压力变大,导致整个数据链路的不稳定,容易出现崩溃的风险。
[0086] 基于存储类JOIN的数据处理方案,虽然利用外部存储主键upsert分别更新value值的方式,实现了双流的类关联,从而将双流JOIN的数据处理方案存在的Flink状态存储的压力转移到了外部存储。但是外部存储能够支撑的数据压力也不是无限的,由于指标可以被多个实验引用,一个实验也可以关联多个指标,这种多对多的关系以及业务指标和实验数量的增加,都会导致数据急剧膨胀,最后外部存储也可能承受不了这种高并发的写入压力,从而造成系统崩溃,影响查询等服务。
[0087] 综合上述几种方案,都是将关联动作往后推,减少前面的计算压力,但是没有从根本上解决数据量大的问题,要解决数据压力大,就需要解决数据量膨胀的问题。
[0088] 发明概述
[0089] 本公开旨在提供一种数据处理方案,针对ETL过程中生成的实验实时数据流和指标实时数据流,通过将用户标识关联实验桶列表以实现对实验实时数据流中相同用户的实验桶数据进行划分,以及将指标标识关联实验列表以实现对实时数据流中相同指标的指标数据进行划分。在数据存储时,不再存储全量数据,而是在遍历了实验桶列表和实验列表得出的交集实验桶列表后,按照用户标识分片存储交集实验桶列表中的实验桶数据。如此从根本上解决了数量膨胀的问题,有效得减少了数据量,避免出现资源不足的问题。此外通过预先划分出与用户对应的实验桶数据和指标对应的指标数据,从而在后续数据查询时利用分片查询能力,加快数据查询速度。
[0090] 在介绍了本公开的基本原理之后,下面具体介绍本公开的各种非限制性实施方式。
[0091] 应用场景总览
[0092] 首先参考图3所示的一种数据处理的流程示意图。该流程示意图可以分为三个阶段。
[0093] 第一阶段,先将实验实时数据流中的实验数据按照键值对的存储格式进行存储。在图3示出的键值对为例,可以是以用户标识(如图3中示出的userid)为主键,以实验数据中包含的实验桶数据为主键对应的值。
[0094] 第二阶段,针对分布式处理引擎(如Flink)在进行ETL过程中生成的指标实时数据流,可以根据指标标识(如图3中的指标id)关联出指标对应的实验列表;以及,根据用户标识关联出实验桶列表。
[0095] 第三阶段,遍历实验列表,在实验桶列表中查询出每个实验对应的实验桶数据,从而得出交集实验桶列表。最后根据用户标识对该交集实验桶列表进行分片进行存储,图3中给出了存储到列式存储系统的示例。
[0096] 列式存储系统是一种针对大规模数据分析和查询优化的存储方式,与传统的行式存储系统相比,列式存储系统具有更高的存储性能和效率。
[0097] 列式存储系统除了包括前述的ClickHouse,还可以包括Apache Parquet、ORC(Optimized Row Columnar)等。
[0098] 示例性方法
[0099] 下面参考图4来描述根据本公开示例性实施方式的数据处理的方法。需要注意的是,上述应用场景仅是为了便于理解本公开的精神和原理而示出,本公开的实施方式在此方面不受任何限制。相反,本公开的实施方式可以应用于适用的任何场景。
[0100] 如图4所示,所述数据处理方法,可以包括以下步骤:
[0101] 步骤410:获取ETL过程中生成的实验实时数据流和指标实时数据流。
[0102] 步骤420:将所述实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,所述键值对中的键为实验数据中包含的用户标识,所述键值对的值为实验数据中包含的实验桶数据。
[0103] 本说明书中,上述步骤420对应了前述图3中的第一阶段。在第一阶段中,Flink可以直接处理ETL过程中生成的实验实时数据流。具体可以是以用户标识为主键(Key)、以实验桶数据为主键对应的值(value)的键值对(KV)格式构建一个实验桶列表。
[0104] 下面请结合图5所示的实验数据的存储示意图来对步骤420进行说明。
[0105] 在图5中,所述键值对中的键除了上述的用户标识userId还可以包括时间分片dt;即键值对中的键可以包括实验数据中包含的用户标识和时间分片的组合。
[0106] 如此按照userid+dt为主键,可以构建一个实验桶列表的Map结构,并写入KV存储中。时间分片dt可以是以天为单位的日期。
[0107] 因为用户每天实验都可能会变化,加上dt可以使每个主键的值不至于过于庞大,实时处理只需要维护当天进入的实验即可,也有可能第二天用户没有进入任何实验,设置Key的存储过期时间,可以有效的减少存储压力,防止某些用户一直残留在存储中。
[0108] 示例性的,如图5所示,所述实验桶数据可以包括用户标识(userId)、实验标识(expId)、实验桶标识(bucketId)、切流时间戳(m_time)、打点时间戳(t_time);
[0109] 其中,所述打点时间戳为用户开始实验的时刻。
[0110] 而所述切流时间戳为不同实验生成的实验桶数据之间的分界时刻。也就是说m_time可以作为每一个实验的切流标志,每一次不同实验桶的流量重新划分时,这个m_time也会更新,类似于实验版本。通过m_time可以在后续分析实验效果数据时,防止切流后造成当天切流前后的效果数据互相影响。
[0111] 在图5中,HH是作为分流归因,用于标记用户在哪个小时进入了该实验。当关联指标数据后,这个HH可以作为指标数据的一个字段,同时写入ClickHouse中,这样就相当于指标归因到用户哪次进入实验,查询时两者才能对应起来。
[0112] 通过上述实验数据的存储方式,可以将每天产生的海量(可能是几十亿、几百亿甚至更多大的级别)的实验数据,以用户标识(或者用户标识+日期)的形式分流存储到实验桶列表中,从而将海量数据的一次存储变更为以用户数这一级别的分流存储。而用户数可能只有几百万、几千万的数据量。
[0113] 步骤430:将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;其中,所述实验列表中包括具有关联的指标标识的指标数据。
[0114] 示例性的,利用分布式处理引擎(如前述Flink)消费ETL过程中生成的指标实时数据流,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表。
[0115] 本说明书中,上述步骤430对应了前述图3中的第二阶段。在第二阶段中,Flink可以直接处理ETL过程中生成的指标实时数据流。具体可以是按照指标标识关联指标数据构建的实验列表。
[0116] 下面请结合图6所示的指标数据的关联示意图来对步骤430进行说明。
[0117] 在图6中,分布式处理引擎Flink消费指标实时数据流时,可以通过指标标识关联出指标对应的实验列表Array<expId>,通过userid+dt关联出当天用户进入过的实验和所在实验桶列表Map<expId+m_time:bucketId+HH>。
[0118] 步骤440:遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,得到由所述查询到的实验桶数据构成的交集实验桶列表。
[0119] 本说明书中,上述步骤440对应了前述图3中的第三阶段。在第三阶段中,通过FlinkUDF可以遍历实验列表,从而查询出实验桶数据构成的交集实验桶列表。
[0120] 对于前述以用户标识和时间切片为主键构建的实验列表,上述步骤440可以包括:
[0121] 遍历实验列表,查询离最近一次实验的切流时间戳;
[0122] 以所述最近一次实验的实验标识和所述切流时间戳为查询条件,在所述实验桶列表中查询对应的实验桶数据。
[0123] 下面请继续结合图6所示的指标数据的关联示意图来对步骤440进行说明。
[0124] 通过Flink UDF遍历实验列表Array<expId>,查询离当前最近的一次该实验的切流时间m_time,组装成expId+m_time在实验桶列表Map中,查询出对应的实验桶数据,和所归因到的进入实验的时间,即bucketId+HH。即以expId+m_time和bucketId+HH为查询条件进行查询。
[0125] 如果某个实验没有找到实验桶数据,就可以单独维护一个空的交集实验桶列表Array<exp>,说明在这些实验中,目前该用户还没有进入过,也可能说明实验分流数据稍微有些延迟,留给下一步处理。
[0126] 通过前面的步骤,分别根据用户标识获取到了这个用户进入过了哪些实验、哪些实验桶;根据指标标识获取了这些指标需要被哪些实验引用,然后将两者取交集,那么这样的数据会分化成两部分。一种是这些实验列表获取到了一些实验桶数据,即两个集合交集构成的交集实验桶列表。而另一种是部分实验列表,没有从实验桶列表Map中找到对应实验桶数据。
[0127] 对于查询到交集实验桶列表的部分,可以执行后续步骤450。
[0128] 步骤450:将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。
[0129] 下面请结合图7所示的实验桶数据的存储示意图来对步骤450进行说明。
[0130] 对于查询到交集实验桶列表的部分(图3中的第一部分),将所述交集实验桶列表中的实验桶数据,按照用户标识进行分片后写入列式存储系统。下面以ClickHouse这一列式存储系统为例加以说明。
[0131] 这里按照用户标识进行分片存储,再结合ClickHouse支持多个CPU核心并行数据处理的能力,从而能够加快后续的查询速度。
[0132] 示例性的,通过向所述列式存储系统发起查询请求,以使所述列式存储系统按照所述查询请求中指定的用户标识聚合本地具有与所述用户标识关联的实验桶数据,并进一步基于聚合的实验桶数据计算所述查询请求中指定的指标类型的指标值。
[0133] 在向ClickHouse进行查询时,ClickHouse中的各个节点可以聚合本地存储的与用户标识关联的实验桶数据,并算出指定指标类型(如pv、uv)的指标值;进而由ClickHouse中的最终节点汇总这些指标值即可,这样尽可能减少节点之间的网络传输的数据量,从而加快查询速度。
[0134] 而对于从实验桶列表Map中找到对应实验桶数据的部分(图3中的第二部分),本说明书还提供了以下实施例:
[0135] 响应于未从所述实验桶列表中查询到对应的实验桶数据,创建空的交集实验桶列表;
[0136] 将当前的实验列表作为离线实验列表存储到预设数据仓库,并启动定时的调度任务;其中,所述调度任务用于周期性拉取存储到所述预设数据仓库中的离线实验列表,并在实时存储的实验桶列表中查询是否存在与所述离线实验列表中实验对应的实验桶数据;
[0137] 响应于查询到与所述离线实验列表中实验对应的实验桶数据,结束所述调度任务的部署,并将所述查询到的实验桶数据写入所述空的交集实验桶列表,以得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0138] 将所述交集实验桶列表中的实验桶数据,按照预设时长进行划分后写入列式存储系统。
[0139] 在该实施例中,对于没有从实验桶列表Map中找到对应实验桶数据的部分,如前所述可以单独维护一个空的交集实验桶列表Array<exp>。如图7所示,当前的实验列表会作为离线实验列表先存储到数据仓库Hive中,而调度任务可以定时拉取Hive中的离线实验列表,并和实时实验桶列表的实验桶数据关联,以查询实时实验桶列表中新增的与离线实验列表中实验对应的实验桶数据。如此,通过离线存储以及任务调度的方式弥补实验桶数据延迟的问题。
[0140] 示例性介质
[0141] 在介绍了本公开示例性实施方式的方法之后,接下来,参考图8对本公开示例性实施方式的介质进行说明。
[0142] 本示例性实施方式中,可以通过程序产品实现上述方法,如可以采用便携式紧凑盘只读存储器(CD‑ROM)并包括程序代码,并可以在设备,例如个人电脑上运行。然而,本公开的程序产品不限于此,在本文件中,可读存储介质可以是任何包含或存储程序的有形介质,该程序可以被指令执行系统、装置或者器件使用或者与其结合使用。
[0143] 该程序产品可以采用一个或多个可读介质的任意组合。可读介质可以是可读信号介质或者可读存储介质。可读存储介质例如可以为但不限于电、磁、光、电磁、红外线、或半导体的系统、装置或器件,或者任意以上的组合。可读存储介质的更具体的例子(非穷举的列表)包括:具有一个或多个导线的电连接、便携式盘、硬盘、随机存取存储器(RAM)、只读存储器(ROM)、可擦式可编程只读存储器(EPROM或闪存)、光纤、便携式紧凑盘只读存储器(CD‑ROM)、光存储器件、磁存储器件、或者上述的任意合适的组合。
[0144] 计算机可读信号介质可以包括在基带中或者作为载波一部分传播的数据信号,其中承载了可读程序代码。这种传播的数据信号可以采用多种形式,包括但不限于电磁信号、光信号或上述的任意合适的组合。可读信号介质还可以是可读存储介质以外的任何可读介质,该可读介质可以发送、传播或者传输用于由指令执行系统、装置或者器件使用或者与其结合使用的程序。
[0145] 可读介质上包含的程序代码可以用任何适当的介质传输,包括但不限于无线、有线、光缆、RE等等,或者上述的任意合适的组合。
[0146] 可以以一种或多种程序设计语言的任意组合来编写用于执行本公开操作的程序代码,程序设计语言包括面向对象的程序设计语言,诸如Java、C++等,还包括常规的过程式程序设计语言,诸如C语言或类似的程序设计语言。程序代码可以完全地在用户计算设备上执行、部分在用户计算设备上部分在远程计算设备上执行、或者完全在远程计算设备或服务器上执行。在涉及远程计算设备的情形中,远程计算设备可以通过任意种类的网络,包括局域网(LAN)或广域网(WAN),连接到用户计算设备,或者,可以连接到外部计算设备(例如利用因特网服务提供商来通过因特网连接)。
[0147] 综上,本公开可以提供一种计算机可读存储介质,当所述计算机可读存储介质中的指令由电子设备的处理器执行时,可以使得电子设备能够执行前述数据处理方法实施例。
[0148] 示例性装置
[0149] 在介绍了本公开示例性实施方式的介质之后,接下来,参考图9对本公开示例性实施方式的装置进行说明。
[0150] 图9示意性地示出了根据本公开实施方式的一种数据处理装置的框图,对应于前述图4所示的方法实施例。该数据处理装置可以包括:
[0151] 获取单元910,获取ETL过程中生成的实验实时数据流和指标实时数据流;
[0152] 第一存储单元920,将所述实验实时数据流中的实验数据按照键值对的存储格式存储到实验桶列表;其中,所述键值对中的键为实验数据中包含的用户标识,所述键值对的值为实验数据中包含的实验桶数据;
[0153] 生成单元930,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;其中,所述实验列表中包括具有关联的指标标识的指标数据;
[0154] 遍历单元940,遍历所述实验列表并在所述实验桶列表中查询所述实验列表中实验对应的实验桶数据,得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0155] 第二存储单元950,将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后进行存储。
[0156] 可选的,所述键值对中的键还包括实验数据中包含的用户标识和时间分片的组合。
[0157] 可选的,所述实验桶数据包括实验标识、实验桶标识、切流时间戳、打点时间戳;
[0158] 其中,所述切流时间戳为不同实验生成的实验桶数据之间的分界时刻;所述打点时间戳为用户开始实验的时刻。
[0159] 可选的,所述遍历单元940,包括:
[0160] 遍历子单元941,遍历所述实验列表,查询离最近一次实验的切流时间戳;
[0161] 查询子单元943,以所述最近一次实验的实验标识和所述切流时间戳为查询条件,在所述实验桶列表中查询对应的实验桶数据。
[0162] 可选的,所述生成单元930,进一步用于利用分布式处理引擎消费ETL过程中生成的指标实时数据流,将所述指标实时数据流中的指标数据按照指标标识,生成与所述指标标识关联的实验列表;
[0163] 所述第二存储单元950,进一步用于将所述交集实验桶列表中的实验桶数据,按照用户标识进行划分后写入列式存储系统。
[0164] 可选的,所述装置还包括:
[0165] 查询单元960,向所述列式存储系统发起查询请求,以使所述列式存储系统按照所述查询请求中指定的用户标识聚合本地具有与所述用户标识关联的实验桶数据,并进一步基于聚合的实验桶数据计算所述查询请求中指定的指标类型的指标值。
[0166] 可选的,还包括:
[0167] 创建子单元945,响应于未从所述实验桶列表中查询到对应的实验桶数据,创建空的交集实验桶列表;
[0168] 调度子单元947,将当前的实验列表作为离线实验列表存储到预设数据仓库,并启动定时的调度任务;其中,所述调度任务用于周期性拉取存储到所述预设数据仓库中的离线实验列表,并在实时存储的实验桶列表中查询是否存在与所述离线实验列表中实验对应的实验桶数据;
[0169] 写入子单元949,响应于查询到与所述离线实验列表中实验对应的实验桶数据,结束所述调度任务的部署,并将所述查询到的实验桶数据写入所述空的交集实验桶列表,以得到由所述查询到的实验桶数据构成的交集实验桶列表;
[0170] 所述第二存储单元950,还用于将所述交集实验桶列表中的实验桶数据,按照预设时长进行划分后写入列式存储系统。
[0171] 示例性计算设备
[0172] 在介绍了本公开示例性实施方式的方法、介质和装置之后,接下来,参考图10对本公开示例性实施方式的计算设备进行说明。
[0173] 图10显示的计算设备1500仅仅是一个示例,不应对本公开实施例的功能和使用范围带来任何限制。
[0174] 如图10所示,计算设备1500以通用计算设备的形式表现。计算设备1500的组件可以包括但不限于:至少一个处理单元1501、至少一个存储单元1502,连接不同系统组件(包括处理单元1501和存储单元1502)的总线1503。
[0175] 总线1503包括数据总线、控制总线和地址总线。
[0176] 存储单元1502可以包括易失性存储器形式的可读介质,例如随机存取存储器(RAM)15021和/或高速缓存存储器15022,可以进一步包括非易失性存储器形式的可读介质,例如只读存储器(ROM)15023。
[0177] 存储单元1502还可以包括具有一组(至少一个)程序模块15024的程序/实用工具15025,这样的程序模块15024包括但不限于:操作系统、一个或者多个应用程序、其它程序模块以及程序数据,这些示例中的每一个或某种组合中可能包括网络环境的实现。计算设备1500也可以与一个或多个外部设备1504(例如键盘、指向设备等)通信。
[0178] 这种通信可以通过输入/输出(I/O)接口1505进行。并且,计算设备1500还可以通过网络适配器1506与一个或者多个网络(例如局域网(LAN),广域网(WAN)和/或公共网络,例如因特网)通信。如图10所示,网络适配器1506通过总线1503与计算设备1500的其它模块通信。应当理解,尽管图中未示出,可以结合计算设备1500使用其它硬件和/或软件模块,包括但不限于:微代码、设备驱动器、冗余处理单元、外部磁盘驱动阵列、RAID系统、磁带驱动器以及数据备份存储系统等。
[0179] 通过如图10示出的计算设备1500,可以实现前述数据处理方法,更具体地,存储单元1502存储处理单元1501可执行的指令,处理单元1501执行指令时,实现前述数据处理方法。
[0180] 应当注意,尽管在上文详细描述中提及了数据处理装置的若干单元/模块或子单元/模块,但是这种划分仅仅是示例性的并非强制性的。实际上,根据本公开的实施方式,上文描述的两个或更多单元/模块的特征和功能可以在一个单元/模块中具体化。反之,上文描述的一个单元/模块的特征和功能可以进一步划分为由多个单元/模块来具体化。
[0181] 此外,尽管在附图中以特定顺序描述了本公开方法的操作,但是,这并非要求或者暗示必须按照该特定顺序来执行这些操作,或是必须执行全部所示的操作才能实现期望的结果。附加地或备选地,可以省略某些步骤,将多个步骤合并为一个步骤执行,和/或将一个步骤分解为多个步骤执行。
[0182] 虽然已经参考若干具体实施方式描述了本公开的精神和原理,但是应该理解,本公开并不限于所公开的具体实施方式,对各方面的划分也不意味着这些方面中的特征不能组合以进行受益,这种划分仅是为了表述的方便。本公开旨在涵盖所附权利要求的精神和范围内所包括的各种修改和等同布置。