package org.apache.flink.connector.pulsar.source.enumerator.assigner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.class */
public class SharedSplitAssigner implements SplitAssigner {
    private static final long serialVersionUID = 8468503133499402491L;
    private final StopCursor stopCursor;
    private final boolean enablePartitionDiscovery;
    private final Set<TopicPartition> appendedPartitions;
    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
    private final Map<Integer, Set<String>> readerAssignedSplits;
    private boolean initialized;

    public SharedSplitAssigner(StopCursor stopCursor, SourceConfiguration sourceConfiguration, PulsarSourceEnumState pulsarSourceEnumState) {
        this.stopCursor = stopCursor;
        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
        this.appendedPartitions = pulsarSourceEnumState.getAppendedPartitions();
        this.sharedPendingPartitionSplits = pulsarSourceEnumState.getSharedPendingPartitionSplits();
        this.readerAssignedSplits = pulsarSourceEnumState.getReaderAssignedSplits();
        this.initialized = pulsarSourceEnumState.isInitialized();
    }

    @Override // org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner
    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> set) {
        ArrayList arrayList = new ArrayList();
        for (TopicPartition topicPartition : set) {
            if (!this.appendedPartitions.contains(topicPartition)) {
                this.appendedPartitions.add(topicPartition);
                arrayList.add(topicPartition);
            }
        }
        if (!this.initialized) {
            this.initialized = true;
        }
        return arrayList;
    }

    @Override // org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner
    public void addSplitsBack(List<PulsarPartitionSplit> list, int i) {
        this.sharedPendingPartitionSplits.computeIfAbsent(Integer.valueOf(i), num -> {
            return new HashSet();
        }).addAll(list);
    }

    @Override // org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner
    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> list) {
        if (list.isEmpty()) {
            return Optional.empty();
        }
        HashMap hashMap = new HashMap();
        for (Integer num : list) {
            Set<PulsarPartitionSplit> remove = this.sharedPendingPartitionSplits.remove(num);
            if (remove == null) {
                remove = new HashSet();
            }
            Set<String> computeIfAbsent = this.readerAssignedSplits.computeIfAbsent(num, num2 -> {
                return new HashSet();
            });
            for (TopicPartition topicPartition : this.appendedPartitions) {
                String topicPartition2 = topicPartition.toString();
                if (!computeIfAbsent.contains(topicPartition2)) {
                    remove.add(new PulsarPartitionSplit(topicPartition, this.stopCursor));
                    computeIfAbsent.add(topicPartition2);
                }
            }
            if (!remove.isEmpty()) {
                hashMap.put(num, new ArrayList(remove));
            }
        }
        return hashMap.isEmpty() ? Optional.empty() : Optional.of(new SplitsAssignment(hashMap));
    }

    @Override // org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner
    public boolean noMoreSplits(Integer num) {
        Set<PulsarPartitionSplit> set = this.sharedPendingPartitionSplits.get(num);
        Set<String> set2 = this.readerAssignedSplits.get(num);
        return !this.enablePartitionDiscovery && this.initialized && (set == null || set.isEmpty()) && set2 != null && set2.size() == this.appendedPartitions.size();
    }

    @Override // org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner
    public PulsarSourceEnumState snapshotState() {
        return new PulsarSourceEnumState(this.appendedPartitions, new HashSet(), this.sharedPendingPartitionSplits, this.readerAssignedSplits, this.initialized);
    }
}
