问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

请教flume如何将数据写入HDFS-Hadoop和大数据技术

发布网友 发布时间:2022-04-10 01:15

我来回答

2个回答

懂视网 时间:2022-04-10 05:37

 
  1. use test;  
  2.   
  3. create table  wlslog    
  4. (id         int not null,  
  5.  time_stamp varchar(40),  
  6.  category   varchar(40),  
  7.  type       varchar(40),  
  8.  servername varchar(40),  
  9.  code       varchar(40),  
  10.  msg        varchar(40),  
  11.  primary key ( id )  
  12. );  
  13.   
  14. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,‘apr-8-2014-7:06:16-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to standby‘);  
  15. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,‘apr-8-2014-7:06:17-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to starting‘);  
  16. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,‘apr-8-2014-7:06:18-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to admin‘);  
  17. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,‘apr-8-2014-7:06:19-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to resuming‘);  
  18. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,‘apr-8-2014-7:06:20-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000361‘,‘started weblogic adminserver‘);  
  19. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,‘apr-8-2014-7:06:21-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000365‘,‘server state changed to running‘);  
  20. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  21. commit;  

2. 建立相关目录与文件

(1)创建本地状态文件
[plain] view plain copy  
  1. mkdir -p /var/lib/flume  
  2. cd /var/lib/flume  
  3. touch sql-source.status  
  4. chmod -R 777 /var/lib/flume  

(2)建立HDFS目标目录
[plain] view plain copy  
  1. hdfs dfs -mkdir -p /flume/mysql  
  2. hdfs dfs -chmod -R 777 /flume/mysql  

3. 准备JAR包

        从http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。
[plain] view plain copy  
  1. cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/  
        将MySQL JDBC驱动JAR包也复制到Flume库目录。
[plain] view plain copy  
  1. cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar  

4. 建立HAWQ外部表

[sql] view plain copy  
  1. create external table ext_wlslog  
  2. (id         int,  
  3.  time_stamp varchar(40),  
  4.  category   varchar(40),  
  5.  type       varchar(40),  
  6.  servername varchar(40),  
  7.  code       varchar(40),  
  8.  msg        varchar(40)  
  9. ) location (‘pxf://mycluster/flume/mysql?profile=hdfstextmulti‘) format ‘csv‘ (quote=e‘"‘);   

5. 配置Flume

        在Ambari -> Flume -> Configs -> flume.conf中配置如下属性:
[plain] view plain copy  
  1. agent.channels.ch1.type = memory  
  2. agent.sources.sql-source.channels = ch1  
  3. agent.channels = ch1  
  4. agent.sinks = HDFS  
  5.   
  6. agent.sources = sql-source  
  7. agent.sources.sql-source.type = org.keedio.flume.source.SQLSource  
  8.   
  9. agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test  
  10. agent.sources.sql-source.user = root  
  11. agent.sources.sql-source.password = 123456  
  12. agent.sources.sql-source.table = wlslog  
  13. agent.sources.sql-source.columns.to.select = *  
  14.   
  15. agent.sources.sql-source.incremental.column.name = id  
  16. agent.sources.sql-source.incremental.value = 0  
  17.   
  18. agent.sources.sql-source.run.query.delay=5000  
  19.   
  20. agent.sources.sql-source.status.file.path = /var/lib/flume  
  21. agent.sources.sql-source.status.file.name = sql-source.status  
  22.   
  23. agent.sinks.HDFS.channel = ch1  
  24. agent.sinks.HDFS.type = hdfs  
  25. agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql  
  26. agent.sinks.HDFS.hdfs.fileType = DataStream  
  27. agent.sinks.HDFS.hdfs.writeFormat = Text  
  28. agent.sinks.HDFS.hdfs.rollSize = 268435456  
  29. agent.sinks.HDFS.hdfs.rollInterval = 0  
  30. agent.sinks.HDFS.hdfs.rollCount = 0  
        Flume在flume.conf文件中指定Source、Channel和Sink相关的配置,各属性描述如表1所示。

 

属性

描述

agent.channels.ch1.type

Agent的channel类型

agent.sources.sql-source.channels

Source对应的channel名称

agent.channels

Channel名称

agent.sinks

Sink名称

agent.sources

Source名称

agent.sources.sql-source.type

Source类型

agent.sources.sql-source.connection.url

数据库URL

agent.sources.sql-source.user

数据库用户名

agent.sources.sql-source.password

数据库密码

agent.sources.sql-source.table

数据库表名

agent.sources.sql-source.columns.to.select

查询的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

发起查询的时间间隔,单位是毫秒

agent.sources.sql-source.status.file.path

状态文件路径

agent.sources.sql-source.status.file.name

状态文件名称

agent.sinks.HDFS.channel

Sink对应的channel名称

agent.sinks.HDFS.type

Sink类型

agent.sinks.HDFS.hdfs.path

Sink路径

agent.sinks.HDFS.hdfs.fileType

流数据的文件类型

agent.sinks.HDFS.hdfs.writeFormat

数据写入格式

agent.sinks.HDFS.hdfs.rollSize

目标文件轮转大小,单位是字节

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件

agent.sinks.HDFS.hdfs.rollCount

当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件

 

表1


6. 运行Flume代理

        保存上一步的设置,然后重启Flume服务,如图2所示。
技术分享 图2
        重启后,状态文件已经记录了将最新的id值7,如图3所示。
技术分享 图3
        查看目标路径,生成了一个临时文件,其中有7条记录,如图4所示。
技术分享 图4
        查询HAWQ外部表,结果也有全部7条数据,如图5所示。
技术分享 图5
        至此,初始数据抽取已经完成。

7. 测试准实时增量抽取

        在源表中新增id为8、9、10的三条记录。
[sql] view plain copy  
  1. use test;  
  2. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  3. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  4. insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,‘apr-8-2014-7:06:22-pm-pdt‘,‘notice‘,‘weblogicserver‘,‘adminserver‘,‘bea-000360‘,‘server started in running mode‘);  
  5. commit;  
        5秒之后查询HAWQ外部表,从图6可以看到,已经查询出全部10条数据,准实时增量抽取成功。
