问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

Springboot集成Kafka

发布网友 发布时间:2022-09-01 03:50

我来回答

1个回答

热心网友 时间:2024-11-06 16:03

最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。

1. 安装JDK ,具体版本可以根据项目实际情况选择,目前使用最多的为jdk8

2. 安装Zookeeper ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8

3. 安装Kafka  ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1

4. 安装Kafka Manage   (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.3.12.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<dependencies>

<!--springboot web依赖 -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<!--kafka依赖 -->

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

</dependencies>

spring:

  kafka:

    bootstrap-servers: 127.0.0.1:9092

    procer: #生产者

      # 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序

      #比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)

      retries: 1

      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。模式时16k

      batch-size: 16384 #16k

      # 设置生产者内存缓冲区的大小

      buffer-memory: 33554432

      acks: 1

      # 键的序列化方式

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      # 值的序列化方式

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:

      #group-id: default-group

      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效

      auto-commit-interval: 1S

      enable-auto-commit: false

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:

      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)

      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录

      auto-offset-reset: earliest

      # 键的反序列化方式

      key-deserializer: org.apache.kafka.common.serialization.StringSerializer

      # 值的反序列化方式

      value-deserializer: org.apache.kafka.common.serialization.StringSerializer

    listener:

      # 当每一条记录被消费者*(ListenerConsumer)处理之后提交

      # RECORD

      # 当每一批poll()的数据被消费者*(ListenerConsumer)处理之后提交

      # BATCH

      # 当每一批poll()的数据被消费者*(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

      # TIME

      # 当每一批poll()的数据被消费者*(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

      # COUNT

      # TIME | COUNT 有一个条件满足时提交

      # COUNT_TIME

      # 当每一批poll()的数据被消费者*(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

      # MANUAL

      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种

      # MANUAL_IMMEDIATE

      ack-mode: manual_immediate

      # 在侦听器容器中运行的线程数

      concurrency: 5

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

 -- 127.0.0.1:2181  zookeeper 服务器地址

--  replication-factor partitions 副本数量

--partitions partition数量

点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:

输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)

package com.charlie.cloudconsumer.service.impl.kafka;

import com.charlie.cloudconsumer.common.utils.JSON;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API

*/

@Component

@Slf4j

public class QueueProcer {

@Autowired

    private KafkaTemplatekafkaTemplate;

public void sendQueue(String topic,Object msgContent) {

String obj2String =JSON.toJSONString(msgContent);

log.info("kafka service 准备发送消息为:{}",obj2String);

//发送消息

        ListenableFuture>future =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);

future.addCallback(new ListenableFutureCallback>() {

//消息发送成功

            @Override

            public void onSuccess(SendResult stringObjectSendResult) {

log.info("[kafka service-生产成功]topic:{},结果{}",topic, stringObjectSendResult.toString());

}

//消息发送失败

            @Override

            public void onFailure(Throwable throwable) {

//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理

                log.info("[kafka service-生产失败]topic:{},失败原因{}",topic, throwable.getMessage());

}

});

}

}

package com.charlie.cloudconsumer.service.impl.kafka;

import org.apache.commons.lang3.ObjectUtils;

import org.springframework.stereotype.Component;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : 消费端实际的业务处理对象

*/

@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册

public class QueueDataProcess {

public boolean doExec(Object obj) {

// to 具体的业务逻辑

        if (ObjectUtils.isNotEmpty(obj)) {

return true;

}else {

return false;

}

}

}

package com.charlie.cloudconsumer.service.impl.kafka;

import com.charlie.cloudconsumer.common.utils.JSON;

import com.charlie.cloudconsumer.model.Order;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.exception.ExceptionUtils;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.support.Acknowledgment;

import org.springframework.kafka.support.KafkaHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Component;

import java.util.Optional;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息消费端,负责消费特定topic消息

*/

@Component

@Slf4j

@SuppressWarnings("all")

public class QueueConsumer {

@Autowired

    private QueueDataProcess queueDataProcess;

/**

*

*/

