问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

Flume+Kafka+Flink+Redis构建大数据实时处理系统(PV、UV)

发布网友 发布时间:2024-10-02 11:43

我来回答

1个回答

热心网友 时间:2024-10-07 03:57

大数据处理的常用方法目前流行两种:离线处理和在线实时流处理。在互联网应用中,无论是哪一种处理方式,基本数据来源都是日志数据,例如web应用的用户访问日志、点击日志等。

大数据处理目前流行的是离线处理和在线处理,基本处理架构如下:

对于数据分析结果时间要求严格的,可以采用在线实时处理方式。例如使用Flink、SparkStreaming进行处理。例如天猫双十一的成交额实时动态更新,就需要采用在线处理。接下来介绍实时数据处理方式,即基于Flink的在线处理,在以下完整案例中,我们将完成以下几项工作:

需要注意的是,本案例核心在于如何构建实时处理系统,让我们对大数据实时处理系统有一个基本的、清晰的了解与认识。

实时处理系统整体架构如下:

从以上架构可以看出,其由以下三个重要组成部分:

从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同集群系统之间打通,即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Flink的整合。当然,各个环境是否使用集群,依个人实际需要而定,在我们的环境中,Flume、Kafka、Flink都使用集群。

对于Flume而言,关键在于如何采集数据,并且将其发送到Kafka上,由于我们这里使用Flume集群的方式,Flume集群的配置也是十分关键的。对于Kafka,关键就是如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。

在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent。

对于Kafka而言,关键在于如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。

在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent。

在Kafka中,先创建一个topic,用于后面接收Flume采集过来的数据:

Flink提供了特殊的Kafka Connectors来从Kafka topic中读取数据或者将数据写入到Kafka topic中,Flink的Kafka Consumer与Flink的检查点机制相结合,提供exactly-once处理语义。为了做到这一点,Flink并不完全依赖于Kafka的consumer组的offset跟踪,而是在自己的内部去跟踪和检查。

Flink的kafka consumer叫做FlinkKafkaConsumer08(对于Kafka 0.9.0.X来说是09 等),它提供了对一个或多个Kafka topic的访问。

Flink Kafka Consumer08、09等的构造函数接收以下参数:

1、topic名称或者名称列表

2、反序列化来自kafka的数据的DeserializationSchema/Keyed Deserialization Schema

3、Kafka consumer的一些配置,下面的配置是必需的: "bootstrap.servers"(以逗号分隔的Kafka brokers列表) "zookeeper.connect"(以逗号分隔的Zookeeper 服务器列表) "group.id"(consumer组的id)

例如:

Java 代码:

Scala 代码:

当前FlinkKafkaConsumer的实现会建立一个到Kafka客户端的连接来查询topic的列表和分区。

为此,consumer需要能够访问到从提交Job任务的服务器到Flink服务器的consumer,如果你在客户端遇到任何Kafka Consumer的问题,你都可以在客户端日志中看到关于请求失败的日志。

Flink Kafka Consumer将会从一个topic中消费记录并以一致性的方式周期性地检查所有Kafka偏移量以及其他操作的状态。Flink将保存流程序到状态的最新的checkpoint中,并重新从Kafka中读取记录,记录从保存在checkpoint中的偏移位置开始读取。

checkpoint的时间间隔定义了程序在发生故障时可以恢复多少。

同时需要注意的是Flink只能在有足够的slots时才会去重启topology,所以如果topology由于TaskManager丢失而失败时,任然需要有足够的slot可用。Flink on YARN支持YARN container丢失自动重启。

所谓Flink和Redis的整合,指的是在我们的实时处理系统中的数据的落地方式,即在Flink中包含了我们处理数据的逻辑,而数据处理完毕后,产生的数据处理结果该保存到什么地方呢?显然就有很多种方式了,关系型数据库、NoSQL、HDFS、HBase等,这应该取决于具体的业务和数据量,在这里,我们使用Redis来进行最后分析数据的存储。

所以实际上做这一步的整合,其实就是开始写我们的业务处理代码了,因为通过前面Flume-Kafka-FLink的整合,已经打通了整个数据的流通路径,接下来关键要做的是,在Flink中,如何处理我们的数据并保存到Redis中。

Flink自带的connector提供了一种简洁的写入Redis的方式,只需要在项目中加入下面的依赖即可实现。

兼容版本:Redis 2.8.5 注意:Flink的connector并不是Flink的安装版本,需要写入用户的jar包并上传才能使用。

数据可视化处理目前我们需要完成两部分的工作:

