package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.class */
public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceSplitEnumerator.class);
    private static final String CLIENT_ID_PREFIX = "seatunnel";
    private final ConsumerMetadata metadata;
    private final SourceSplitEnumerator.Context<KafkaSourceSplit> context;
    private long discoveryIntervalMillis;
    private AdminClient adminClient;
    private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
    private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
    private ScheduledExecutorService executor;
    private ScheduledFuture scheduledFuture;

    KafkaSourceSplitEnumerator(ConsumerMetadata consumerMetadata, SourceSplitEnumerator.Context<KafkaSourceSplit> context) {
        this.metadata = consumerMetadata;
        this.context = context;
        this.assignedSplit = new HashMap();
        this.pendingSplit = new HashMap();
    }

    KafkaSourceSplitEnumerator(ConsumerMetadata consumerMetadata, SourceSplitEnumerator.Context<KafkaSourceSplit> context, KafkaSourceState kafkaSourceState) {
        this(consumerMetadata, context);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSourceSplitEnumerator(ConsumerMetadata consumerMetadata, SourceSplitEnumerator.Context<KafkaSourceSplit> context, long j) {
        this(consumerMetadata, context);
        this.discoveryIntervalMillis = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSourceSplitEnumerator(ConsumerMetadata consumerMetadata, SourceSplitEnumerator.Context<KafkaSourceSplit> context, KafkaSourceState kafkaSourceState, long j) {
        this(consumerMetadata, context, kafkaSourceState);
        this.discoveryIntervalMillis = j;
    }

    public void open() {
        this.adminClient = initAdminClient(this.metadata.getProperties());
        if (this.discoveryIntervalMillis > 0) {
            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("kafka-partition-dynamic-discovery");
                return thread;
            });
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    discoverySplits();
                } catch (Exception e) {
                    log.error("Dynamic discovery failure:", e);
                }
            }, this.discoveryIntervalMillis, this.discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void run() throws ExecutionException, InterruptedException {
        fetchPendingPartitionSplit();
        setPartitionStartOffset();
        assignSplit();
    }

    private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
        Set<TopicPartition> keySet = this.pendingSplit.keySet();
        Map<TopicPartition, Long> map = null;
        switch (this.metadata.getStartMode()) {
            case EARLIEST:
                map = listOffsets(keySet, OffsetSpec.earliest());
                break;
            case GROUP_OFFSETS:
                map = listConsumerGroupOffsets(keySet);
                break;
            case LATEST:
                map = listOffsets(keySet, OffsetSpec.latest());
                break;
            case TIMESTAMP:
                map = listOffsets(keySet, OffsetSpec.forTimestamp(this.metadata.getStartOffsetsTimestamp().longValue()));
                break;
            case SPECIFIC_OFFSETS:
                map = this.metadata.getSpecificStartOffsets();
                break;
        }
        map.entrySet().forEach(entry -> {
            if (this.pendingSplit.containsKey(entry.getKey())) {
                this.pendingSplit.get(entry.getKey()).setStartOffset(((Long) entry.getValue()).longValue());
            }
        });
    }

    public void close() throws IOException {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    public void addSplitsBack(List<KafkaSourceSplit> list, int i) {
        if (list.isEmpty()) {
            return;
        }
        this.pendingSplit.putAll(convertToNextSplit(list));
        assignSplit();
    }

    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> list) {
        try {
            Map<TopicPartition, Long> listOffsets = listOffsets((Collection) list.stream().map((v0) -> {
                return v0.getTopicPartition();
            }).collect(Collectors.toList()), OffsetSpec.latest());
            list.forEach(kafkaSourceSplit -> {
                kafkaSourceSplit.setStartOffset(kafkaSourceSplit.getEndOffset() + 1);
                kafkaSourceSplit.setEndOffset(((Long) listOffsets.get(kafkaSourceSplit.getTopicPartition())).longValue());
            });
            return (Map) list.stream().collect(Collectors.toMap(kafkaSourceSplit2 -> {
                return kafkaSourceSplit2.getTopicPartition();
            }, kafkaSourceSplit3 -> {
                return kafkaSourceSplit3;
            }));
        } catch (Exception e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void handleSplitRequest(int i) {
    }

    public void registerReader(int i) {
        if (this.pendingSplit.isEmpty()) {
            return;
        }
        assignSplit();
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public KafkaSourceState m936snapshotState(long j) throws Exception {
        return new KafkaSourceState((Set) this.assignedSplit.values().stream().collect(Collectors.toSet()));
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }

    private AdminClient initAdminClient(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.setProperty("bootstrap.servers", this.metadata.getBootstrapServers());
        properties2.setProperty("client.id", "seatunnel-enumerator-admin-client-" + hashCode());
        return AdminClient.create(properties2);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v40, types: [java.util.Collection] */
    private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
        List asList;
        if (this.metadata.isPattern()) {
            Pattern compile = Pattern.compile(this.metadata.getTopic());
            asList = (Collection) this.adminClient.listTopics().names().get().stream().filter(str -> {
                return compile.matcher(str).matches();
            }).collect(Collectors.toSet());
        } else {
            asList = Arrays.asList(this.metadata.getTopic().split(Config.DEFAULT_FIELD_DELIMITER));
        }
        log.info("Discovered topics: {}", asList);
        Collection<TopicPartition> collection = (Collection) this.adminClient.describeTopics(asList).all().get().values().stream().flatMap(topicDescription -> {
            return topicDescription.partitions().stream().map(topicPartitionInfo -> {
                return new TopicPartition(topicDescription.name(), topicPartitionInfo.partition());
            });
        }).collect(Collectors.toSet());
        Map<TopicPartition, Long> listOffsets = listOffsets(collection, OffsetSpec.latest());
        return (Set) collection.stream().map(topicPartition -> {
            KafkaSourceSplit kafkaSourceSplit = new KafkaSourceSplit(topicPartition);
            kafkaSourceSplit.setEndOffset(((Long) listOffsets.get(kafkaSourceSplit.getTopicPartition())).longValue());
            return kafkaSourceSplit;
        }).collect(Collectors.toSet());
    }

    private synchronized void assignSplit() {
        HashMap hashMap = new HashMap(16);
        for (int i = 0; i < this.context.currentParallelism(); i++) {
            hashMap.computeIfAbsent(Integer.valueOf(i), num -> {
                return new ArrayList();
            });
        }
        this.pendingSplit.entrySet().forEach(entry -> {
            if (this.assignedSplit.containsKey(entry.getKey())) {
                return;
            }
            ((List) hashMap.get(Integer.valueOf(getSplitOwner((TopicPartition) entry.getKey(), this.context.currentParallelism())))).add(entry.getValue());
        });
        SourceSplitEnumerator.Context<KafkaSourceSplit> context = this.context;
        context.getClass();
        hashMap.forEach((v1, v2) -> {
            r1.assignSplit(v1, v2);
        });
        this.assignedSplit.putAll(this.pendingSplit);
        this.pendingSplit.clear();
    }

    private static int getSplitOwner(TopicPartition topicPartition, int i) {
        return ((((topicPartition.topic().hashCode() * 31) & Integer.MAX_VALUE) % i) + topicPartition.partition()) % i;
    }

    private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition> collection, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
        return (Map) this.adminClient.listOffsets((Map) collection.stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return offsetSpec;
        }))).all().thenApply(map -> {
            HashMap hashMap = new HashMap();
            map.forEach((topicPartition3, listOffsetsResultInfo) -> {
                if (listOffsetsResultInfo != null) {
                    hashMap.put(topicPartition3, Long.valueOf(listOffsetsResultInfo.offset()));
                }
            });
            return hashMap;
        }).get();
    }

    public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartition> collection) throws ExecutionException, InterruptedException {
        return (Map) this.adminClient.listConsumerGroupOffsets(this.metadata.getConsumerGroup(), new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList(collection))).partitionsToOffsetAndMetadata().thenApply(map -> {
            HashMap hashMap = new HashMap();
            map.forEach((topicPartition, offsetAndMetadata) -> {
                if (offsetAndMetadata != null) {
                    hashMap.put(topicPartition, Long.valueOf(offsetAndMetadata.offset()));
                }
            });
            return hashMap;
        }).get();
    }

    private void discoverySplits() throws ExecutionException, InterruptedException {
        fetchPendingPartitionSplit();
        assignSplit();
    }

    private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
        getTopicInfo().forEach(kafkaSourceSplit -> {
            if (this.assignedSplit.containsKey(kafkaSourceSplit.getTopicPartition()) || this.pendingSplit.containsKey(kafkaSourceSplit.getTopicPartition())) {
                return;
            }
            this.pendingSplit.put(kafkaSourceSplit.getTopicPartition(), kafkaSourceSplit);
        });
    }
}
