/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferReader;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;

/**
 * A {@link TransactionBuffer} that keeps every transaction's entries in memory.
 *
 * <p>Each transaction is represented by a {@link TxnBuffer} stored in {@link #buffers}. Committed
 * transactions are additionally indexed by the ledger id they were committed at in
 * {@link #txnIndex}, so that they can be dropped by {@link #purgeTxns(List)}.
 *
 * <p>Locking: {@link #buffers} is a concurrent map; {@link #txnIndex} is a plain {@link HashMap}
 * and every access to it is guarded by {@code synchronized (txnIndex)}.
 */
class InMemTransactionBuffer
implements TransactionBuffer {
    /** Per-transaction buffers, keyed by transaction id. */
    final ConcurrentMap<TxnID, TxnBuffer> buffers = new ConcurrentHashMap<>();
    /** Committed transactions grouped by the ledger id they were committed at. Guarded by itself. */
    final Map<Long, Set<TxnID>> txnIndex = new HashMap<>();

    /**
     * Creates an empty buffer.
     *
     * @param topic the owning topic; not used by this implementation
     */
    public InMemTransactionBuffer(Topic topic) {
    }

    /**
     * Looks up the metadata of a transaction.
     *
     * @param txnID the transaction to look up
     * @return a future completed with the transaction's buffer, or failed with
     *         {@link TransactionBufferException.TransactionNotFoundException} if it is unknown
     */
    @Override
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        try {
            return CompletableFuture.completedFuture(this.getTxnBufferOrThrowNotFoundException(txnID));
        }
        catch (TransactionBufferException.TransactionNotFoundException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Returns the buffer registered for {@code txnID}.
     *
     * @throws TransactionBufferException.TransactionNotFoundException if no buffer is registered
     */
    private TxnBuffer getTxnBufferOrThrowNotFoundException(TxnID txnID) throws TransactionBufferException.TransactionNotFoundException {
        TxnBuffer buffer = this.buffers.get(txnID);
        if (null == buffer) {
            throw new TransactionBufferException.TransactionNotFoundException("Transaction `" + txnID + "` doesn't exist in the transaction buffer");
        }
        return buffer;
    }

    /** Returns the buffer registered for {@code txnID}, atomically creating it when absent. */
    private TxnBuffer getTxnBufferOrCreateIfNotExist(TxnID txnID) {
        return this.buffers.computeIfAbsent(txnID, TxnBuffer::new);
    }

    /**
     * Appends an entry to a transaction, creating the transaction's buffer on first use.
     *
     * @param txnId      the transaction to append to
     * @param sequenceId the key the entry is stored under
     * @param buffer     the entry payload
     * @return a future completed with {@code null} on success, or failed with
     *         {@link TransactionException.TransactionSealedException} if the transaction is no
     *         longer open
     */
    @Override
    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
        TxnBuffer txnBuffer = this.getTxnBufferOrCreateIfNotExist(txnId);
        try {
            txnBuffer.appendEntry(sequenceId, buffer);
            return CompletableFuture.completedFuture(null);
        }
        catch (TransactionException.TransactionSealedException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Opens a reader over the entries of a committed transaction.
     *
     * @param txnID           the transaction to read
     * @param startSequenceId the first sequence id (inclusive) the reader should return
     * @return a future completed with the reader, or failed if the transaction is unknown or
     *         not committed
     */
    @Override
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
        try {
            TxnBuffer txnBuffer = this.getTxnBufferOrThrowNotFoundException(txnID);
            return CompletableFuture.completedFuture(txnBuffer.newReader(startSequenceId));
        }
        catch (TransactionException.TransactionNotSealedException | TransactionBufferException.TransactionNotFoundException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Commits a transaction and records it in the ledger index.
     *
     * <p>This implementation has no real ledger, so the transaction is always committed at
     * ledger id {@code -1} and entry id {@code -1}.
     *
     * @param txnID        the transaction to commit
     * @param lowWaterMark ignored by this implementation
     * @return a future completed with {@code null} on success, or failed if the transaction is
     *         unknown or not open
     */
    @Override
    public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
        final long committedAtLedgerId = -1L;
        final long committedAtEntryId = -1L;
        try {
            TxnBuffer txnBuffer = this.getTxnBufferOrThrowNotFoundException(txnID);
            // Hold the buffer's monitor so the status change and the index update appear atomic
            // to other users of the same buffer.
            synchronized (txnBuffer) {
                txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId);
                this.addTxnToTxnIdex(txnID, committedAtLedgerId);
            }
            return CompletableFuture.completedFuture(null);
        }
        catch (TransactionException.TransactionStatusException | TransactionBufferException.TransactionNotFoundException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /** Registers {@code txnId} under {@code committedAtLedgerId} in {@link #txnIndex}. */
    private void addTxnToTxnIdex(TxnID txnId, long committedAtLedgerId) {
        synchronized (this.txnIndex) {
            this.txnIndex.computeIfAbsent(committedAtLedgerId, ledgerId -> new HashSet<>()).add(txnId);
        }
    }

    /**
     * Aborts an open transaction, removes it from the buffer and releases its entries.
     *
     * @param txnID        the transaction to abort
     * @param lowWaterMark ignored by this implementation
     * @return a future completed with {@code null} on success, or failed if the transaction is
     *         unknown or not open
     */
    @Override
    public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
        try {
            TxnBuffer txnBuffer = this.getTxnBufferOrThrowNotFoundException(txnID);
            txnBuffer.abort();
            this.buffers.remove(txnID, txnBuffer);
            // The buffer is unreachable from now on; release the payloads it still holds so
            // they are not leaked. Only the thread whose abort() succeeded gets here.
            txnBuffer.close();
            return CompletableFuture.completedFuture(null);
        }
        catch (TransactionException.TransactionStatusException | TransactionBufferException.TransactionNotFoundException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Drops every transaction that was committed at one of the given ledgers and releases the
     * entries those transactions hold. Ledgers with no indexed transactions are skipped.
     *
     * @param dataLedgers the ledger ids whose transactions should be purged
     * @return an already-completed future
     */
    @Override
    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
        List<TxnBuffer> buffersToPurge = new ArrayList<>();
        synchronized (this.txnIndex) {
            for (Long ledger : dataLedgers) {
                Set<TxnID> txns = this.txnIndex.remove(ledger);
                if (null == txns) {
                    // Nothing was ever committed at this ledger.
                    continue;
                }
                for (TxnID txnId : txns) {
                    TxnBuffer tb = this.buffers.remove(txnId);
                    if (null != tb) {
                        buffersToPurge.add(tb);
                    }
                }
            }
        }
        // Release the payloads outside the index lock.
        buffersToPurge.forEach(TxnBuffer::close);
        return CompletableFuture.completedFuture(null);
    }

    /** No-op in this implementation; returns an already-completed future. */
    @Override
    public CompletableFuture<Void> clearSnapshot() {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Releases the entries of every transaction currently held.
     *
     * @return an already-completed future
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        this.buffers.values().forEach(TxnBuffer::close);
        return CompletableFuture.completedFuture(null);
    }

    /** Always returns {@code false} in this implementation. */
    @Override
    public boolean isTxnAborted(TxnID txnID) {
        return false;
    }

    /** No-op in this implementation. */
    @Override
    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
    }

    /** Always returns {@link PositionImpl#LATEST}. */
    @Override
    public PositionImpl getMaxReadPosition() {
        return PositionImpl.LATEST;
    }

    /** Not supported by this implementation; always returns {@code null}. */
    @Override
    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
        return null;
    }

    /** Not supported by this implementation; always returns {@code null}. */
    @Override
    public TransactionBufferStats getStats(boolean lowWaterMarks) {
        return null;
    }

    /** Always returns an already-completed future. */
    @Override
    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
        return CompletableFuture.completedFuture(null);
    }

    /** Counts the buffered transactions whose status is OPEN, COMMITTING or ABORTING. */
    @Override
    public long getOngoingTxnCount() {
        return this.buffers.values().stream().filter(txnBuffer -> {
            // Read the status once so all three comparisons see the same value.
            TxnStatus current = txnBuffer.status();
            return current == TxnStatus.OPEN || current == TxnStatus.COMMITTING || current == TxnStatus.ABORTING;
        }).count();
    }

    /** Counts the buffered transactions whose status is ABORTED. */
    @Override
    public long getAbortedTxnCount() {
        return this.buffers.values().stream().filter(txnBuffer -> txnBuffer.status() == TxnStatus.ABORTED).count();
    }

    /** Counts the buffered transactions whose status is COMMITTED. */
    @Override
    public long getCommittedTxnCount() {
        return this.buffers.values().stream().filter(txnBuffer -> txnBuffer.status() == TxnStatus.COMMITTED).count();
    }

    /**
     * In-memory state of a single transaction: its status, its commit position and its entries
     * ordered by sequence id.
     *
     * <p>Locking: {@code status} and the commit position are guarded by the instance monitor;
     * {@code entries} is guarded by {@code synchronized (entries)}. When both are needed the
     * instance monitor is taken first.
     */
    private static class TxnBuffer
    implements TransactionMeta,
    AutoCloseable {
        private final TxnID txnid;
        /** Entries keyed by sequence id. Guarded by itself. */
        private final SortedMap<Long, ByteBuf> entries;
        /** Guarded by {@code this}. */
        private TxnStatus status;
        /** Guarded by {@code this}; {@code -1} until committed. */
        private long committedAtLedgerId = -1L;
        /** Guarded by {@code this}; {@code -1} until committed. */
        private long committedAtEntryId = -1L;

        /** Creates an empty buffer in the OPEN state. */
        TxnBuffer(TxnID txnid) {
            this.txnid = txnid;
            this.entries = new TreeMap<>();
            this.status = TxnStatus.OPEN;
        }

        @Override
        public TxnID id() {
            return this.txnid;
        }

        @Override
        public synchronized TxnStatus status() {
            return this.status;
        }

        /** Returns the number of entries currently stored. */
        @Override
        public int numEntries() {
            synchronized (this.entries) {
                return this.entries.size();
            }
        }

        /** Not tracked by this implementation; always returns {@code -1}. */
        @Override
        public int numMessageInTxn() throws TransactionException.TransactionStatusException {
            return -1;
        }

        @Override
        public synchronized long committedAtLedgerId() {
            return this.committedAtLedgerId;
        }

        @Override
        public synchronized long committedAtEntryId() {
            return this.committedAtEntryId;
        }

        /**
         * Returns the highest sequence id stored.
         *
         * @throws java.util.NoSuchElementException if no entry has been appended
         */
        @Override
        public long lastSequenceId() {
            synchronized (this.entries) {
                return this.entries.lastKey();
            }
        }

        /** Not supported; returns a future failed with {@link UnsupportedOperationException}. */
        @Override
        public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
            return FutureUtil.failedFuture(new UnsupportedOperationException());
        }

        /** Not supported; returns a future failed with {@link UnsupportedOperationException}. */
        @Override
        public CompletableFuture<Position> appendEntry(long sequenceId, Position position, int batchSize) {
            return FutureUtil.failedFuture(new UnsupportedOperationException());
        }

        /**
         * Moves the transaction to COMMITTING regardless of its current status.
         *
         * <p>NOTE(review): {@link #commitAt(long, long)} only accepts OPEN transactions, so a
         * transaction moved to COMMITTING here cannot subsequently be committed through it —
         * confirm whether that is intended.
         *
         * @return a future completed with {@code null}
         */
        @Override
        public synchronized CompletableFuture<TransactionMeta> committingTxn() {
            this.status = TxnStatus.COMMITTING;
            return CompletableFuture.completedFuture(null);
        }

        /** Future-returning wrapper around {@link #commitAt(long, long)}. */
        @Override
        public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
            try {
                return CompletableFuture.completedFuture(this.commitAt(committedAtLedgerId, committedAtEntryId));
            }
            catch (TransactionException.TransactionStatusException e) {
                return FutureUtil.failedFuture(e);
            }
        }

        /** Future-returning wrapper around {@link #abort()}. */
        @Override
        public CompletableFuture<TransactionMeta> abortTxn() {
            try {
                return CompletableFuture.completedFuture(this.abort());
            }
            catch (TransactionException.TransactionStatusException e) {
                return FutureUtil.failedFuture(e);
            }
        }

        /**
         * Moves an OPEN transaction to ABORTED.
         *
         * @return this buffer
         * @throws TransactionException.TransactionStatusException if the transaction is not OPEN
         */
        synchronized TxnBuffer abort() throws TransactionException.TransactionStatusException {
            if (TxnStatus.OPEN != this.status) {
                throw new TransactionException.TransactionStatusException(this.txnid, TxnStatus.OPEN, this.status);
            }
            this.status = TxnStatus.ABORTED;
            return this;
        }

        /**
         * Moves an OPEN transaction to COMMITTED and records where it was committed.
         *
         * @return this buffer
         * @throws TransactionException.TransactionStatusException if the transaction is not OPEN
         */
        synchronized TxnBuffer commitAt(long committedAtLedgerId, long committedAtEntryId) throws TransactionException.TransactionStatusException {
            if (TxnStatus.OPEN != this.status) {
                throw new TransactionException.TransactionStatusException(this.txnid, TxnStatus.OPEN, this.status);
            }
            this.committedAtLedgerId = committedAtLedgerId;
            this.committedAtEntryId = committedAtEntryId;
            this.status = TxnStatus.COMMITTED;
            return this;
        }

        /** Releases every stored entry and empties the buffer. Calling it again is a no-op. */
        @Override
        public void close() {
            synchronized (this.entries) {
                this.entries.forEach((sequenceId, buffer) -> buffer.release());
                this.entries.clear();
            }
        }

        /**
         * Stores {@code entry} under {@code sequenceId}.
         *
         * @throws TransactionException.TransactionSealedException if the transaction is not OPEN
         */
        public void appendEntry(long sequenceId, ByteBuf entry) throws TransactionException.TransactionSealedException {
            // Keep the status lock while inserting so a concurrent abort/commit cannot slip in
            // between the check and the put (which would let an entry land in a sealed buffer).
            synchronized (this) {
                if (TxnStatus.OPEN != this.status) {
                    throw new TransactionException.TransactionSealedException("Transaction `" + this.txnid + "` is already sealed");
                }
                synchronized (this.entries) {
                    this.entries.put(sequenceId, entry);
                }
            }
        }

        /**
         * Creates a reader over the entries whose sequence id is {@code >= sequenceId}. Each
         * returned entry is retained once on behalf of the reader.
         *
         * @throws TransactionException.TransactionNotSealedException if the transaction is not
         *         COMMITTED
         */
        public TransactionBufferReader newReader(long sequenceId) throws TransactionException.TransactionNotSealedException {
            final long ledgerId;
            final long entryId;
            synchronized (this) {
                if (TxnStatus.COMMITTED != this.status) {
                    throw new TransactionException.TransactionNotSealedException("Transaction `" + this.txnid + "` is not sealed yet");
                }
                // Snapshot the commit position under the same lock that guards its writes.
                ledgerId = this.committedAtLedgerId;
                entryId = this.committedAtEntryId;
            }
            TreeMap<Long, ByteBuf> entriesToRead = new TreeMap<>();
            synchronized (this.entries) {
                SortedMap<Long, ByteBuf> subEntries = this.entries.tailMap(sequenceId);
                subEntries.values().forEach(value -> value.retain());
                // Copy so the reader iterates over its own map rather than a live view.
                entriesToRead.putAll(subEntries);
            }
            return new InMemTransactionBufferReader(this.txnid, entriesToRead.entrySet().iterator(), ledgerId, entryId);
        }
    }
}

