package org.apache.flink.connector.pulsar.source.reader.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarUnorderedFetcherManager;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.class */
public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedSourceReader.class);

    @Nullable
    private final TransactionCoordinatorClient coordinatorClient;
    private final SortedMap<Long, List<TxnID>> transactionsToCommit;
    private final List<TxnID> transactionsOfFinishedSplits;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PulsarUnorderedSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> futureCompletingBlockingQueue, Supplier<PulsarUnorderedPartitionSplitReader<OUT>> supplier, Configuration configuration, SourceReaderContext sourceReaderContext, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, @Nullable TransactionCoordinatorClient transactionCoordinatorClient) {
        super(futureCompletingBlockingQueue, new PulsarUnorderedFetcherManager(futureCompletingBlockingQueue, supplier::get), configuration, sourceReaderContext, sourceConfiguration, pulsarClient, pulsarAdmin);
        supplier.getClass();
        this.coordinatorClient = transactionCoordinatorClient;
        this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList());
    }

    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> map) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("onSplitFinished event: {}", map);
        }
        if (this.coordinatorClient != null) {
            Iterator<Map.Entry<String, PulsarPartitionSplitState>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                TxnID uncommittedTransactionId = it.next().getValue().getUncommittedTransactionId();
                if (uncommittedTransactionId != null) {
                    this.transactionsOfFinishedSplits.add(uncommittedTransactionId);
                }
            }
        }
    }

    public List<PulsarPartitionSplit> snapshotState(long j) {
        LOG.debug("Trigger the new transaction for downstream readers.");
        List<PulsarPartitionSplit> snapshotState = this.splitFetcherManager.snapshotState(j);
        if (this.coordinatorClient != null) {
            List<TxnID> computeIfAbsent = this.transactionsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
                return new ArrayList();
            });
            Iterator<PulsarPartitionSplit> it = snapshotState.iterator();
            while (it.hasNext()) {
                TxnID uncommittedTransactionId = it.next().getUncommittedTransactionId();
                if (uncommittedTransactionId != null) {
                    computeIfAbsent.add(uncommittedTransactionId);
                }
            }
        }
        return snapshotState;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        LOG.debug("Committing transactions for checkpoint {}", Long.valueOf(j));
        if (this.coordinatorClient != null) {
            for (Map.Entry<Long, List<TxnID>> entry : this.transactionsToCommit.entrySet()) {
                Long key = entry.getKey();
                if (key.longValue() <= j) {
                    for (TxnID txnID : entry.getValue()) {
                        this.coordinatorClient.commit(txnID);
                        this.transactionsOfFinishedSplits.remove(txnID);
                    }
                    this.transactionsToCommit.remove(key);
                }
            }
        }
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.source.PulsarSourceReaderBase
    public /* bridge */ /* synthetic */ void close() throws Exception {
        super.close();
    }
}
