package org.apache.inlong.sdk.sort.fetcher.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.sdk.sort.api.Seeker;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.class */
public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(AckOffsetOnRebalance.class);
    private static final long DEFAULT_MAX_WAIT_FOR_ACK_TIME = 15000;
    private final long maxWaitForAckTime;
    private final String clusterId;
    private final Seeker seeker;
    private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
    private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final AtomicLong revokedNum;
    private final AtomicLong assignedNum;

    public AckOffsetOnRebalance(String str, Seeker seeker, ConcurrentHashMap<TopicPartition, OffsetAndMetadata> concurrentHashMap, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this(str, seeker, concurrentHashMap, null, kafkaConsumer);
    }

    public AckOffsetOnRebalance(String str, Seeker seeker, ConcurrentHashMap<TopicPartition, OffsetAndMetadata> concurrentHashMap, ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> concurrentHashMap2, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this(str, seeker, concurrentHashMap, concurrentHashMap2, kafkaConsumer, DEFAULT_MAX_WAIT_FOR_ACK_TIME);
    }

    public AckOffsetOnRebalance(String str, Seeker seeker, ConcurrentHashMap<TopicPartition, OffsetAndMetadata> concurrentHashMap, ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> concurrentHashMap2, KafkaConsumer<byte[], byte[]> kafkaConsumer, long j) {
        this.revokedNum = new AtomicLong(0L);
        this.assignedNum = new AtomicLong(0L);
        this.clusterId = str;
        this.seeker = seeker;
        this.commitOffsetMap = concurrentHashMap;
        this.ackOffsetMap = concurrentHashMap2;
        this.consumer = kafkaConsumer;
        this.maxWaitForAckTime = j;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        LOGGER.info("*- in re-balance:onPartitionsRevoked, it's the {} time", Long.valueOf(this.revokedNum.incrementAndGet()));
        collection.forEach(topicPartition -> {
            LOGGER.debug("clusterId:{},onPartitionsRevoked:{}, position is {}", new Object[]{this.clusterId, topicPartition.toString(), Long.valueOf(this.consumer.position(topicPartition))});
        });
        try {
            if (Objects.nonNull(this.ackOffsetMap) && Objects.nonNull(this.commitOffsetMap)) {
                long currentTimeMillis = System.currentTimeMillis();
                while (System.currentTimeMillis() - currentTimeMillis < this.maxWaitForAckTime && !ackReady(collection)) {
                    TimeUnit.MILLISECONDS.sleep(1000L);
                }
                ackRemovedTopicPartitions(collection);
            }
        } catch (Throwable th) {
            LOGGER.warn("got exception in onPartitionsRevoked : ", th);
        }
    }

    private boolean ackReady(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            ConcurrentSkipListMap<Long, Boolean> concurrentSkipListMap = this.ackOffsetMap.get(topicPartition);
            if (!Objects.isNull(concurrentSkipListMap)) {
                for (Map.Entry<Long, Boolean> entry : concurrentSkipListMap.entrySet()) {
                    if (!entry.getValue().booleanValue()) {
                        LOGGER.info("tp {}, offset {} has not been ack, wait", topicPartition, entry.getKey());
                        return false;
                    }
                }
            }
        }
        LOGGER.info("all revoked tp have been ack, re-balance right now.");
        return true;
    }

    private void ackRemovedTopicPartitions(Collection<TopicPartition> collection) {
        LOGGER.info("ack revoked topic partitions");
        prepareCommit();
        this.consumer.commitSync(this.commitOffsetMap);
        ConcurrentHashMap.KeySetView keySet = this.ackOffsetMap.keySet();
        Stream<TopicPartition> stream = collection.stream();
        keySet.getClass();
        Stream<TopicPartition> filter = stream.filter((v1) -> {
            return r1.contains(v1);
        });
        ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> concurrentHashMap = this.ackOffsetMap;
        concurrentHashMap.getClass();
        filter.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    private void prepareCommit() {
        ArrayList arrayList = new ArrayList();
        this.ackOffsetMap.forEach((topicPartition, concurrentSkipListMap) -> {
            long j = -1;
            for (Long l : concurrentSkipListMap.keySet()) {
                if (!((Boolean) concurrentSkipListMap.get(l)).booleanValue()) {
                    break;
                }
                arrayList.add(l);
                j = l.longValue();
            }
            if (CollectionUtils.isEmpty(arrayList)) {
                return;
            }
            concurrentSkipListMap.getClass();
            arrayList.forEach((v1) -> {
                r1.remove(v1);
            });
            arrayList.clear();
            this.commitOffsetMap.put(topicPartition, new OffsetAndMetadata(j));
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        LOGGER.info("*- in re-balance:onPartitionsAssigned, it is the {} time", Long.valueOf(this.assignedNum.incrementAndGet()));
        collection.forEach(topicPartition -> {
            long position = this.consumer.position(topicPartition);
            LOGGER.debug("clusterId:{},onPartitionsAssigned:{}, position is {}", new Object[]{this.clusterId, topicPartition.toString(), Long.valueOf(position)});
            this.consumer.seek(topicPartition, position + 1);
        });
        this.seeker.seek();
    }
}
