kafka怎么获得最后一条消息的offset
发布网友
发布时间:2022-04-25 15:49
我来回答
共1个回答
热心网友
时间:2023-10-14 14:02
我不生产答案,我只是当一回Stackoverflow的搬运工。今天刚好在Stackoverflow查Kafka的一个问题,顺带看到的。
For finding the start offset to read in Kafka 0.8 Simple Consumer example they say
Kafka includes two constants to help,
kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data
in the logs and starts streaming from there,
kafka.api.OffsetRequest.LatestTime() will only stream new messages.
You can also find the example code there for managing the offset at your consumer end.
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}