1、概述
在实际私有化物联网平台项目中,部分存量设备由于异构总线、多制式以太网、协议多样化等因素导致无法直接连接物联网平台,大量数据较难集成,平台侧和设备侧面临大量定制化开发,成本较高。因此难以推动客户或设备厂商进行存量设备接入改造,导致设备无法直连物联网平台,无法达到物联网平台对企业所有设备数据进行统一纳管。
企业内部存量的数据采集系统多为“烟囱式”,各个厂商的系统只需对接自己厂商的设备即可,数据孤岛问题突出。
各“烟囱”的数据格式各不相同,定制化采集任务代码不可复用,费时费力,难以同时支撑多个项目。
除了设备数据采集外,还有业务数据采集需求,传统物联网系统只能采集设备数据而无法集成业务数据。
2、技术选型
数字集成技术通过对不同系统数据的抽取(Extract),数据清洗和转换(Transformation)以及输入最终的目标系统(Load),打通各个业务孤岛,实现数据互联互通,助力企业数字化转型。由于物联网场景下的数据处理大多都要求实时性,所以要求实现时具备实时数据处理能力。实时计算也被称作流计算,代表是Storm、Spark Streaming、Flink等大数据技术。计算引擎也在不断更新迭代,从第一代的Hadoop MapReduce,到第二代的Spark,再到第三代的Flink技术,从批处理到微批,再到真正的流式计算。
Apache Flink是一个开源的流处理框架,应用于分布式、高性能、高可用的数据流应用程序。可以处理有限数据流和无限数据,即能够处理有边界和无边界的数据流。无边界的数据流就是真正意义上的流数据,所以Flink是支持流计算的。Flink可以部署在各种集群环境,可以对各种大小规模的数据进行快速计算。
Flink框架具备强大的流式ETL的能力,依靠其丰富的算子实现。
2.1 Source算子
Flink可以使用StreamExecutionEnvironment.addSource(source)来为我们的程序添加数据来源。
Flink已经提供了若干实现好的source functions,当然也可通过实现SourceFunction来自定义非并行的source或者实现ParallelSourceFunction接口或者扩展RichParallelSourceFunction来自定义并行的source。
Flink在流处理上的source大致有4大类:
基于本地集合的source(Collection-based-source)
基于文件的source(File-based-source)- 读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回
基于网络套接字的source(Socket-based-source)- 从socket读取。元素可以用分隔符切分。
自定义的source(Custom-source)
使用自定义Source算子可实现丰富的数据抽取功能。
2.2 Transform转换算子
① map
将DataStream中的每一个元素转换为另外一个元素,如将元素x变为原来的5倍:
dataStream.map { x => x * 5 }
② FlatMap
采用一个数据元并生成零个,一个或多个数据元。如,将句子分割为单词的flatmap函数:
dataStream.flatMap { str => str.split(" ") }
③ Filter
计算每个数据元的布尔函数,并保存函数返回true的数据元。如,过滤掉零值的过滤器:
dataStream.filter { x != 0 }
当然flink还具备很多其他功能的转换算子,如KeyBy、Reduce、Aggregations等,通过丰富的转换算子,flink可实现对数据的清洗和转换功能。
2.3 Sink算子
Flink的sink算子支持将数据输出到:本地文件、本地集合、HDFS,除此之外,还支持:sink到kafka、sink到mysql、sink到redis以及自定义sink算子。
通过自定义sink算子将清洗转换完成的数据输入目标系统。
3、数字集成实现
实现过程如下:
第一步,抽象定义基础控件类
数字集成基于flink可抽象定义3类基础功能控件,每类控件又可根据不同的功能实现具体的子类功能控件;详细如下:
基础功能控件分为三类:数据源控件、数据输出控件、数据处理控件。
数据源控件:将Source算子抽象定义成具备抽取数据功能的数据源控件类,并制定相应的配置规范,使用时只需根据规范配置文件,系统根据配置文件创建具体的实例化对象,实现数据抽取功能;
数据操作控件:根据不同的基础功能需求将Transform算子抽象成数据处理控件类,制定相应的配置规范,使用时只需根据规范配置文件,系统根据配置创建相应的实例化对象实现数据处理功能;
数据输出控件:将Sink算子抽象成数据输出控件类,制定相应的配置规范,使用时只需根据规范配置文件,系统根据配置创建实例化对象实现数据输出功能。
同时系统内部明确定义flink算子之间流转的数据格式作为内部流转数据格式以及根据配置输出每个基础功能控件输出的数据格式。
第二步,根据抽象定义的基础功能控件,制定具体配置规范
基础功能控件规范如下:
通过以上两步规范定义后,在同一个系统中,同一个处理过程只需要定义一个基础功能控件规范。如Kafka消费者所需的配置如Kafka集群地址、消费群组、数据所在topic、数据所在分区key,消费位置等,只需要规定上述举例这样一个Kafka消费控件并开发实现,该控件就可以在该系统中复用,每次配置的数据处理工作流,复用Kafka消费控件类并根据新配置的源系统提供的Kafka集群地址、数据所在topic等配置即可实例化该工作流所需的kafka 消费者,实现过程从开发无数次Kafka Consumer的代码变为实现一次Kafka Consumer控件代码,大量节省开发时间和开发成本。
第三步,通过对基本功能的抽象,实现如HTTP请求、kafka生产、数据遍历、条件循环、数据映射、MySQL写操作等基础功能控件并实现,再根据各个基础功能运行的先后逻辑组装相应配置执行脚本来编排组建成一个完整flink流处理链路,即可完成不同系统间的数据集成功能。
如在私有化项目中有将设备厂商云平台中智能门锁状态信息同步至自有云平台进行智能门锁控制的需求,由于智能门锁设备协议与自有物联网平台数据采集协议不适配,无法直连,由设备厂商云平台提供智能门锁状态信息推送功能,由自有物联网平台提供推送数据接收接口,完成智能门锁状态信息的同步功能。
在此案例中,通过flink框架的自定义Source算子实现HTTP POST功能接口的HTTP监听控件完成设备厂商云平台的推送数据接收功能,将接收到的智能门锁状态信息根据智能门锁ID、状态status与自有云平台存储的状态进行比较的IF分支控件,将存在状态变化的智能门锁状态信息数据向后序Sink算子流转,通过自定义Sink算子实现自有云平台数据上传功能,完成智能门锁状态信息的跨平台更新功能。
第四步,根据组建好的执行逻辑生成有向无环图,提交Flink运行,具体如下:
通过对不同的基础功能控件,基于有向无环图,将基础功能控件放入有向无环图的顶点,其中整个图中只有一个数据源控件,且无其他基础功能控件可以将数据传输给它;数据输出控件和数据操作控件可以多个,对应多条分支处理逻辑。将数据传输方向作为有向无环图的边,以此连接和组织跨系统数据传输过程中针对数据的不同逻辑顺序,生成一条完整的数据传输处理链路,将此图完整实现,提交flink执行,即可实现完整的数据抽取、转换以及输出的数字集成功能。
4、总结
最后我们来总结下基于Flink的数字集成能力的实现。得益于flink在ETL数据集成上的丰富能力以及算子之间易于处理的基础功能,我们将flink的3类算子进行抽象定义实现3类基础功能控件,实现不同的数据处理过程。根据不同的功能需求,通过Source算子实现从消息队列、API、数据库等多种数据源抽取数据的功能;通过丰富的Transform算子实现数据的清洗、筛选、转换的功能;最后可通过Sink算子实现将目标格式数据输入目标系统接收数据的渠道如消息队列、数据库、API等。综上所述,基于Flink的数字集成能力是可以实现并且具备丰富功能和可扩展性的。