对于Web项目的开发,因个人技术栈能力而异,选择的语言和技术也有所不同,只要能够达到我们最终数据可视化的目的,其实都行的。这个项目中我们要展示的是pv和uv数据,难度不大,因此可以选择Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我个人非常喜欢,因为开发非常快,但因为前面一直用的是Java,因此这里我还是选择使用SpringMVC来完成。

至于UI这一块,我前端能力一般,普通的开发没有问题,但是要做出像上面这种地图类型的UI界面来展示数据的话,确实有点*为力。好在现在第三方的UI框架比较多,对于图表类展示的,比如就有highcharts和echarts,其中echarts是百度开源的,有丰富的中文文档,非常容易上手,所以这里我选择使用echarts来作为UI,并且其刚好就有能够满足我们需求的地图类的UI组件。

对于页面数据的动态刷新有两种方案,一种是定时刷新页面,另外一种则是定时向后端异步请求数据。

目前我采用的是第一种,页面定时刷新,有兴趣的同学也可以尝试使用第二种方法,只需要在后端开发相关的返回JSON数据的API即可。

至此,从整个大数据实时处理系统的构建到最后的数据可视化处理工作,我们都已经完成了,可以看到整个过程下来涉及到的知识层面还是比较多的,不过我个人觉得,只要把核心的原理牢牢掌握了,对于大部分情况而言,环境的搭建以及基于业务的开发都能够很好地解决。
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
宝宝被蜱虫叮咬后怎么办? 宝宝被蜱虫咬后该怎么办? 宝宝被蜱虫咬了,要怎么办啊? 宝宝被蜱虫咬了该怎么办? 携程旅行飞机票怎么退 携程旅行退机票手续费介绍 苹果七用ios10.0.01为什么打字 翻照片都炒鸡卡?刚刚买了几天 而且是1... 苹果7怎么老是感觉一顿一顿的 有没有人把苹果手机系统换成ios10之后,老死 QQ会员怎样成为VIP2 已知函数f(x)=Asin(ωx+φ)(A>0,ω>0,0<φ<π,),其导函数y=f‘(x)的... 已知函数f(x)=Asin(wx+φ)(其中A>0,w>0,0<φ<π/2)的图象如图所示_百度... 已知函数f(x)= Asin(wx+φ)(x属于R,A>0,w>0,|φ|<π/2)的部分图象如图... 为什么 我16岁时候办的第二代身份证有效期是5年,第二次办的是10年。 已知函数f(x)=Acos(ωx+φ)(A>0,ω>0,0<φ<π)为奇函数,该函数的部分... ...φ) (x∈R,A>0,w>0,0<φ<π/2)的部分图像如图所示。 已知函数f(x)=Asin(ωx+φ)(A>0,φ>0,|φ|<π/2)的部分函数图象如图所 ... 哪些竹子会开花 南航包括什么航 腿关节积液怎么办 解梦 我没女朋友 却梦见我和女朋友分手 非常痛苦 从不喝酒的我 喝了... 求问 这是什么虫子?咬人不?在家里发现了它的尸首 感觉好恶心 请问现在适合买什么样的属于潜力股的邮票? 这是什么虫子?好恶心啊! 庚申年庚申年:猴票发行与投资 猴票的真假辨别 小米手机的电池健康度怎么查询? 怎样查电池寿命 成都博成知识产权代理有限责任公司企业产品与服务 有一艘质量为2.6x10(6次方)kg的运输船,在一次海难中沉入海底,打捞船利 ... flume客户端支持不同服务器同一日志文件名 数据中台技术架构简述 macos解压命令是什么? Apple Watch Series 9手表的起售价是多少? 求花美男连锁恐怖袭击事件下载 ...是6个螺栓但用了12角的套筒会出现什么问题? 《全面战争·三国》攻略(五)心中的三国游戏 机械革命x6s和t6s外观一样吗,性能差多少 机械革命MR X6S-LE01有用过的吗?建议买吗? 机械革命MR X6S-M 总共有几个固态硬盘的接口,接口类型是什么_百度知 ... oppo find 7去日本是否可以使用 oppofind7轻装版出泰国是否可以使用? ...则需赛多少场次?如果是单循环比赛,一共要多少场?求解答 系统重装后,请问装什么杀毒防毒工具最好 NOD32 金山 卡巴哪个好一点? 什么杀毒软件完全杀到毒 (专科生)想问一下,这个全媒体广告策划与营销究竟是干什么 关于卡巴和ewido 我用的是金山毒霸6,必须充值才能升级,不知大家都用哪种杀毒工具,哪种用... 请ewido的高手进来帮我解答一下!!