问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

如何用flume实时获取mysql数据

发布网友 发布时间:2022-05-02 02:13

我来回答

2个回答

懂视网 时间:2022-05-02 06:34

 
  1. use test;  
  2.   
  3. create table  wlslog    
  4. (id         int not null,  
  5.  time_stamp varchar(40),  
  6.  category   varchar(40),  
  7.  type       varchar(40),  
  8.  servername varchar(40),  
  9.  code       varchar(40),  
  10.  msg        varchar(40),  
  11.  primary key ( id )  
  12. );  
  13.   
  14. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,‘apr-8-2014-7:06:16-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to standby‘);  
  15. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,‘apr-8-2014-7:06:17-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to starting‘);  
  16. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,‘apr-8-2014-7:06:18-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to admin‘);  
  17. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,‘apr-8-2014-7:06:19-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to resuming‘);  
  18. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,‘apr-8-2014-7:06:20-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000361‘,‘started weblogic adminserver‘);  
  19. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,‘apr-8-2014-7:06:21-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to running‘);  
  20. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  21. commit;  

2. 建立相关目录与文件

(1)创建本地状态文件
[plain] view plain copy  
  1. mkdir -p /var/lib/flume  
  2. cd /var/lib/flume  
  3. touch sql-source.status  
  4. chmod -R 777 /var/lib/flume  

(2)建立HDFS目标目录
[plain] view plain copy  
  1. hdfs dfs -mkdir -p /flume/mysql  
  2. hdfs dfs -chmod -R 777 /flume/mysql  

3. 准备JAR包

        从http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。
[plain] view plain copy  
  1. cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/  
        将MySQL JDBC驱动JAR包也复制到Flume库目录。
[plain] view plain copy  
  1. cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar  

4. 建立HAWQ外部表