技术分享 图6

五、方案优缺点

        利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。而与现在很火的canal比较,虽然不够灵活,但毕竟一行代码也不用写。再有该方案采用普通SQL轮询的方式实现,具有通用性,适用于所有关系库数据源。
        这种方案的缺点与其优点一样突出,主要体现在以下几方面。
  • 在源库上执行了查询,具有入侵性。
  • 通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。
  • 只能识别新增数据,检测不到删除与更新。
  • 要求源库必须有用于表示增量的字段。
  •         即便有诸多局限,但用Flume抽取关系库数据的方案还是有一定的价值,特别是在要求快速部署、简化编程,又能满足需求的应用场景,对传统的Sqoop方式也不失为一种有效的补充。

    参考:

    Flume架构以及应用介绍
    Streaming MySQL Database Table Data to HDFS with Flume
    how to read data from oracle using FLUME to kafka broker
    https://github.com/keedio/flume-ng-sql-source   v

     

    利用Flume将MySQL表数据准实时抽取到HDFS

    标签:avr   服务   java   种类   post   into   复制   ora   更新   

    热心网友 时间:2022-04-10 02:45

    @echo off
    set rar="C:\Program Files\WinRAR\rar.exe"
    setlocal enabledelayedexpansion
    for /f "delims=" %%a in ('dir /ad/b') do (
    set /a n=%%~a%%2
    if !n! equ 0 (
    %rar% a -hp12345678 test2 "%%~a"
    ) else (
    %rar% a -hp12345678 test1 "%%~a"
    )
    )
    pause
    声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
    想买红米但没银行卡 开美宜佳便利店是加盟好还是直接转一个美宜佳店好 ...就给你一个交易号,能到美宜佳付款,请问美宜佳是怎样付款的_百度知 ... 用美宜佳代购会不会看到我买的东西 重返帝国弓弩营对诸葛连弩有加成吗 重返帝国城市风格选什么好 重返帝国城市风格选择推荐 重返帝国城市风格怎么切换 重返帝国城市如何切换风格 重返帝国城市风格大全 特色兵种及增益介绍 重返帝国特殊兵种怎么生产-特殊兵种生产攻略 重返帝国怎么训练诸葛弩 怎么用手机刷卡坐公交,下载什么软件在公交车上刷可以坐车 华硕笔记本外接显示器问题 mac mini可以接华硕电脑当显示屏吗 华硕笔记本拆掉屏幕外接显示器行吗? 华硕笔记本可以外接显示器吗???具体说下都需要什么!!! 华硕笔记本电脑HD口可以当输入?我想当显示器用。 手机电视都可上网,手机可以进入路由设置界面。电脑右下角是感叹号!输入192.168.0.1打不开。 华硕笔记本电脑怎么当显示器用 电脑出现感叹号,手机,电脑都连不上。 手机无线网络上不去有个叹号,电脑右下方小电脑有个*叹号,怎么办?求助 “感叹号”在电脑上应该怎么打? 5v2A手机冲电器不坏了怎么修 移动IP电话是什么 为什么excel中插入的Microsoft3.0公式都是图片格式??如何去掉Microsoft3.0公式的边框 商场的标识系统设计使用哪些色彩会看起来高端大气一些? Microsoft Excel 工作表填充颜色之后看不到边框怎么办? ip电话是啥东西? 我用Microsoft Excel做表格,如果我要去掉多余的线该点什么啊,最好能留下QQ,可能我还会有问题的,谢谢! Microsoft Excel 中粘贴出现图片中的框框,怎么删除 Microsoft Excel 怎样将两个表格中相邻的竖线去掉 推荐一个简单的嵌入式开发项目。 定期年金保险年金可以领取多少年 英文字符是什么 小金猪年金险优缺点?每年花多少钱? 什么是英文字符 国寿鑫如意年金保险(白金版)女29岁年交3000每年可以领多少钱 一个英文字符是什么意思? 6000个英文字符是什么意思 @的英文字符是什么 英文字母是什么意思 英文字符是什么意思 26个英文字母书写是什么? 第10课圆明园的毁灭读后感 海南东方市码头为什么没有卖鱼的? 急求海南东方市的海鲜市场在什么地方? 海南东方市最大的水果市场、海鲜市场、大超市、大商场,是哪里? 谁有200字的读书笔记,是读书笔记,不是读后感,求了! 海南省东方市附近有什么淡水钓点.求救 老舍的散文集《一些印象》中是否对春夏秋冬都做了描写? 寻找一些,神魔或怪物的名字?