/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl.transaction;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionImpl
implements Transaction {
    private static final Logger log = LoggerFactory.getLogger(TransactionImpl.class);
    private final PulsarClientImpl client;
    private final long transactionTimeoutMs;
    private final long txnIdLeastBits;
    private final long txnIdMostBits;
    private final Map<String, CompletableFuture<Void>> registerPartitionMap;
    private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
    private final TransactionCoordinatorClientImpl tcClient;
    private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
    private final ArrayList<CompletableFuture<Void>> ackFutureList;
    private volatile State state = State.OPEN;

    TransactionImpl(PulsarClientImpl client, long transactionTimeoutMs, long txnIdLeastBits, long txnIdMostBits) {
        this.client = client;
        this.transactionTimeoutMs = transactionTimeoutMs;
        this.txnIdLeastBits = txnIdLeastBits;
        this.txnIdMostBits = txnIdMostBits;
        this.registerPartitionMap = new ConcurrentHashMap<String, CompletableFuture<Void>>();
        this.registerSubscriptionMap = new ConcurrentHashMap<Pair<String, String>, CompletableFuture<Void>>();
        this.tcClient = client.getTcClient();
        this.sendFutureList = new ArrayList();
        this.ackFutureList = new ArrayList();
    }

    public CompletableFuture<Void> registerProducedTopic(String topic) {
        return this.checkIfOpen().thenCompose(value -> {
            TransactionImpl transactionImpl = this;
            synchronized (transactionImpl) {
                return this.registerPartitionMap.compute(topic, (key, future) -> {
                    if (future != null) {
                        return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
                    }
                    return this.tcClient.addPublishPartitionToTxnAsync(new TxnID(this.txnIdMostBits, this.txnIdLeastBits), Lists.newArrayList((Object[])new String[]{topic})).thenCompose(ignored -> CompletableFuture.completedFuture(null));
                });
            }
        });
    }

    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
        this.sendFutureList.add(sendFuture);
    }

    public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
        return this.checkIfOpen().thenCompose(value -> {
            TransactionImpl transactionImpl = this;
            synchronized (transactionImpl) {
                return this.registerSubscriptionMap.compute((Pair<String, String>)Pair.of((Object)topic, (Object)subscription), (key, future) -> {
                    if (future != null) {
                        return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
                    }
                    return this.tcClient.addSubscriptionToTxnAsync(new TxnID(this.txnIdMostBits, this.txnIdLeastBits), topic, subscription).thenCompose(ignored -> CompletableFuture.completedFuture(null));
                });
            }
        });
    }

    public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
        this.ackFutureList.add(ackFuture);
    }

    public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer) {
        if (this.cumulativeAckConsumers == null) {
            this.cumulativeAckConsumers = new HashMap();
        }
        this.cumulativeAckConsumers.put(consumer, 0);
    }

    public CompletableFuture<Void> commit() {
        return this.checkIfOpen().thenCompose(value -> {
            CompletableFuture commitFuture = new CompletableFuture();
            this.state = State.COMMITTING;
            this.allOpComplete().whenComplete((v, e) -> {
                if (e != null) {
                    this.abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally((Throwable)e));
                } else {
                    this.tcClient.commitAsync(new TxnID(this.txnIdMostBits, this.txnIdLeastBits)).whenComplete((vx, ex) -> {
                        if (ex != null) {
                            if (ex instanceof TransactionCoordinatorClientException.TransactionNotFoundException || ex instanceof TransactionCoordinatorClientException.InvalidTxnStatusException) {
                                this.state = State.ERROR;
                            }
                            commitFuture.completeExceptionally((Throwable)ex);
                        } else {
                            this.state = State.COMMITTED;
                            commitFuture.complete(vx);
                        }
                    });
                }
            });
            return commitFuture;
        });
    }

    public CompletableFuture<Void> abort() {
        return this.checkIfOpen().thenCompose(value -> {
            CompletableFuture abortFuture = new CompletableFuture();
            this.state = State.ABORTING;
            this.allOpComplete().whenComplete((v, e) -> {
                if (e != null) {
                    log.error(e.getMessage());
                }
                if (this.cumulativeAckConsumers != null) {
                    this.cumulativeAckConsumers.forEach((consumer, integer) -> this.cumulativeAckConsumers.putIfAbsent((ConsumerImpl<?>)consumer, consumer.clearIncomingMessagesAndGetMessageNumber()));
                }
                this.tcClient.abortAsync(new TxnID(this.txnIdMostBits, this.txnIdLeastBits)).whenComplete((vx, ex) -> {
                    if (this.cumulativeAckConsumers != null) {
                        this.cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits);
                        this.cumulativeAckConsumers.clear();
                    }
                    if (ex != null) {
                        if (ex instanceof TransactionCoordinatorClientException.TransactionNotFoundException || ex instanceof TransactionCoordinatorClientException.InvalidTxnStatusException) {
                            this.state = State.ERROR;
                        }
                        abortFuture.completeExceptionally((Throwable)ex);
                    } else {
                        this.state = State.ABORTED;
                        abortFuture.complete(null);
                    }
                });
            });
            return abortFuture;
        });
    }

    private CompletableFuture<Void> checkIfOpen() {
        if (this.state == State.OPEN) {
            return CompletableFuture.completedFuture(null);
        }
        return FutureUtil.failedFuture((Throwable)new TransactionCoordinatorClientException.InvalidTxnStatusException("[" + this.txnIdMostBits + ":" + this.txnIdLeastBits + "] with unexpected state : " + this.state.name() + ", expect " + (Object)((Object)State.OPEN) + " state!"));
    }

    private CompletableFuture<Void> allOpComplete() {
        ArrayList<CompletableFuture<Object>> futureList = new ArrayList<CompletableFuture<Object>>();
        futureList.addAll(this.sendFutureList);
        futureList.addAll(this.ackFutureList);
        return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
    }

    public PulsarClientImpl getClient() {
        return this.client;
    }

    public long getTransactionTimeoutMs() {
        return this.transactionTimeoutMs;
    }

    public long getTxnIdLeastBits() {
        return this.txnIdLeastBits;
    }

    public long getTxnIdMostBits() {
        return this.txnIdMostBits;
    }

    public Map<String, CompletableFuture<Void>> getRegisterPartitionMap() {
        return this.registerPartitionMap;
    }

    public Map<Pair<String, String>, CompletableFuture<Void>> getRegisterSubscriptionMap() {
        return this.registerSubscriptionMap;
    }

    public TransactionCoordinatorClientImpl getTcClient() {
        return this.tcClient;
    }

    public Map<ConsumerImpl<?>, Integer> getCumulativeAckConsumers() {
        return this.cumulativeAckConsumers;
    }

    public ArrayList<CompletableFuture<MessageId>> getSendFutureList() {
        return this.sendFutureList;
    }

    public ArrayList<CompletableFuture<Void>> getAckFutureList() {
        return this.ackFutureList;
    }

    public State getState() {
        return this.state;
    }

    public static enum State {
        OPEN,
        COMMITTING,
        ABORTING,
        COMMITTED,
        ABORTED,
        ERROR;

    }
}