[sql] view plain copy  
  1. create external table ext_wlslog  
  2. (id         int,  
  3.  time_stamp varchar(40),  
  4.  category   varchar(40),  
  5.  type       varchar(40),  
  6.  servername varchar(40),  
  7.  code       varchar(40),  
  8.  msg        varchar(40)  
  9. ) location (‘pxf://mycluster/flume/mysql?profile=hdfstextmulti‘) format ‘csv‘ (quote=e‘"‘);   

5. 配置Flume

        在Ambari -> Flume -> Configs -> flume.conf中配置如下属性:
[plain] view plain copy  
  1. agent.channels.ch1.type = memory  
  2. agent.sources.sql-source.channels = ch1  
  3. agent.channels = ch1  
  4. agent.sinks = HDFS  
  5.   
  6. agent.sources = sql-source  
  7. agent.sources.sql-source.type = org.keedio.flume.source.SQLSource  
  8.   
  9. agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test  
  10. agent.sources.sql-source.user = root  
  11. agent.sources.sql-source.password = 123456  
  12. agent.sources.sql-source.table = wlslog  
  13. agent.sources.sql-source.columns.to.select = *  
  14.   
  15. agent.sources.sql-source.incremental.column.name = id  
  16. agent.sources.sql-source.incremental.value = 0  
  17.   
  18. agent.sources.sql-source.run.query.delay=5000  
  19.   
  20. agent.sources.sql-source.status.file.path = /var/lib/flume  
  21. agent.sources.sql-source.status.file.name = sql-source.status  
  22.   
  23. agent.sinks.HDFS.channel = ch1  
  24. agent.sinks.HDFS.type = hdfs  
  25. agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql  
  26. agent.sinks.HDFS.hdfs.fileType = DataStream  
  27. agent.sinks.HDFS.hdfs.writeFormat = Text  
  28. agent.sinks.HDFS.hdfs.rollSize = 268435456  
  29. agent.sinks.HDFS.hdfs.rollInterval = 0  
  30. agent.sinks.HDFS.hdfs.rollCount = 0  
        Flume在flume.conf文件中指定Source、Channel和Sink相关的配置,各属性描述如表1所示。

 

属性

描述

agent.channels.ch1.type

Agent的channel类型

agent.sources.sql-source.channels

Source对应的channel名称

agent.channels

Channel名称

agent.sinks

Sink名称

agent.sources

Source名称

agent.sources.sql-source.type

Source类型

agent.sources.sql-source.connection.url

数据库URL

agent.sources.sql-source.user

数据库用户名

agent.sources.sql-source.password

数据库密码

agent.sources.sql-source.table

数据库表名

agent.sources.sql-source.columns.to.select

查询的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

发起查询的时间间隔,单位是毫秒

agent.sources.sql-source.status.file.path

状态文件路径

agent.sources.sql-source.status.file.name

状态文件名称

agent.sinks.HDFS.channel

Sink对应的channel名称

agent.sinks.HDFS.type

Sink类型

agent.sinks.HDFS.hdfs.path

Sink路径

agent.sinks.HDFS.hdfs.fileType

流数据的文件类型

agent.sinks.HDFS.hdfs.writeFormat

数据写入格式

agent.sinks.HDFS.hdfs.rollSize

目标文件轮转大小,单位是字节

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件

agent.sinks.HDFS.hdfs.rollCount

当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件

 

表1


6. 运行Flume代理

        保存上一步的设置,然后重启Flume服务,如图2所示。
技术分享 图2
        重启后,状态文件已经记录了将最新的id值7,如图3所示。
技术分享 图3
        查看目标路径,生成了一个临时文件,其中有7条记录,如图4所示。
技术分享 图4
        查询HAWQ外部表,结果也有全部7条数据,如图5所示。
技术分享 图5
        至此,初始数据抽取已经完成。

7. 测试准实时增量抽取

        在源表中新增id为8、9、10的三条记录。
[sql] view plain copy  
  1. use test;  
  2. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  3. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  4. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  5. commit;  
        5秒之后查询HAWQ外部表,从图6可以看到,已经查询出全部10条数据,准实时增量抽取成功。
技术分享 图6

五、方案优缺点

        利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。而与现在很火的canal比较,虽然不够灵活,但毕竟一行代码也不用写。再有该方案采用普通SQL轮询的方式实现,具有通用性,适用于所有关系库数据源。
        这种方案的缺点与其优点一样突出,主要体现在以下几方面。
  • 在源库上执行了查询,具有入侵性。
  • 通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。
  • 只能识别新增数据,检测不到删除与更新。
  • 要求源库必须有用于表示增量的字段。
  •         即便有诸多局限,但用Flume抽取关系库数据的方案还是有一定的价值,特别是在要求快速部署、简化编程,又能满足需求的应用场景,对传统的Sqoop方式也不失为一种有效的补充。

    参考:

    Flume架构以及应用介绍
    Streaming MySQL Database Table Data to HDFS with Flume
    how to read data from oracle using FLUME to kafka broker
    https://github.com/keedio/flume-ng-sql-source   v

     

    利用Flume将MySQL表数据准实时抽取到HDFS

    标签:avr   服务   java   种类   post   into   复制   ora   更新   

    热心网友 时间:2022-05-02 03:42

    写一个SOCKET服务器,取出数据库里所有的票,根据客户端买票减去。并通知所有客户端 客户端也是SOCKET连上服务器,等服务器的更新信息 这个和聊天差不多原理。
    声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
    容易发财的房地产中介公司名字 我想对自己的房子进行房产评估,去哪里办理,需要... 改字体的时候不小心改错了,全变成格仔了怎么办 手机文件不小心改错名字了怎么办 女生每天早晚各跑步一个小时,配合饮食,一个月瘦多少? 每天早晚各慢跑一小时.一个月能瘦多少斤 爱剪辑怎么去水印 给视频去水印方法 游戏本关机时可以晃吗? 笔记本玩游戏CPU温度高,会花屏 打剑网三一小时声音大,发热,还能继续吗? 用需要收费的beat做自制视频的BGM算侵权吗 自己做的视频用网络上的音乐做背景音乐,但是发布时不赚钱,算侵权吗? 诺华面料公司给哪些大品牌供货 西服进口面料高档品牌 中国经编面料十大知名品牌是哪些公司。 什么面料品牌 JHRJ布料是什么品牌? 什么是奥丝布?? 棉布布料品牌有哪些 布料都有哪些知名品牌类? 温州注册代理记账公司需要什么手续 温州公司怎么注册 在当当里买了书,用的是微信支付。后来取消订单后,为什么微信里的钱却未退还? 本人想去淘宝买东西但未满16岁没有银行卡怎么才可以买东西, 用微信零钱支付的当当,取消订单后钱将退回哪里 那些购物软件可以微信支付 《和平精英》里哪四把武器是来自中国的? 如何评价鲁格Mini14自动步枪呢? 和平精英迷你两枪能击倒三级头吗? 绝地求生里面,5.56子弹的枪有什么(全部说出来) 你觉得有哪些项目可以和辅导班合作? LM7805稳压芯片稳压后的电压为什么驱动不了adc0832和单片机芯片工作? 求求讲解lm7805芯片内部详细 为什么LM7805芯片很烫 请解释一下三端稳压器芯片(例如LM7805)中的参数Junction-case(结至环境热阻)? 英文名字,女的 LM7805最大输入电压是多少?是根据什么计算的? 求《魔鬼情人》2002年免费高清百度云资源,康妮·尼尔森主演的 全波整流没有中间抽头线能用lm7805吗? 求 白莲花度假村,2021年穆雷·巴特利特、康妮·布里登、詹妮佛·库里奇主演的美国电视剧免费百度云资源? LM2575与LM7805做辅助电源那个效率高 求丈夫的秘密1932年百度网盘在线观看资源,克拉克·盖博主演的 康艳丽翻译成英文怎么写 我要取英文名字我叫刘婷,女 女生穿黄色衣服时,会让她显得更白还是更黑? 黄颜色衣服显白还是黑 夏天穿黄颜色衣服是不是很招蚊子啊?为什么? “日值红纱,大事勿用”是什么意思? 黄颜色衣服显肤色黑吗 选了一个结婚日子,老黄历上说【日值红纱 大事勿用】什么意思啊,这天结婚好吗_百度问一问