Flume+Kafka+Flink+Redis构建大数据实时处理系统(PV、UV)
发布网友
发布时间:2024-10-02 11:43
我来回答
共1个回答
热心网友
时间:2024-10-07 03:57
大数据处理的常用方法目前流行两种:离线处理和在线实时流处理。在互联网应用中,无论是哪一种处理方式,基本数据来源都是日志数据,例如web应用的用户访问日志、点击日志等。
大数据处理目前流行的是离线处理和在线处理,基本处理架构如下:
对于数据分析结果时间要求严格的,可以采用在线实时处理方式。例如使用Flink、SparkStreaming进行处理。例如天猫双十一的成交额实时动态更新,就需要采用在线处理。接下来介绍实时数据处理方式,即基于Flink的在线处理,在以下完整案例中,我们将完成以下几项工作:
需要注意的是,本案例核心在于如何构建实时处理系统,让我们对大数据实时处理系统有一个基本的、清晰的了解与认识。
实时处理系统整体架构如下:
从以上架构可以看出,其由以下三个重要组成部分:
从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同集群系统之间打通,即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Flink的整合。当然,各个环境是否使用集群,依个人实际需要而定,在我们的环境中,Flume、Kafka、Flink都使用集群。
对于Flume而言,关键在于如何采集数据,并且将其发送到Kafka上,由于我们这里使用Flume集群的方式,Flume集群的配置也是十分关键的。对于Kafka,关键就是如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。
在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent。
对于Kafka而言,关键在于如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。
在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent。
在Kafka中,先创建一个topic,用于后面接收Flume采集过来的数据:
Flink提供了特殊的Kafka Connectors来从Kafka topic中读取数据或者将数据写入到Kafka topic中,Flink的Kafka Consumer与Flink的检查点机制相结合,提供exactly-once处理语义。为了做到这一点,Flink并不完全依赖于Kafka的consumer组的offset跟踪,而是在自己的内部去跟踪和检查。
Flink的kafka consumer叫做FlinkKafkaConsumer08(对于Kafka 0.9.0.X来说是09 等),它提供了对一个或多个Kafka topic的访问。
Flink Kafka Consumer08、09等的构造函数接收以下参数:
1、topic名称或者名称列表
2、反序列化来自kafka的数据的DeserializationSchema/Keyed Deserialization Schema
3、Kafka consumer的一些配置,下面的配置是必需的: "bootstrap.servers"(以逗号分隔的Kafka brokers列表) "zookeeper.connect"(以逗号分隔的Zookeeper 服务器列表) "group.id"(consumer组的id)
例如:
Java 代码:
Scala 代码:
当前FlinkKafkaConsumer的实现会建立一个到Kafka客户端的连接来查询topic的列表和分区。
为此,consumer需要能够访问到从提交Job任务的服务器到Flink服务器的consumer,如果你在客户端遇到任何Kafka Consumer的问题,你都可以在客户端日志中看到关于请求失败的日志。
Flink Kafka Consumer将会从一个topic中消费记录并以一致性的方式周期性地检查所有Kafka偏移量以及其他操作的状态。Flink将保存流程序到状态的最新的checkpoint中,并重新从Kafka中读取记录,记录从保存在checkpoint中的偏移位置开始读取。
checkpoint的时间间隔定义了程序在发生故障时可以恢复多少。
同时需要注意的是Flink只能在有足够的slots时才会去重启topology,所以如果topology由于TaskManager丢失而失败时,任然需要有足够的slot可用。Flink on YARN支持YARN container丢失自动重启。
所谓Flink和Redis的整合,指的是在我们的实时处理系统中的数据的落地方式,即在Flink中包含了我们处理数据的逻辑,而数据处理完毕后,产生的数据处理结果该保存到什么地方呢?显然就有很多种方式了,关系型数据库、NoSQL、HDFS、HBase等,这应该取决于具体的业务和数据量,在这里,我们使用Redis来进行最后分析数据的存储。
所以实际上做这一步的整合,其实就是开始写我们的业务处理代码了,因为通过前面Flume-Kafka-FLink的整合,已经打通了整个数据的流通路径,接下来关键要做的是,在Flink中,如何处理我们的数据并保存到Redis中。
Flink自带的connector提供了一种简洁的写入Redis的方式,只需要在项目中加入下面的依赖即可实现。
兼容版本:Redis 2.8.5 注意:Flink的connector并不是Flink的安装版本,需要写入用户的jar包并上传才能使用。
数据可视化处理目前我们需要完成两部分的工作:
对于Web项目的开发,因个人技术栈能力而异,选择的语言和技术也有所不同,只要能够达到我们最终数据可视化的目的,其实都行的。这个项目中我们要展示的是pv和uv数据,难度不大,因此可以选择Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我个人非常喜欢,因为开发非常快,但因为前面一直用的是Java,因此这里我还是选择使用SpringMVC来完成。
至于UI这一块,我前端能力一般,普通的开发没有问题,但是要做出像上面这种地图类型的UI界面来展示数据的话,确实有点*为力。好在现在第三方的UI框架比较多,对于图表类展示的,比如就有highcharts和echarts,其中echarts是百度开源的,有丰富的中文文档,非常容易上手,所以这里我选择使用echarts来作为UI,并且其刚好就有能够满足我们需求的地图类的UI组件。
对于页面数据的动态刷新有两种方案,一种是定时刷新页面,另外一种则是定时向后端异步请求数据。
目前我采用的是第一种,页面定时刷新,有兴趣的同学也可以尝试使用第二种方法,只需要在后端开发相关的返回JSON数据的API即可。
至此,从整个大数据实时处理系统的构建到最后的数据可视化处理工作,我们都已经完成了,可以看到整个过程下来涉及到的知识层面还是比较多的,不过我个人觉得,只要把核心的原理牢牢掌握了,对于大部分情况而言,环境的搭建以及基于业务的开发都能够很好地解决。