新都在

新都在

Java获取指定topic每个分区的当前偏移量

28
2023-02-02
Java获取指定topic每个分区的当前偏移量

Java获取指定topic每个分区的当前偏移量

首先引入pom.xml

<dependencies>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
		<!--<version>2.1.10.RELEASE</version>-->
	</dependency>
</dependencies>

配置properties.properties文件

因为我们需要获取的是每个消费者消费的topic的每个分区的当前偏移量,所以在properties配置文件中只需要配置消费者即可
不同的group_id消费相同的topic,当前的偏移量也会不一样的

#kafka消费者配置
spring.kafka.consumer.bootstrap-servers=
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit= false
spring.kafka.consumer.fetch-max-wait=30s
spring.kafka.consumer.fetch-min-size=
spring.kafka.consumer.group-id=
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=500

执行获取topic每个分区的当前偏移量代码

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/*
 * @version 1.0 created by LXW on 2019/11/20 10:20
 */
public class KafkaUtil {


    /**
     * 获取当前topic下的全部分区的偏移量信息
     *
     * @param properties 配置信息
     * @param partitions Collection<TopicPartition> partitions
     * @return {partition:offset}
     */
    public static Map<TopicPartition, Long> getPartitionsOffset(Map<String, Object> properties, Collection<TopicPartition>
            partitions) {
        KafkaConsumer consumer = new KafkaConsumer(properties);
        try {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            return endOffsets;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }finally {
            consumer.close();
        }
    }


    /**
     * 获取当前服务消费的topic的每个分区的当前偏移量
     *
     * @param properties 配置信息
     * @param topics Collection<String> topics
     * @return {
     *          topic:
     *           {
     *             partitionInfo:offset
     *           }
     *         }
     */
    public static Map<String, Map<TopicPartition, Long>> getTopicPartitionsOffset(Map<String, Object> properties, Set<String> topics){
        Map<String, Map<TopicPartition, Long>> topicPartitionMap = new HashMap<>();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        try {
            for (String topic : topics) {
                List<PartitionInfo> partitionsInfo = kafkaConsumer.partitionsFor(topic);
                Set<TopicPartition> topicPartitions = new HashSet<>();
                for (PartitionInfo partitionInfo : partitionsInfo) {
                    TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    topicPartitions.add(topicPartition);
                }
                Map<TopicPartition, Long> topicPartitionsOffset = getPartitionsOffset(properties, topicPartitions);
                topicPartitionMap.put(topic, topicPartitionsOffset);
            }
            return topicPartitionMap;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }finally {
            kafkaConsumer.close();
        }
    }

}

How to use

public static void main(String[] args) {
    Set topics = new HashSet(Arrays.asList("test1", "test2"));
    KafkaProperties kafkaProperties = new KafkaProperties();
    kafkaProperties.setBootstrapServers(Arrays.asList("127.0.0.1:9200"));
    kafkaProperties.setClientId("clientId");
    Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    Map<String, Map<TopicPartition, Long>> serviceTopicPartitionsOffset = KafkaUtil.getTopicPartitionsOffset(consumerProperties, topics);
    // TODO waht you want to do
}

其中properties可以直接通过properties文件自动注入的方式自动加载进去