/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarSourceEnumerator
implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
    private final PulsarAdmin pulsarAdmin;
    private final PulsarSubscriber subscriber;
    private final StartCursor startCursor;
    private final RangeGenerator rangeGenerator;
    private final SourceConfiguration sourceConfiguration;
    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
    private final SplitAssigner splitAssigner;

    public PulsarSourceEnumerator(PulsarSubscriber subscriber, StartCursor startCursor, RangeGenerator rangeGenerator, SourceConfiguration sourceConfiguration, SplitEnumeratorContext<PulsarPartitionSplit> context, SplitAssigner splitAssigner) {
        this.pulsarAdmin = PulsarClientFactory.createAdmin(sourceConfiguration);
        this.subscriber = subscriber;
        this.startCursor = startCursor;
        this.rangeGenerator = rangeGenerator;
        this.sourceConfiguration = sourceConfiguration;
        this.context = context;
        this.splitAssigner = splitAssigner;
    }

    public void start() {
        this.rangeGenerator.open(this.sourceConfiguration);
        if (this.sourceConfiguration.isEnablePartitionDiscovery()) {
            LOG.info("Starting the PulsarSourceEnumerator for subscription {} with partition discovery interval of {} ms.", (Object)this.subscriptionDesc(), (Object)this.sourceConfiguration.getPartitionDiscoveryIntervalMs());
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges, 0L, this.sourceConfiguration.getPartitionDiscoveryIntervalMs());
        } else {
            LOG.info("Starting the PulsarSourceEnumerator for subscription {} without periodic partition discovery.", (Object)this.subscriptionDesc());
            this.context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
        }
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    }

    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        this.splitAssigner.addSplitsBack(splits, subtaskId);
        if (this.context.registeredReaders().containsKey(subtaskId)) {
            this.assignPendingPartitionSplits(Collections.singletonList(subtaskId));
        }
    }

    public void addReader(int subtaskId) {
        LOG.debug("Adding reader {} to PulsarSourceEnumerator for subscription {}.", (Object)subtaskId, (Object)this.subscriptionDesc());
        this.assignPendingPartitionSplits(Collections.singletonList(subtaskId));
    }

    public PulsarSourceEnumState snapshotState(long checkpointId) {
        return this.splitAssigner.snapshotState();
    }

    public void close() {
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
    }

    private String subscriptionDesc() {
        return String.format("%s(%s,%s)", new Object[]{this.sourceConfiguration.getSubscriptionName(), this.sourceConfiguration.getSubscriptionType(), this.sourceConfiguration.getSubscriptionMode()});
    }

    private Set<TopicPartition> getSubscribedTopicPartitions() {
        int parallelism = this.context.currentParallelism();
        return this.subscriber.getSubscribedTopicPartitions(this.pulsarAdmin, this.rangeGenerator, parallelism);
    }

    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable throwable) {
        if (throwable != null) {
            throw new FlinkRuntimeException("Failed to list subscribed topic partitions due to ", throwable);
        }
        List<TopicPartition> newPartitions = this.splitAssigner.registerTopicPartitions(fetchedPartitions);
        this.createSubscription(newPartitions);
        ArrayList<Integer> registeredReaders = new ArrayList<Integer>(this.context.registeredReaders().keySet());
        this.assignPendingPartitionSplits(registeredReaders);
    }

    private void createSubscription(List<TopicPartition> newPartitions) {
        for (TopicPartition partition : newPartitions) {
            String topicName = partition.getFullTopicName();
            String subscriptionName = this.sourceConfiguration.getSubscriptionName();
            List subscriptions = (List)PulsarExceptionUtils.sneakyAdmin(() -> this.pulsarAdmin.topics().getSubscriptions(topicName));
            if (subscriptions.contains(subscriptionName)) continue;
            CursorPosition position = this.startCursor.position(partition.getTopic(), partition.getPartitionId());
            MessageId initialPosition = this.queryInitialPosition(topicName, position);
            PulsarExceptionUtils.sneakyAdmin(() -> this.pulsarAdmin.topics().createSubscription(topicName, subscriptionName, initialPosition));
        }
    }

    private MessageId queryInitialPosition(String topicName, CursorPosition position) {
        CursorPosition.Type type = position.getType();
        if (type == CursorPosition.Type.TIMESTAMP) {
            return (MessageId)PulsarExceptionUtils.sneakyAdmin(() -> this.pulsarAdmin.topics().getMessageIdByTimestamp(topicName, position.getTimestamp()));
        }
        if (type == CursorPosition.Type.MESSAGE_ID) {
            return position.getMessageId();
        }
        throw new UnsupportedOperationException("We don't support this seek type " + (Object)((Object)type));
    }

    private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
        pendingReaders.forEach(reader -> {
            if (!this.context.registeredReaders().containsKey(reader)) {
                throw new IllegalStateException("Reader " + reader + " is not registered to source coordinator");
            }
        });
        this.splitAssigner.createAssignment(pendingReaders).ifPresent(arg_0 -> this.context.assignSplits(arg_0));
        for (Integer reader2 : pendingReaders) {
            if (!this.splitAssigner.noMoreSplits(reader2)) continue;
            LOG.debug("No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {} in subscription {}.", (Object)reader2, (Object)this.subscriptionDesc());
            this.context.signalNoMoreSplits(reader2.intValue());
        }
    }
}

