package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.class */
public class PulsarSplitEnumerator implements SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSplitEnumerator.class);
    private final SourceSplitEnumerator.Context<PulsarPartitionSplit> context;
    private final PulsarAdminConfig adminConfig;
    private final PulsarDiscoverer partitionDiscoverer;
    private final long partitionDiscoveryIntervalMs;
    private final StartCursor startCursor;
    private final StopCursor stopCursor;
    private final String subscriptionName;
    private final Set<TopicPartition> assignedPartitions;
    private final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;
    private PulsarAdmin pulsarAdmin;
    private boolean noMoreNewPartitionSplits;
    private ScheduledThreadPoolExecutor executor;

    public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context, PulsarAdminConfig pulsarAdminConfig, PulsarDiscoverer pulsarDiscoverer, long j, StartCursor startCursor, StopCursor stopCursor, String str) {
        this(context, pulsarAdminConfig, pulsarDiscoverer, j, startCursor, stopCursor, str, Collections.emptySet());
    }

    public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context, PulsarAdminConfig pulsarAdminConfig, PulsarDiscoverer pulsarDiscoverer, long j, StartCursor startCursor, StopCursor stopCursor, String str, Set<TopicPartition> set) {
        this.noMoreNewPartitionSplits = false;
        this.executor = null;
        if ((pulsarDiscoverer instanceof TopicPatternDiscoverer) && j > 0 && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
            throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
        }
        this.context = context;
        this.adminConfig = pulsarAdminConfig;
        this.partitionDiscoverer = pulsarDiscoverer;
        this.partitionDiscoveryIntervalMs = j;
        this.startCursor = startCursor;
        this.stopCursor = stopCursor;
        this.subscriptionName = str;
        this.assignedPartitions = new HashSet(set);
        this.pendingPartitionSplits = new HashMap();
    }

    public void open() {
        this.pulsarAdmin = PulsarConfigUtil.createAdmin(this.adminConfig);
    }

    public void run() throws Exception {
        if (this.partitionDiscoveryIntervalMs <= 0) {
            discoverySplits();
        } else {
            this.executor = new ScheduledThreadPoolExecutor(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("pulsar-split-discovery-executor");
                return thread;
            });
            this.executor.scheduleAtFixedRate(this::discoverySplits, 0L, this.partitionDiscoveryIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    private void discoverySplits() {
        checkPartitionChanges(this.partitionDiscoverer.getSubscribedTopicPartitions(this.pulsarAdmin));
    }

    private void checkPartitionChanges(Set<TopicPartition> set) {
        Set<TopicPartition> newPartitions = getNewPartitions(set);
        if (this.partitionDiscoveryIntervalMs <= 0 && !this.noMoreNewPartitionSplits) {
            LOG.debug("Partition discovery is disabled.");
            this.noMoreNewPartitionSplits = true;
        }
        if (newPartitions.isEmpty()) {
            return;
        }
        addPartitionSplitChangeToPendingAssignments((List) newPartitions.stream().map(this::createPulsarPartitionSplit).collect(Collectors.toList()));
        assignPendingPartitionSplits(this.context.registeredReaders());
    }

    private PulsarPartitionSplit createPulsarPartitionSplit(TopicPartition topicPartition) {
        StopCursor copy = this.stopCursor.copy();
        PulsarPartitionSplit pulsarPartitionSplit = new PulsarPartitionSplit(topicPartition, copy);
        if (copy instanceof LatestMessageStopCursor) {
            ((LatestMessageStopCursor) copy).prepare(this.pulsarAdmin, topicPartition);
        }
        if (this.startCursor instanceof SubscriptionStartCursor) {
            ((SubscriptionStartCursor) this.startCursor).ensureSubscription(this.subscriptionName, topicPartition, this.pulsarAdmin);
        }
        return pulsarPartitionSplit;
    }

    private Set<TopicPartition> getNewPartitions(Set<TopicPartition> set) {
        set.getClass();
        Consumer consumer = (v1) -> {
            r0.remove(v1);
        };
        this.assignedPartitions.forEach(consumer);
        this.pendingPartitionSplits.forEach((num, set2) -> {
            set2.forEach(pulsarPartitionSplit -> {
                consumer.accept(pulsarPartitionSplit.getPartition());
            });
        });
        if (!set.isEmpty()) {
            LOG.info("Discovered new partitions: {}", set);
        }
        return set;
    }

    private void addPartitionSplitChangeToPendingAssignments(Collection<PulsarPartitionSplit> collection) {
        int currentParallelism = this.context.currentParallelism();
        for (PulsarPartitionSplit pulsarPartitionSplit : collection) {
            this.pendingPartitionSplits.computeIfAbsent(Integer.valueOf(getSplitOwner(pulsarPartitionSplit.getPartition(), currentParallelism)), num -> {
                return new HashSet();
            }).add(pulsarPartitionSplit);
        }
        LOG.debug("Assigned {} to {} readers of subscription {}.", new Object[]{collection, Integer.valueOf(currentParallelism), this.subscriptionName});
    }

    static int getSplitOwner(TopicPartition topicPartition, int i) {
        return ((((topicPartition.getTopic().hashCode() * 31) & Integer.MAX_VALUE) % i) + topicPartition.getPartition()) % i;
    }

    private void assignPendingPartitionSplits(Set<Integer> set) {
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Set<PulsarPartitionSplit> remove = this.pendingPartitionSplits.remove(Integer.valueOf(intValue));
            if (remove != null && !remove.isEmpty()) {
                remove.forEach(pulsarPartitionSplit -> {
                    this.assignedPartitions.add(pulsarPartitionSplit.getPartition());
                });
                LOG.info("Assigning splits to readers {}", remove);
                this.context.assignSplit(intValue, new ArrayList(remove));
            }
        }
        if (this.noMoreNewPartitionSplits && this.stopCursor.getBoundedness() == Boundedness.BOUNDED) {
            LOG.debug("No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in subscription {}.", set, this.subscriptionName);
            SourceSplitEnumerator.Context<PulsarPartitionSplit> context = this.context;
            context.getClass();
            set.forEach((v1) -> {
                r1.signalNoMoreSplits(v1);
            });
        }
    }

    public void close() throws IOException {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    public void addSplitsBack(List<PulsarPartitionSplit> list, int i) {
        addPartitionSplitChangeToPendingAssignments(list);
        if (this.context.registeredReaders().contains(Integer.valueOf(i))) {
            assignPendingPartitionSplits(Collections.singleton(Integer.valueOf(i)));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingPartitionSplits.size();
    }

    public void handleSplitRequest(int i) {
    }

    public void registerReader(int i) {
        LOG.debug("Adding reader {} to PulsarSourceEnumerator for subscription {}.", Integer.valueOf(i), this.subscriptionName);
        assignPendingPartitionSplits(Collections.singleton(Integer.valueOf(i)));
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PulsarSplitEnumeratorState m2950snapshotState(long j) throws Exception {
        return new PulsarSplitEnumeratorState(this.assignedPartitions);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }
}
