问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

谁整过sparkstreaming消费kafka低阶api时,如何同步offset到...

发布网友 发布时间:2024-10-01 23:27

我来回答

1个回答

热心网友 时间:2024-10-29 12:52

解决的方法是:分别从Kafka中获得某个Topic当前每个partition的offset,再从Zookeeper中获得某个consumer消费当前Topic中每个partition的offset,最后再这两个根据项目情况进行合并,就可以了。
一、具体实现
1、程序实现,如下:
public class SparkStreamingOnKafkaDirect{

public static JavaStreamingContext createContext(){

SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
jsc.checkpoint("/checkpoint");

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list","192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236");

Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236", "kafka_direct");

Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("192.168.1.151:2181", "spark-group", "kafka_direct");
if(null!=consumerOffsets && consumerOffsets.size()>0){
topicOffsets.putAll(consumerOffsets);
}

热心网友 时间:2024-10-29 12:52

解决的方法是:分别从Kafka中获得某个Topic当前每个partition的offset,再从Zookeeper中获得某个consumer消费当前Topic中每个partition的offset,最后再这两个根据项目情况进行合并,就可以了。
一、具体实现
1、程序实现,如下:
public class SparkStreamingOnKafkaDirect{

public static JavaStreamingContext createContext(){

SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
jsc.checkpoint("/checkpoint");

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list","192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236");

Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236", "kafka_direct");

Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("192.168.1.151:2181", "spark-group", "kafka_direct");
if(null!=consumerOffsets && consumerOffsets.size()>0){
topicOffsets.putAll(consumerOffsets);
}
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
四万元没开发票税务局发多少钱 不给开发票这个情节要罚多少钱? 广东乌龙茶的种类 银行什么时候拉征信 600795国电电力,为什么在2010年4,5月突然从8块跌到4块呢?涨幅-50%... 学习很差怎么逆袭 高考文科差生五十天冲刺计划!!! ...漂亮女人,那些不大漂亮的还很善良有点丑的怎么办?她们不是很可怜吗... CS1.6 AWP经典的FRAG视频 求链接 ...大家谁有AWP的视频或者DEMO,要个看看,学学,谢谢!~ 10月是什么 有关工商银行网银购汇问题 通过中行个人网银办理结汇购汇有时间要求吗? 车过水坑后打不着火怎么回事? 古佛顶景区有什么好玩的 为什么三名女生围殴女同学 女生暴力为什么这么频烦. 宁陵初中事件始末 初中基础不怎么好,上到高中能吧成绩提升上来吗?中考只得400多分,有没... 涉县重大件运输值得推荐 我有一台比较精密的机械设备需要搬迁,不知道哪家搬迁公司服务比较... 民法典规定赡养义务有哪些? 监护人依法负担被监护人抚养费赡养费的规定有什么? 扬子热水器多少钱一个 佛家怎么解释双胞胎 佛教说自然双胞胎是福气前世积累善良与美德 组装机找人装的win10,下了360跟新修复补丁漏洞,从新启动就这样了。没... 如果女人发心情说:很多话只想说一次。什么意思? 吃了蓝莓不能吃什么 吃蓝莓的禁忌 吃蓝莓有哪些注意事项 python比spark慢多少 怎样做冰淇淋简单做法 高考理科260分可以上哪些公办专科 2011河北文科考生260多分,谁能推荐几个河北省内公办的专科学校? 我到买火车票的地方去了两次,还待检核怎么回事? 未成年人火车票待检核该怎么办? 新家装修要买哪些家具?如何选择 新家装修要买什么 装修买家具需要哪些 房子装修需要买什么家具 装修需要买哪些家具 有杀气童话新手如何快速升级 有杀气童话刷图队分析介绍_有杀气童话刷图队分析是什么 有杀气童话2平民资源获取攻略介绍_有杀气童话2平民资源获取攻略是什么... 有杀气童话副本怎么过 副本闯关实用技巧攻略 WP8装内存卡需要怎么弄?格式化? 诺基亚WP7的系统能自己格式化吗 如果WP7手机处于U盘模式,在对它进行格式化,结果会怎样? 水卡正确使用方法 在邮件中用英语问候“很高兴认识您”应怎样说 带婴儿出行需要准备哪些用品?