package org.apache.inlong.sdk.sort.fetcher.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.inlong.sdk.sort.api.Seeker;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.util.TimeUtil;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.class */
public class KafkaSeeker implements Seeker {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSeeker.class);
    private long seekTime = -1;
    private String topic;
    private KafkaConsumer<byte[], byte[]> consumer;

    public KafkaSeeker(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    @Override // org.apache.inlong.sdk.sort.api.Configurable
    public void configure(InLongTopic inLongTopic) {
        this.seekTime = TimeUtil.parseStartTime(inLongTopic);
        this.topic = inLongTopic.getTopic();
        LOGGER.info("start to config kafka seeker, topic is {}, seek time is {}", this.topic, Long.valueOf(this.seekTime));
    }

    @Override // org.apache.inlong.sdk.sort.api.Seeker
    public void seek() {
        if (this.seekTime < 0) {
            return;
        }
        LOGGER.info("start to seek kafka topic {}, seek time is {}", this.topic, Long.valueOf(this.seekTime));
        try {
            Set assignment = this.consumer.assignment();
            if (assignment.isEmpty()) {
                LOGGER.error("haven't assigned any topic partitions, do nothing");
                return;
            }
            Map offsetsForTimes = this.consumer.offsetsForTimes((Map) assignment.stream().collect(Collectors.toMap(topicPartition -> {
                return topicPartition;
            }, topicPartition2 -> {
                return Long.valueOf(this.seekTime);
            })));
            ArrayList arrayList = new ArrayList();
            offsetsForTimes.forEach((topicPartition3, offsetAndTimestamp) -> {
                resetOffset(topicPartition3, offsetAndTimestamp, arrayList);
            });
            LOGGER.info("topic partition {} should be seek to end", arrayList);
            if (!arrayList.isEmpty()) {
                this.consumer.seekToEnd(arrayList);
            }
            LOGGER.info("finish to seek kafka topic {}", this.topic);
        } catch (Throwable th) {
            LOGGER.error("failed to seek kafka topic, ex is {}", th.getMessage(), th);
        }
    }

    private void resetOffset(TopicPartition topicPartition, OffsetAndTimestamp offsetAndTimestamp, List<TopicPartition> list) {
        if (offsetAndTimestamp == null) {
            LOGGER.info("tp {} has null offsetAndTimestamp, reset to end", topicPartition);
            list.add(topicPartition);
        }
        long offset = offsetAndTimestamp.offset();
        long position = this.consumer.position(topicPartition);
        LOGGER.info("for tp {}, expected offset is {}, last offset is {}", new Object[]{topicPartition, Long.valueOf(offset), Long.valueOf(position)});
        if (position < offset) {
            LOGGER.info("do seek for tp {}", topicPartition);
            this.consumer.seek(topicPartition, offsetAndTimestamp.offset());
            LOGGER.info("after seek, the offset for tp {} is {}", topicPartition, Long.valueOf(this.consumer.position(topicPartition)));
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.Seeker
    public long getSeekTime() {
        return this.seekTime;
    }
}
