/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.split;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Partition split reader that acknowledges every polled message, either directly on the consumer
 * (when auto-acknowledge is enabled in the {@link SourceConfiguration}) or inside a Pulsar
 * {@link Transaction} whose id is stored in the split state by {@link #snapshotState(long)}.
 */
@Internal
public class PulsarUnorderedPartitionSplitReader extends PulsarPartitionSplitReaderBase {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);

    /** Used to abort a restored transaction; {@link #startConsumer} tolerates it being {@code null}. */
    private final TransactionCoordinatorClient coordinatorClient;

    /** Transaction collecting the acknowledgements issued since the last snapshot, created lazily. */
    @Nullable
    private Transaction uncommittedTransaction;

    /**
     * Creates the reader.
     *
     * @param pulsarClient client passed to the base reader and used to create transactions
     * @param pulsarAdmin admin client passed to the base reader
     * @param sourceConfiguration source options (auto-acknowledge flag, transaction timeout)
     * @param schema schema passed to the base reader
     * @param cryptoKeyReader optional key reader passed to the base reader
     * @param coordinatorClient client used to abort uncommitted transactions on restart
     */
    public PulsarUnorderedPartitionSplitReader(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, Schema<byte[]> schema, @Nullable CryptoKeyReader cryptoKeyReader, TransactionCoordinatorClient coordinatorClient) {
        super(pulsarClient, pulsarAdmin, sourceConfiguration, schema, cryptoKeyReader);
        this.coordinatorClient = coordinatorClient;
    }

    /**
     * Receives one message and, unless auto-acknowledge is enabled, acknowledges it inside the
     * current uncommitted transaction (creating that transaction on first use).
     *
     * @param timeout maximum time to wait for a message; must fit into an {@code int} of milliseconds
     * @return the received message, or {@code null} when nothing arrived within the timeout
     * @throws ExecutionException if the transactional acknowledgement fails
     * @throws InterruptedException if the thread is interrupted while waiting for the acknowledgement
     * @throws PulsarClientException if receiving from the consumer fails
     */
    @Override
    protected Message<byte[]> pollMessage(Duration timeout) throws ExecutionException, InterruptedException, PulsarClientException {
        int timeoutMillis = Math.toIntExact(timeout.toMillis());
        Message<byte[]> message = this.pulsarConsumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
        if (message == null) {
            return null;
        }
        if (this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            // Acknowledgement happens later in finishedPollMessage.
            return message;
        }

        boolean acknowledged = false;
        try {
            if (this.uncommittedTransaction == null) {
                this.uncommittedTransaction = this.newTransaction();
            }
            this.pulsarConsumer.acknowledgeAsync(message.getMessageId(), this.uncommittedTransaction).get();
            acknowledged = true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            if (!acknowledged) {
                // The caller never sees this message on failure, so finishedPollMessage
                // will not release it; release it here instead.
                message.release();
            }
        }
        return message;
    }

    /**
     * Acknowledges the message directly when auto-acknowledge is enabled, then releases it.
     *
     * @param message the message that has been handed over downstream
     */
    @Override
    protected void finishedPollMessage(Message<?> message) {
        if (this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            PulsarExceptionUtils.sneakyClient(() -> this.pulsarConsumer.acknowledge(message));
        }
        message.release();
    }

    /**
     * If the split carries an uncommitted transaction id, tries to abort that transaction and asks
     * the consumer to redeliver its unacknowledged messages. Does nothing otherwise.
     *
     * @param split the split being started
     * @param consumer the consumer created for the split
     */
    @Override
    protected void startConsumer(PulsarPartitionSplit split, Consumer<?> consumer) {
        TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
        if (uncommittedTransactionId == null) {
            return;
        }
        if (this.coordinatorClient == null) {
            // Make the skipped abort visible instead of ignoring it silently.
            LOG.warn("Unable to abort the uncommitted transaction {} because no TransactionCoordinatorClient is available", uncommittedTransactionId);
        } else {
            this.abortUncommittedTransaction(uncommittedTransactionId);
        }
        consumer.redeliverUnacknowledgedMessages();
    }

    /**
     * Captures the split state. When a transaction is open, its id is stored in the returned state
     * and a fresh transaction replaces it for subsequent acknowledgements.
     *
     * @param checkpointId checkpoint identifier (not used by this implementation)
     * @return the state of the registered split
     */
    public PulsarPartitionSplitState snapshotState(long checkpointId) {
        PulsarPartitionSplitState state = new PulsarPartitionSplitState(this.registeredSplit);
        if (this.uncommittedTransaction != null) {
            TxnID txnID = this.uncommittedTransaction.getTxnID();
            this.uncommittedTransaction = this.newTransaction();
            state.setUncommittedTransactionId(txnID);
        }
        return state;
    }

    /** Aborts the given transaction; a "transaction not found" failure is ignored, others are logged. */
    private void abortUncommittedTransaction(TxnID transactionId) {
        try {
            this.coordinatorClient.abort(transactionId);
        } catch (TransactionCoordinatorClientException e) {
            TransactionCoordinatorClientException cause = PulsarTransactionUtils.unwrap(e);
            if (!(cause instanceof TransactionCoordinatorClientException.TransactionNotFoundException)) {
                LOG.error("Failed to abort the uncommitted transaction {} when restart the reader", transactionId, e);
            }
        }
    }

    /** Creates a transaction using the timeout configured in the source configuration. */
    private Transaction newTransaction() {
        long timeoutMillis = this.sourceConfiguration.getTransactionTimeoutMillis();
        return PulsarTransactionUtils.createTransaction(this.pulsarClient, timeoutMillis);
    }
}

