
How do I receive data from Kafka and save it into a MySQL database?

Posted by a user on 2022-04-24 04:50

3 answers

Answer from 懂视网, 2022-05-02 18:50

I. Software environment

OS: CentOS release 6.5 (Final)
Java: jdk1.8
ZooKeeper: zookeeper-3.4.11
Kafka: kafka_2.11-1.1.0.tgz
Maxwell: maxwell-1.16.0.tar.gz
Note: disable the firewall on all machines, and verify that every host can reach the others (e.g. telnet <ip> <port>).

II. Deployment

1. Install the JDK

Add the following to /etc/profile (paths assume JDK 1.8.0_181):

export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH

2. Install Maven

See: https://www.cnblogs.com/wcwen1990/p/7227278.html

3. Install ZooKeeper

1) Download and unpack:

wget http://101.96.8.157/archive.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar zxvf zookeeper-3.4.11.tar.gz 
mv zookeeper-3.4.11 /usr/local/zookeeper

2) Set environment variables

Append the following to the end of /etc/profile:

# ZooKeeper Env
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

Apply the changes: source /etc/profile

3) Rename the sample configuration

On first use, rename zoo_sample.cfg under $ZOOKEEPER_HOME/conf to zoo.cfg:

mv  $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg


4) Standalone mode: edit the configuration

Create the directories /usr/local/zookeeper/data and /usr/local/zookeeper/logs, then set the following in zoo.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

5) Start ZooKeeper

# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

6) Verify the ZooKeeper service

# telnet chavin.king 2181
Trying 192.168.72.130...
Connected to chavin.king.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:
  /192.168.72.130:44054[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1a4
Mode: standalone
Node count: 147
Connection closed by foreign host.

4. Install zkui

git clone https://github.com/DeemOpen/zkui.git
cd zkui
mvn clean install

Edit the default configuration values:

# vim config.cfg
     serverPort=9090               # web UI port
     zkServer=192.168.1.110:2181
     sessionTimeout=300000


Start it in the background. The version suffix (2.0-SNAPSHOT) varies between releases; check the target directory for the jar actually produced by the build:

nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &


Open it in a browser:
http://chavin.king:9090/


5. Install Kafka

Create the target directory first, then unpack into it:

wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
mkdir -p /usr/local/kafka
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/kafka --strip-components=1

mkdir -p /usr/local/kafka/data-logs


Edit the broker configuration:

vim config/server.properties

log.dirs=/usr/local/kafka/data-logs
zookeeper.connect=chavin.king:2181

Start Kafka (the -daemon flag already backgrounds the process):

bin/kafka-server-start.sh -daemon config/server.properties


Create a topic:

bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic maxwell

List all topics:

bin/kafka-topics.sh --list --zookeeper chavin.king:2181

Start a console producer:

bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic maxwell

Start a console consumer:

bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic maxwell --from-beginning

or, connecting to the broker directly:

bin/kafka-console-consumer.sh --bootstrap-server chavin.king:9092 --from-beginning --topic maxwell

6. Enable the MySQL binlog

Maxwell requires row-based binary logging and a server_id. The relevant settings in /etc/my.cnf:

more /etc/my.cnf
[client]
default_character_set = utf8

[mysqld]
basedir = /usr/local/mysql-5.6.24
datadir = /usr/local/mysql-5.6.24/data
port = 3306
#skip-grant-tables
character_set_server = utf8
log_error = /usr/local/mysql-5.6.24/data/mysql.err

binlog_format = row
log-bin = /usr/local/mysql-5.6.24/logs/mysql-bin
sync_binlog = 2
max_binlog_size = 16M
expire_logs_days = 10

server_id = 1
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

7. Install Maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.16.0/maxwell-1.16.0.tar.gz
mkdir -p /usr/local/maxwell
tar -zxvf maxwell-1.16.0.tar.gz -C /usr/local/maxwell --strip-components=1

Start Maxwell (the MySQL user needs replication privileges):

nohup bin/maxwell --user='canal' --password='canal' --host='chavin.king' --producer=kafka --kafka.bootstrap.servers=chavin.king:9092 > maxwell.log &


8. Write a Kafka consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaTest {

    public static void main(String[] args) {
        String topicName = "maxwell";
        String groupID = "example-group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.72.130:9092");
        props.put("group.id", groupID);
        props.put("auto.offset.reset", "earliest");
        // the String(De)serializer reads "deserializer.encoding" on the consumer side
        props.put("deserializer.encoding", "utf-8");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Run the consumer above from IntelliJ IDEA.
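To actually land the data in MySQL (the original question), each Maxwell "insert" event can be turned into a parameterized INSERT inside the consumer loop. A minimal sketch: the class name MaxwellToMysql, the helpers parseData/buildInsert, and the JDBC URL and credentials are all illustrative assumptions, and the regex-based parse only handles Maxwell's flat "data" object (a real consumer should use a JSON library such as Jackson or Gson):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaxwellToMysql {

    // Naive extraction of the flat "data" object from a Maxwell event.
    // Assumes no nested objects and no commas inside string values.
    public static LinkedHashMap<String, String> parseData(String json) {
        LinkedHashMap<String, String> cols = new LinkedHashMap<>();
        Matcher m = Pattern.compile("\"data\":\\{(.*?)\\}").matcher(json);
        if (m.find()) {
            for (String kv : m.group(1).split(",")) {
                String[] p = kv.split(":", 2);
                cols.put(p[0].replace("\"", ""), p[1].replace("\"", ""));
            }
        }
        return cols;
    }

    // Build a parameterized INSERT for the event's target table.
    public static String buildInsert(String table, Map<String, String> cols) {
        String names = String.join(", ", cols.keySet());
        String marks = String.join(", ", Collections.nCopies(cols.size(), "?"));
        return "INSERT INTO " + table + " (" + names + ") VALUES (" + marks + ")";
    }

    public static void main(String[] args) {
        String value = "{\"database\":\"chavin\",\"table\":\"dept\",\"type\":\"insert\","
                + "\"data\":{\"deptno\":10,\"dname\":\"ACCOUNTING\",\"loc\":\"NEW YORK\"}}";
        Map<String, String> cols = parseData(value);
        String sql = buildInsert("dept", cols);
        System.out.println(sql); // INSERT INTO dept (deptno, dname, loc) VALUES (?, ?, ?)

        // With a live database, the statement would be executed via JDBC,
        // e.g. (URL, user and password are placeholders):
        // try (java.sql.Connection c = java.sql.DriverManager.getConnection(
        //         "jdbc:mysql://localhost:3306/chavin", "user", "pass");
        //      java.sql.PreparedStatement ps = c.prepareStatement(sql)) {
        //     int i = 1;
        //     for (String v : cols.values()) ps.setString(i++, v);
        //     ps.executeUpdate();
        // }
    }
}
```

Calling buildInsert from inside the consumer's record loop, with record.value() as input, gives one statement per binlog row event.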

9. Test

Insert some rows into MySQL; the consumer prints the Maxwell events:

offset = 3428, key = {"database":"chavin","table":"dept","_uuid":"0b195622-e7c7-4cf6-8203-5576752f9024"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2276,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3429, key = {"database":"chavin","table":"dept","_uuid":"333b98e3-a597-47fc-95ad-6e59ee0dadf6"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2277,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3430, key = {"database":"chavin","table":"dept","_uuid":"cf9fa656-ed13-4cb0-b909-d1218e402e96"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2278,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3431, key = {"database":"chavin","table":"dept","_uuid":"7f2f683a-39bc-498b-9a4e-920697b3da18"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2279,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3432, key = {"database":"chavin","table":"dept","_uuid":"ef639cd1-9206-4145-8608-372bbaaaa14a"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2280,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3433, key = {"database":"chavin","table":"dept","_uuid":"ebdf15ad-7149-4ac4-b567-627dd910182c"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2281,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3434, key = {"database":"chavin","table":"dept","_uuid":"1bc667f4-15f0-438c-8139-6f1cbe8b4db3"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2282,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3435, key = {"database":"chavin","table":"dept","_uuid":"1613b695-284a-49e3-9793-74fb2cf8dc5b"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2283,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3436, key = {"database":"chavin","table":"dept","_uuid":"f72a800c-92cc-4494-9438-bc61c58b5cb9"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2284,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3437, key = {"database":"chavin","table":"dept","_uuid":"9887d144-d75d-46f8-96ba-ad7c3adf45fd"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2285,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}

At this point the data sync works end to end; as you can see, it is quite simple.

Syncing MySQL data to Kafka in real time with Maxwell


Answer from another user, 2022-05-02 15:58

private function loaderHandler(event:*):void {
    switch(event.type) {
        case Event.COMPLETE:
            trace(_loader.data.result);
            break;
        case Event.OPEN:
            trace("open: " + event);
            break;
        case ProgressEvent.PROGRESS:
            trace("progress: " + event);
            break;
    }
}

Answer from another user, 2022-05-02 17:16

How do answers like that get marked as best? No wonder Baidu Knows has gone downhill.