谁整过sparkstreaming消费kafka低阶api时,如何同步offset到...
发布网友
发布时间:2024-10-01 23:27
我来回答
共1个回答
热心网友
时间:2024-10-29 12:52
解决的方法是:分别从Kafka中获得某个Topic当前每个partition的offset,再从Zookeeper中获得某个consumer消费当前Topic中每个partition的offset,最后再这两个根据项目情况进行合并,就可以了。
一、具体实现
1、程序实现,如下:
public class SparkStreamingOnKafkaDirect{
public static JavaStreamingContext createContext(){
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
jsc.checkpoint("/checkpoint");
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list","192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236");
Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236", "kafka_direct");
Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("192.168.1.151:2181", "spark-group", "kafka_direct");
if(null!=consumerOffsets && consumerOffsets.size()>0){
topicOffsets.putAll(consumerOffsets);
}
热心网友
时间:2024-10-29 12:52
解决的方法是:分别从Kafka中获得某个Topic当前每个partition的offset,再从Zookeeper中获得某个consumer消费当前Topic中每个partition的offset,最后再这两个根据项目情况进行合并,就可以了。
一、具体实现
1、程序实现,如下:
public class SparkStreamingOnKafkaDirect{
public static JavaStreamingContext createContext(){
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
jsc.checkpoint("/checkpoint");
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list","192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236");
Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("192.168.1.151:1234,192.168.1.151:1235,192.168.1.151:1236", "kafka_direct");
Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("192.168.1.151:2181", "spark-group", "kafka_direct");
if(null!=consumerOffsets && consumerOffsets.size()>0){
topicOffsets.putAll(consumerOffsets);
}