    @KafkaListener(topics ="test", groupId ="consumer")

public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {

Optional message =Optional.ofNullable(record.value());

if (message.isPresent()) {

try {

Object msg =message.get();

log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);

boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));

if (res) {

ack.acknowledge();

}

}catch (Exception ex) {

log.error("[kafka-消费异常] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));

}

}

}

}

package com.charlie.cloudconsumer.controller;

import com.alibaba.fastjson.JSON;

import com.charlie.cloudconsumer.common.utils.AjaxResult;

import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;

import com.charlie.cloudconsumer.model.Order;

import com.charlie.cloudconsumer.service.impl.kafka.QueueProcer;

import org.apache.commons.lang3.ObjectUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.RestController;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息

*/

@RestController

@RequestMapping(value ="/kafka",proces =MediaType.APPLICATION_JSON_VALUE)

public class KafkaController {

    @Autowired

    private QueueProcer queueProcer;

    @RequestMapping(value = "/send",method = RequestMethod.POST)

    public  AjaxResult<?> sendMsg(@RequestBody Order order) {

        AjaxResult<?> ajaxResult= null;

        if (ObjectUtils.isNotEmpty(order)) {

          queueProcer.sendQueue("test",order);

            ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");

        } else {

            ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");

        }

        return ajaxResult;

    }

}
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
白色T 恤如何搭配才能穿出优雅的韵味? 白色T 恤衫怎样搭配才能显得时髦又好看? 白色紧身正肩T 恤想要穿出休闲感,该如何搭配? 手机怎么连接笔记本电脑的wifi上网 电热水器为什么用着用着就没压力了 用什么拖地最干净又耐脏 爱奇艺会员的等级是怎么划分的? 爱奇艺星钻会员怎么开通 怎样才是正确的接吻呢 网页上鼠标点住右键拖动后出的蓝色线条是什么?干什么用的?为什么一松开... kafkacluster类 为什么不能调用 自强的近义词 仙客来水浇大了不生长,修根重新上盆后叶子掉光了,怎么救活? 仙客来花茎发蔫弯下来 用苏软造句!!! 求青年文摘2010第五期文摘《一条鱼需要自行车吗》原文 新按键精灵有一个一键申请QQ帐号,但他自动写的密码我都不知道,还有生日... 按键精灵里一键申请qq的密码是多少呀 按键精灵怎么绑定QQ浏览器,因为填写单必须用QQ浏览器? 三星s9有人工智能芯片吗 使用按键精灵怎么申请6位QQ 相机是什么 索尼RX1RM2的升级与RX1有什么变化? 索尼RX1RM2的升级大吗?跟RX1具体有什么区别? 描写女巫被火刑的句子 干煸芸豆的家常做法 干煸芸豆怎么做好吃 清明上坟可以用什么花 淅淅沥沥 绵绵细雨 霏霏细雨 润如土膏造句 红薯叶的功效 7大治病良方 空气中的什么可用作火箭的燃料 kafkacluster 在kafka0.10.1中用什么代替 芬满婷洗衣液怎么样 吻芬洗衣液生产厂址- 问一问 急求,一首四个女生合唱的英文歌曲,要在毕业欢送会上唱的,我们是大一的... 有一首英文歌曲特好听,是关于毕业的,请大家知道歌名的告诉我一声? 崩坏3买的账号,只绑定了身份证,送邮箱,安全吗?- 问一问 京都薇薇 方庄先锋店怎么样 青岛市城阳区西园庄物流园地址 京都薇薇方庄新店开业了吗? 浙江淳安西园山庄怎么样? 苹果ios13.4.1版本可用于6P吗? 为什么iPad可以登录知网 电脑登不进去 夏季甲鱼垂钓方法 为什么我睡觉睡觉突然流鼻血? finalcam搜索不到设备 vivo不支持finalcam软件吗?未能搜索视频设备 finalcam无法加入网络 手机银行可以激活银行卡吗 盐酸纳洛酮舌下片吃后可以继续喝酒吗?喝酒前使用可以吗?有没有副... ...最近流行一种新的解酒的方法,盐酸纳洛酮舌下片?是否真如大家所说的...