/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicTransactionBuffer
extends TopicTransactionBufferState
implements TransactionBuffer,
TimerTask {
    private static final Logger log = LoggerFactory.getLogger(TopicTransactionBuffer.class);
    private final PersistentTopic topic;
    private volatile PositionImpl maxReadPosition;
    private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap();
    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap();
    private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter;
    private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
    private final Timer timer;
    private final int takeSnapshotIntervalNumber;
    private final int takeSnapshotIntervalTime;
    private volatile long lastSnapshotTimestamps;
    private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture();

    public TopicTransactionBuffer(final PersistentTopic topic) {
        super(TopicTransactionBufferState.State.None);
        this.topic = topic;
        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
        this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar().getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar().getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
        this.maxReadPosition = (PositionImpl)topic.getManagedLedger().getLastConfirmedEntry();
        this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this).execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack(){

            @Override
            public void recoverComplete() {
                if (!TopicTransactionBuffer.this.changeToReadyState()) {
                    log.error("[{}]Transaction buffer recover fail", (Object)topic.getName());
                } else {
                    TopicTransactionBuffer.this.timer.newTimeout(TopicTransactionBuffer.this, TopicTransactionBuffer.this.takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
                    TopicTransactionBuffer.this.transactionBufferFuture.complete(null);
                }
            }

            @Override
            public void noNeedToRecover() {
                if (!TopicTransactionBuffer.this.changeToNoSnapshotState()) {
                    log.error("[{}]Transaction buffer recover fail", (Object)topic.getName());
                } else {
                    TopicTransactionBuffer.this.transactionBufferFuture.complete(null);
                }
            }

            @Override
            public void handleSnapshot(TransactionBufferSnapshot snapshot) {
                TopicTransactionBuffer.this.maxReadPosition = PositionImpl.get(snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
                if (snapshot.getAborts() != null) {
                    snapshot.getAborts().forEach(abortTxnMetadata -> TopicTransactionBuffer.this.aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(), abortTxnMetadata.getTxnIdLeastBits()), PositionImpl.get(abortTxnMetadata.getLedgerId(), abortTxnMetadata.getEntryId())));
                }
            }

            @Override
            public void handleTxnEntry(Entry entry) {
                ByteBuf metadataAndPayload = entry.getDataBuffer();
                MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, "transaction-buffer-sub", -1L);
                if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                    TxnID txnID = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
                    PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                    if (Markers.isTxnMarker(msgMetadata)) {
                        if (Markers.isTxnAbortMarker(msgMetadata)) {
                            TopicTransactionBuffer.this.aborts.put(txnID, position);
                        }
                        TopicTransactionBuffer.this.updateMaxReadPosition(txnID);
                    } else {
                        TopicTransactionBuffer.this.handleTransactionMessage(txnID, position);
                    }
                }
            }

            @Override
            public void recoverExceptionally(Exception e) {
                TopicTransactionBuffer.this.transactionBufferFuture.completeExceptionally(e);
            }
        }, this.topic, this));
    }

    @Override
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        return null;
    }

    @Override
    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxnEnabled) {
        if (!isTxnEnabled) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        ((CompletableFuture)this.transactionBufferFuture.thenRun(() -> {
            if (this.checkIfNoSnapshot()) {
                ((CompletableFuture)this.takeSnapshot().thenRun(() -> {
                    if (this.changeToReadyStateFromNoSnapshot()) {
                        this.timer.newTimeout(this, this.takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
                    }
                    completableFuture.complete(null);
                })).exceptionally(exception -> {
                    log.error("Topic {} failed to take snapshot", (Object)this.topic.getName());
                    completableFuture.completeExceptionally((Throwable)exception);
                    return null;
                });
            } else {
                completableFuture.complete(null);
            }
        })).exceptionally(exception -> {
            log.error("Topic {}: TransactionBuffer recover failed", (Object)this.topic.getName(), exception);
            completableFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return completableFuture;
    }

    @Override
    public CompletableFuture<Position> appendBufferToTxn(final TxnID txnId, long sequenceId, ByteBuf buffer) {
        final CompletableFuture<Position> completableFuture = new CompletableFuture<Position>();
        this.topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                TopicTransactionBuffer topicTransactionBuffer = TopicTransactionBuffer.this;
                synchronized (topicTransactionBuffer) {
                    TopicTransactionBuffer.this.handleTransactionMessage(txnId, position);
                }
                completableFuture.complete(position);
            }

            @Override
            public void addFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Failed to append buffer to txn {}", (Object)txnId, (Object)exception);
                completableFuture.completeExceptionally(exception);
            }
        }, null);
        return completableFuture;
    }

    private void handleTransactionMessage(TxnID txnId, Position position) {
        if (!this.ongoingTxns.containsKey(txnId) && !this.aborts.containsKey(txnId)) {
            this.ongoingTxns.put(txnId, (PositionImpl)position);
            PositionImpl firstPosition = (PositionImpl)this.ongoingTxns.get(this.ongoingTxns.firstKey());
            this.maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1L);
        }
    }

    @Override
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
        return null;
    }

    @Override
    public CompletableFuture<Void> commitTxn(final TxnID txnID, final long lowWaterMark) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} commit on topic {}.", (Object)txnID.toString(), (Object)this.topic.getName());
        }
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        ((CompletableFuture)this.transactionBufferFuture.thenRun(() -> {
            ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits());
            try {
                this.topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                        TopicTransactionBuffer topicTransactionBuffer = TopicTransactionBuffer.this;
                        synchronized (topicTransactionBuffer) {
                            TopicTransactionBuffer.this.updateMaxReadPosition(txnID);
                            TopicTransactionBuffer.this.handleLowWaterMark(txnID, lowWaterMark);
                            TopicTransactionBuffer.this.clearAbortedTransactions();
                            TopicTransactionBuffer.this.takeSnapshotByChangeTimes();
                        }
                        completableFuture.complete(null);
                    }

                    @Override
                    public void addFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("Failed to commit for txn {}", (Object)txnID, (Object)exception);
                        completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
                    }
                }, null);
            }
            finally {
                commitMarker.release();
            }
        })).exceptionally(exception -> {
            log.error("Transaction {} commit on topic {}.", new Object[]{txnID.toString(), this.topic.getName(), exception});
            completableFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return completableFuture;
    }

    @Override
    public CompletableFuture<Void> abortTxn(final TxnID txnID, final long lowWaterMark) {
        if (log.isDebugEnabled()) {
            log.debug("Transaction {} abort on topic {}.", (Object)txnID.toString(), (Object)this.topic.getName());
        }
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        ((CompletableFuture)this.transactionBufferFuture.thenRun(() -> {
            if (!this.checkIfReady()) {
                completableFuture.complete(null);
                return;
            }
            ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits());
            try {
                this.topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                        TopicTransactionBuffer topicTransactionBuffer = TopicTransactionBuffer.this;
                        synchronized (topicTransactionBuffer) {
                            TopicTransactionBuffer.this.aborts.put(txnID, (PositionImpl)position);
                            TopicTransactionBuffer.this.updateMaxReadPosition(txnID);
                            TopicTransactionBuffer.this.handleLowWaterMark(txnID, lowWaterMark);
                            TopicTransactionBuffer.this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
                            TopicTransactionBuffer.this.clearAbortedTransactions();
                            TopicTransactionBuffer.this.takeSnapshotByChangeTimes();
                        }
                        completableFuture.complete(null);
                    }

                    @Override
                    public void addFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("Failed to abort for txn {}", (Object)txnID, (Object)exception);
                        completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
                    }
                }, null);
            }
            finally {
                abortMarker.release();
            }
        })).exceptionally(exception -> {
            log.error("Transaction {} abort on topic {}.", (Object)txnID.toString(), (Object)this.topic.getName());
            completableFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return completableFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleLowWaterMark(final TxnID txnID, long lowWaterMark) {
        TxnID firstTxn;
        if (!this.ongoingTxns.isEmpty() && (firstTxn = (TxnID)this.ongoingTxns.firstKey()).getMostSigBits() == txnID.getMostSigBits() && lowWaterMark >= firstTxn.getLeastSigBits()) {
            ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
            try {
                this.topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                        TopicTransactionBuffer topicTransactionBuffer = TopicTransactionBuffer.this;
                        synchronized (topicTransactionBuffer) {
                            TopicTransactionBuffer.this.aborts.put(firstTxn, (PositionImpl)position);
                            TopicTransactionBuffer.this.updateMaxReadPosition(firstTxn);
                        }
                    }

                    @Override
                    public void addFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("Failed to abort low water mark for txn {}", (Object)txnID, (Object)exception);
                    }
                }, null);
            }
            finally {
                abortMarker.release();
            }
        }
    }

    private void takeSnapshotByChangeTimes() {
        if (this.changeMaxReadPositionAndAddAbortTimes.get() >= (long)this.takeSnapshotIntervalNumber) {
            this.takeSnapshot();
        }
    }

    private void takeSnapshotByTimeout() {
        if (this.changeMaxReadPositionAndAddAbortTimes.get() > 0L) {
            this.takeSnapshot();
        }
        this.timer.newTimeout(this, this.takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
    }

    private CompletableFuture<Void> takeSnapshot() {
        this.changeMaxReadPositionAndAddAbortTimes.set(0L);
        return this.takeSnapshotWriter.thenCompose(writer -> {
            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
            TopicTransactionBuffer topicTransactionBuffer = this;
            synchronized (topicTransactionBuffer) {
                snapshot.setTopicName(this.topic.getName());
                snapshot.setMaxReadPositionLedgerId(this.maxReadPosition.getLedgerId());
                snapshot.setMaxReadPositionEntryId(this.maxReadPosition.getEntryId());
                ArrayList<AbortTxnMetadata> list = new ArrayList<AbortTxnMetadata>();
                this.aborts.forEach((k, v) -> {
                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
                    abortTxnMetadata.setLedgerId(v.getLedgerId());
                    abortTxnMetadata.setEntryId(v.getEntryId());
                    list.add(abortTxnMetadata);
                });
                snapshot.setAborts(list);
            }
            return ((CompletableFuture)writer.writeAsync(snapshot).thenAccept(messageId -> {
                this.lastSnapshotTimestamps = System.currentTimeMillis();
                if (log.isDebugEnabled()) {
                    log.debug("[{}]Transaction buffer take snapshot success! messageId : {}", (Object)this.topic.getName(), messageId);
                }
            })).exceptionally(e -> {
                log.warn("[{}]Transaction buffer take snapshot fail! ", (Object)this.topic.getName(), e);
                return null;
            });
        });
    }

    private void clearAbortedTransactions() {
        while (!this.aborts.isEmpty() && !((ManagedLedgerImpl)this.topic.getManagedLedger()).ledgerExists(((PositionImpl)this.aborts.get(this.aborts.firstKey())).getLedgerId())) {
            if (log.isDebugEnabled()) {
                this.aborts.firstKey();
                log.debug("[{}] Topic transaction buffer clear aborted transaction, TxnId : {}, Position : {}", new Object[]{this.topic.getName(), this.aborts.firstKey(), this.aborts.get(this.aborts.firstKey())});
            }
            this.aborts.remove(this.aborts.firstKey());
        }
    }

    void updateMaxReadPosition(TxnID txnID) {
        PositionImpl preMaxReadPosition = this.maxReadPosition;
        this.ongoingTxns.remove(txnID);
        if (!this.ongoingTxns.isEmpty()) {
            PositionImpl position = (PositionImpl)this.ongoingTxns.get(this.ongoingTxns.firstKey());
            this.maxReadPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1L);
        } else {
            this.maxReadPosition = (PositionImpl)this.topic.getManagedLedger().getLastConfirmedEntry();
        }
        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
        }
    }

    @Override
    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
        return null;
    }

    @Override
    public CompletableFuture<Void> clearSnapshot() {
        return ((CompletableFuture)this.takeSnapshotWriter.thenCompose(writer -> {
            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
            snapshot.setTopicName(this.topic.getName());
            return writer.deleteAsync(snapshot);
        })).thenCompose(__ -> CompletableFuture.completedFuture(null));
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        this.changeToCloseState();
        return this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
    }

    @Override
    public boolean isTxnAborted(TxnID txnID) {
        return this.aborts.containsKey(txnID);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
        TopicTransactionBuffer topicTransactionBuffer = this;
        synchronized (topicTransactionBuffer) {
            if (this.checkIfNoSnapshot()) {
                this.maxReadPosition = position;
                this.changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
            } else if (this.checkIfReady() && this.ongoingTxns.isEmpty()) {
                this.maxReadPosition = position;
                this.changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
            }
        }
    }

    @Override
    public PositionImpl getMaxReadPosition() {
        if (this.checkIfReady() || this.checkIfNoSnapshot()) {
            return this.maxReadPosition;
        }
        return PositionImpl.EARLIEST;
    }

    @Override
    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
        TransactionInBufferStats transactionInBufferStats = new TransactionInBufferStats();
        transactionInBufferStats.aborted = this.isTxnAborted(txnID);
        if (this.ongoingTxns.containsKey(txnID)) {
            transactionInBufferStats.startPosition = ((PositionImpl)this.ongoingTxns.get(txnID)).toString();
        }
        return transactionInBufferStats;
    }

    @Override
    public TransactionBufferStats getStats() {
        TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
        transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
        transactionBufferStats.state = this.getState().name();
        transactionBufferStats.maxReadPosition = this.maxReadPosition.toString();
        return transactionBufferStats;
    }

    @Override
    public void run(Timeout timeout) {
        if (this.checkIfReady()) {
            this.takeSnapshotByTimeout();
        }
    }

    static class FillEntryQueueCallback
    implements AsyncCallbacks.ReadEntriesCallback {
        private final AtomicLong outstandingReadsRequests = new AtomicLong(0L);
        private final SpscArrayQueue<Entry> entryQueue;
        private final ManagedCursor cursor;
        private final TopicTransactionBufferRecover recover;
        private volatile boolean isReadable = true;

        private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, ManagedCursor cursor, TopicTransactionBufferRecover recover) {
            this.entryQueue = entryQueue;
            this.cursor = cursor;
            this.recover = recover;
        }

        boolean fillQueue() {
            if (this.entryQueue.size() < this.entryQueue.capacity() && this.outstandingReadsRequests.get() == 0L) {
                if (this.cursor.hasMoreEntries()) {
                    this.outstandingReadsRequests.incrementAndGet();
                    this.cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.LATEST);
                } else if (this.entryQueue.size() == 0) {
                    this.isReadable = false;
                }
            }
            return this.isReadable;
        }

        @Override
        public void readEntriesComplete(final List<Entry> entries, Object ctx) {
            this.entryQueue.fill((MessagePassingQueue.Supplier)new MessagePassingQueue.Supplier<Entry>(){
                private int i = 0;

                public Entry get() {
                    Entry entry = (Entry)entries.get(this.i);
                    ++this.i;
                    return entry;
                }
            }, entries.size());
            this.outstandingReadsRequests.decrementAndGet();
        }

        @Override
        public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
            if (this.recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData() && exception instanceof ManagedLedgerException.NonRecoverableLedgerException || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                this.isReadable = false;
            }
            this.recover.callBackException(exception);
            this.outstandingReadsRequests.decrementAndGet();
        }
    }

    static class TopicTransactionBufferRecover
    implements Runnable {
        private final PersistentTopic topic;
        private final TopicTransactionBufferRecoverCallBack callBack;
        private Position startReadCursorPosition = PositionImpl.EARLIEST;
        private final SpscArrayQueue<Entry> entryQueue;
        private final AtomicLong exceptionNumber = new AtomicLong();
        public static final String SUBSCRIPTION_NAME = "transaction-buffer-sub";
        private final TopicTransactionBuffer topicTransactionBuffer;

        private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, PersistentTopic topic, TopicTransactionBuffer transactionBuffer) {
            this.topic = topic;
            this.callBack = callBack;
            this.entryQueue = new SpscArrayQueue(2000);
            this.topicTransactionBuffer = transactionBuffer;
        }

        @Override
        public void run() {
            if (!this.topicTransactionBuffer.changeToInitializingState()) {
                log.warn("TransactionBuffer {} of topic {} can not change state to Initializing", (Object)this, (Object)this.topic.getName());
                return;
            }
            ((CompletableFuture)this.topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService().createReader(TopicName.get(this.topic.getName())).thenAcceptAsync(reader -> {
                ManagedCursor managedCursor;
                try {
                    boolean hasSnapshot = false;
                    while (reader.hasMoreEvents()) {
                        TransactionBufferSnapshot transactionBufferSnapshot;
                        Message message = reader.readNext();
                        if (!this.topic.getName().equals(message.getKey()) || (transactionBufferSnapshot = (TransactionBufferSnapshot)message.getValue()) == null) continue;
                        hasSnapshot = true;
                        this.callBack.handleSnapshot(transactionBufferSnapshot);
                        this.startReadCursorPosition = PositionImpl.get(transactionBufferSnapshot.getMaxReadPositionLedgerId(), transactionBufferSnapshot.getMaxReadPositionEntryId());
                    }
                    if (!hasSnapshot) {
                        this.callBack.noNeedToRecover();
                        return;
                    }
                }
                catch (PulsarClientException pulsarClientException) {
                    log.error("[{}]Transaction buffer recover fail when read transactionBufferSnapshot!", (Object)this.topic.getName(), (Object)pulsarClientException);
                    this.callBack.recoverExceptionally(pulsarClientException);
                    reader.closeAsync().exceptionally(e -> {
                        log.error("[{}]Transaction buffer reader close error!", (Object)this.topic.getName(), e);
                        return null;
                    });
                    return;
                }
                reader.closeAsync().exceptionally(e -> {
                    log.error("[{}]Transaction buffer reader close error!", (Object)this.topic.getName(), e);
                    return null;
                });
                try {
                    managedCursor = this.topic.getManagedLedger().newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
                }
                catch (ManagedLedgerException e2) {
                    this.callBack.recoverExceptionally(e2);
                    log.error("[{}]Transaction buffer recover fail when open cursor!", (Object)this.topic.getName(), (Object)e2);
                    return;
                }
                PositionImpl lastConfirmedEntry = (PositionImpl)this.topic.getManagedLedger().getLastConfirmedEntry();
                PositionImpl currentLoadPosition = (PositionImpl)this.startReadCursorPosition;
                FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(this.entryQueue, managedCursor, this);
                if (lastConfirmedEntry.getEntryId() != -1L) {
                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) {
                        Entry entry = (Entry)this.entryQueue.poll();
                        if (entry != null) {
                            try {
                                currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                                this.callBack.handleTxnEntry(entry);
                                continue;
                            }
                            finally {
                                entry.release();
                                continue;
                            }
                        }
                        try {
                            Thread.sleep(1L);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                }
                this.closeCursor(managedCursor);
                this.callBack.recoverComplete();
            }, (Executor)this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this))).exceptionally(e -> {
                this.callBack.recoverExceptionally(new Exception((Throwable)e));
                log.error("[{}]Transaction buffer new snapshot reader fail!", (Object)this.topic.getName(), e);
                return null;
            });
        }

        private void closeCursor(ManagedCursor cursor) {
            cursor.asyncClose(new AsyncCallbacks.CloseCallback(){

                @Override
                public void closeComplete(Object ctx) {
                    log.info("[{}]Transaction buffer snapshot recover cursor close complete.", (Object)topic.getName());
                }

                @Override
                public void closeFailed(ManagedLedgerException exception, Object ctx) {
                    log.error("[{}]Transaction buffer snapshot recover cursor close fail.", (Object)topic.getName());
                }
            }, null);
        }

        private void callBackException(ManagedLedgerException e) {
            log.error("Transaction buffer recover fail when recover transaction entry!", (Throwable)e);
            this.exceptionNumber.getAndIncrement();
        }
    }
}

