本公开涉及数据同步,本公开特别涉及一种消息同步方法、电子设备、介质及计算机程序产品。
背景技术:
1、流式消息是随时间延续而无线增长的动态数据集合,例如kafka消息队列系统,主要用于网络监控、气象测控等实时查询的领域。批式消息是一个相对静止的数据集合,例如mysql数据库,可支持对某一个时间窗口的数据展示。相较于流式消息,批式消息更适用于聚合查询。
2、相关技术中,在执行聚合查询时,主要通过引入消息队列的工具包进行开发,即对工具包的api(application program interface,应用程序接口)进行二次开发,以实现对支持流式消息的消息队列系统的改进,使其能够执行聚合查询。或者,利用分布式流式消息处理框架spark streaming,将实时流式消息视作一系列非常小的“批”,以接近实时处理效果,实现对消息的聚合查询。再或者,利用开源流处理框架flink对实时的流式消息进行聚合计算,以实现对消息的聚合查询。
3、但是,对工具包进行开发时,有一定的代码开发量,且消息交换效率受到开发者代码水平的影响;利用spark streaming对实时流式消息进行处理时,数据吞吐量较小,且具有较长的延时;而flink则具有一定的学习门槛,处理流式消息时,存在一定的代码开发量,可复用性低。
4、另外,相关技术的消息同步过程中,代码耦合高,元清洗逻辑无法复用,随之产生了很多重复的机械性开发工作。
技术实现思路
1、本公开提供了一种消息同步方法、电子设备、介质及计算机程序产品。
2、根据本公开的一个方面,提供一种消息同步方法,可包括:将从消息队列的各个消息分区中拉取的消息映射至对应的缓冲队列中缓存;响应于缓冲队列中存在至少一个消息,利用元清洗插件对缓冲队列中的消息进行清洗,获得目标消息;根据各个目标消息的属性值,对目标消息进行拼接处理,并将包含多个目标消息的拼接结果存储至存储引擎。
3、在一些实施方式中,响应于缓冲队列中存在至少一个消息,利用元清洗插件对缓冲队列中的消息进行清洗,获得目标消息,可包括:响应于缓冲队列中存在至少一个消息,确定配置文件中指定的清洗类型;根据清洗类型,筛选对应于清洗类型的至少一个元清洗插件;以及利用至少一个元清洗插件依序对各个消息进行清洗,获得目标消息。
4、在一些实施方式中,根据目标消息的属性值,对目标消息进行拼接处理,并将包含多个目标消息的拼接结果存储至存储引擎,可包括:识别目标消息的属性值,其中,属性值用于表征目标消息中文档内容的状态;对目标消息执行属性值对应的调整指令;将执行完调整指令的各个目标消息进行拼接,获得包含多个目标消息的拼接结果;以及将拼接结果存储至存储引擎。
5、在一些实施方式中,对目标消息执行属性值对应的调整指令,可包括:响应于属性值为空值,对目标消息执行删除操作;和/或响应于属性值为有效值,对目标消息执行插入操作或者更新操作。
6、在一些实施方式中,在将从消息队列的各个消息分区中拉取的消息映射至对应的缓冲队列中缓存之前,可包括:根据各个消息的属性特征对消息队列进行分区,获得多个消息分区,其中落入同一消息分区的各个消息具有相同的属性特征。
7、在一些实施方式中,在将从消息队列的各个消息分区中拉取的消息映射至对应的缓冲队列中缓存之前,可包括:获取具备目标结构的消息。
8、在一些实施方式中,获取具备目标结构的消息,可包括:读取数据同步任务的配置文件和状态;响应于状态为运行状态的数据同步任务,定位数据同步任务指定的消息队列;启动多个采集进程,分别对消息队列的各个消息分区中的消息进行采集;以及对各个消息进行格式化操作,以获得具备目标结构的消息。
9、在一些实施方式中,在对各个消息进行格式化操作,以获得具备目标结构的消息之后,可包括:获取消息在消息队列中的位置信息,位置信息包括消息分区的地址、消息分区的名称以及消息在消息分区中的存储位;根据位置信息,确定与消息相邻的待处理消息的位置;以及根据待处理消息的位置,对待处理消息进行采集。
10、在一些实施方式中,消息同步方法还可包括:响应于数据同步任务的配置文件的变更操作,生成加载配置指令。
11、根据本公开的另一个方面,提供这样一种电子设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,处理器执行程序时,以实现如上述实施方式中任一项的消息同步方法。
12、根据本公开的又一个方面,提供这样一种可读存储介质,可读存储介质存储有计算机程序,计算机程序适于处理器进行加载,以执行如上述实施方式中任一项的消息同步方法。
13、根据本公开的再一个方面,提供这样一种计算机程序产品,包括计算机程序/指令,计算机程序/指令被处理器执行时实现上述实施方式中任一项的消息同步方法。
1.一种消息同步方法,其特征在于,包括:
2.根据权利要求1所述的消息同步方法,其特征在于,所述响应于所述缓冲队列中存在至少一个所述消息,利用元清洗插件对所述缓冲队列中的所述消息进行清洗,获得目标消息,包括:
3.根据权利要求1或2所述的消息同步方法,其特征在于,所述根据所述目标消息的属性值,对所述目标消息进行拼接处理,并将包含多个所述目标消息的拼接结果存储至存储引擎,包括:
4.根据权利要求3所述的消息同步方法,其特征在于,所述对所述目标消息执行所述属性值对应的调整指令,包括:
5.根据权利要求1所述的消息同步方法,其特征在于,在所述将从消息队列的各个消息分区中拉取的消息映射至对应的缓冲队列中缓存之前,包括:
6.根据权利要求1所述的消息同步方法,其特征在于,在所述将从消息队列的各个消息分区中拉取的消息映射至对应的缓冲队列中缓存之前,包括:
7.根据权利要求6所述的消息同步方法,其特征在于,所述获取具备目标结构的所述消息,包括:
8.根据权利要求7所述的消息同步方法,其特征在于,在所述对各个所述消息进行格式化操作,以获得具备目标结构的所述消息之后,包括:
9.一种电子设备,其特征在于,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,所述处理器执行所述程序时,以实现如权利要求1至8中任一项所述的消息同步方法。
10.一种可读存储介质,其特征在于,所述可读存储介质存储有计算机程序,所述计算机程序适于处理器进行加载,以执行如权利要求1至8中任一项所述的消息同步方法。