关于怎么获取kafka指定位置offset消息
发布网友
发布时间:2022-04-25 15:49
我来回答
共1个回答
热心网友
时间:2023-10-14 14:02
For finding the start offset to read in Kafka 0.8 Simple Consumer example they say
Kafka includes two constants to help,
kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data
in the logs and starts streaming from there,
kafka.api.OffsetRequest.LatestTime() will only stream new messages.
You can also find the example code there for managing the offset at your consumer end.
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}