package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnexpectedTxnStatusException;
import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionMetaImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.class */
public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
    private PulsarService pulsar;
    private BrokerService brokerService;
    private ManagedLedgerFactory mlFactoryMock;
    private ServerCnx serverCnx;
    private ManagedLedger ledgerMock;
    private ManagedCursor cursorMock;
    private ConfigurationCacheService configCacheService;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PersistentTransactionBufferTest.class);
    private MockZooKeeper mockZk;
    private PersistentTransactionBuffer buffer;
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic_txn";
    private final TxnID txnID = new TxnID(1234, 5678);

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest$NonClosableMockBookKeeper.class */
    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
        public NonClosableMockBookKeeper(ZooKeeper zooKeeper, ExecutorService executorService) throws Exception {
            super(zooKeeper, executorService);
        }

        public void close() {
        }

        public void shutdown() {
        }

        public void reallyShutdown() {
            super.shutdown();
        }
    }

    @BeforeMethod
    public void setup() throws Exception {
        ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.spy(new ServiceConfiguration());
        this.pulsar = (PulsarService) Mockito.spy(new PulsarService(serviceConfiguration));
        ((PulsarService) Mockito.doReturn(serviceConfiguration).when(this.pulsar)).getConfiguration();
        ((PulsarService) Mockito.doReturn(Mockito.mock(Compactor.class)).when(this.pulsar)).getCompactor();
        this.mlFactoryMock = (ManagedLedgerFactory) Mockito.mock(ManagedLedgerFactory.class);
        ((PulsarService) Mockito.doReturn(this.mlFactoryMock).when(this.pulsar)).getManagedLedgerFactory();
        this.mockZk = createMockZooKeeper();
        ((PulsarService) Mockito.doReturn(this.mockZk).when(this.pulsar)).getZkClient();
        ((PulsarService) Mockito.doReturn(createMockBookKeeper(this.mockZk, this.pulsar.getExecutor())).when(this.pulsar)).getBookKeeperClient();
        ZooKeeperCache zooKeeperCache = (ZooKeeperCache) Mockito.mock(ZooKeeperCache.class);
        ((ZooKeeperCache) Mockito.doReturn(30).when(zooKeeperCache)).getZkOperationTimeoutSeconds();
        ((PulsarService) Mockito.doReturn(zooKeeperCache).when(this.pulsar)).getLocalZkCache();
        this.configCacheService = (ConfigurationCacheService) Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache zooKeeperDataCache = (ZooKeeperDataCache) Mockito.mock(ZooKeeperDataCache.class);
        ((ConfigurationCacheService) Mockito.doReturn(zooKeeperDataCache).when(this.configCacheService)).policiesCache();
        ((PulsarService) Mockito.doReturn(this.configCacheService).when(this.pulsar)).getConfigurationCache();
        ((ZooKeeperDataCache) Mockito.doReturn(Optional.empty()).when(zooKeeperDataCache)).get(ArgumentMatchers.anyString());
        LocalZooKeeperCacheService localZooKeeperCacheService = (LocalZooKeeperCacheService) Mockito.mock(LocalZooKeeperCacheService.class);
        ((ZooKeeperDataCache) Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zooKeeperDataCache)).getAsync((String) ArgumentMatchers.any());
        ((LocalZooKeeperCacheService) Mockito.doReturn(zooKeeperDataCache).when(localZooKeeperCacheService)).policiesCache();
        ((PulsarService) Mockito.doReturn(this.configCacheService).when(this.pulsar)).getConfigurationCache();
        ((PulsarService) Mockito.doReturn(localZooKeeperCacheService).when(this.pulsar)).getLocalZkCacheService();
        this.brokerService = (BrokerService) Mockito.spy(new BrokerService(this.pulsar));
        ((PulsarService) Mockito.doReturn(this.brokerService).when(this.pulsar)).getBrokerService();
        this.serverCnx = (ServerCnx) Mockito.spy(new ServerCnx(this.pulsar));
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isActive();
        ((ServerCnx) Mockito.doReturn(true).when(this.serverCnx)).isWritable();
        ((ServerCnx) Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnx)).clientAddress();
        NamespaceService namespaceService = (NamespaceService) Mockito.mock(NamespaceService.class);
        ((PulsarService) Mockito.doReturn(namespaceService).when(this.pulsar)).getNamespaceService();
        ((NamespaceService) Mockito.doReturn(true).when(namespaceService)).isServiceUnitOwned((ServiceUnitId) ArgumentMatchers.any(NamespaceBundle.class));
        ((NamespaceService) Mockito.doReturn(true).when(namespaceService)).isServiceUnitActive((TopicName) ArgumentMatchers.any(TopicName.class));
        setupMLAsyncCallbackMocks();
    }

    public static MockZooKeeper createMockZooKeeper() throws Exception {
        MockZooKeeper newInstance = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
        ArrayList arrayList = new ArrayList(0);
        ZkUtils.createFullPathOptimistic(newInstance, "/ledgers/available/192.168.1.1:5000", "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), arrayList, CreateMode.PERSISTENT);
        newInstance.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), arrayList, CreateMode.PERSISTENT);
        return newInstance;
    }

    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zooKeeper, ExecutorService executorService) throws Exception {
        return (NonClosableMockBookKeeper) Mockito.spy(new NonClosableMockBookKeeper(zooKeeper, executorService));
    }

    void setupMLAsyncCallbackMocks() throws BrokerServiceException.NamingException, ManagedLedgerException, InterruptedException {
        this.ledgerMock = (ManagedLedger) Mockito.mock(ManagedLedger.class);
        this.cursorMock = (ManagedCursor) Mockito.mock(ManagedCursor.class);
        final CompletableFuture completableFuture = new CompletableFuture();
        ((ManagedLedger) Mockito.doReturn(new ArrayList()).when(this.ledgerMock)).getCursors();
        ((ManagedCursor) Mockito.doReturn("mockCursor").when(this.cursorMock)).getName();
        ((ManagedCursor) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return Boolean.valueOf(completableFuture.complete(null));
            }
        }).when(this.cursorMock)).asyncClose(new AsyncCallbacks.CloseCallback() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.1
            public void closeComplete(Object obj) {
                PersistentTransactionBufferTest.log.info("[{}] Successfully closed cursor ledger", "mockCursor");
                completableFuture.complete(null);
            }

            public void closeFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTransactionBufferTest.log.error("Error closing cursor for subscription", (Throwable) managedLedgerException);
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            }
        }, (Object) null);
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.3
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(PersistentTransactionBufferTest.this.ledgerMock, (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(ArgumentMatchers.matches(".*success.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedgerFactory) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.4
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), (Object) null);
                return null;
            }
        }).when(this.mlFactoryMock)).asyncOpen(ArgumentMatchers.matches(".*fail.*"), (ManagedLedgerConfig) ArgumentMatchers.any(ManagedLedgerConfig.class), (AsyncCallbacks.OpenLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.OpenLedgerCallback.class), (Supplier) ArgumentMatchers.any(Supplier.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.5
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1L, 1L), invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when(this.ledgerMock)).asyncAddEntry((ByteBuf) ArgumentMatchers.any(ByteBuf.class), (AsyncCallbacks.AddEntryCallback) ArgumentMatchers.any(AsyncCallbacks.AddEntryCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.6
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(PersistentTransactionBufferTest.this.cursorMock, (Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncOpenCursor(ArgumentMatchers.matches(".*success.*"), (PulsarApi.CommandSubscribe.InitialPosition) ArgumentMatchers.any(PulsarApi.CommandSubscribe.InitialPosition.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.7
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(PersistentTransactionBufferTest.this.cursorMock, (Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncOpenCursor(ArgumentMatchers.matches(".*success.*"), (PulsarApi.CommandSubscribe.InitialPosition) ArgumentMatchers.any(PulsarApi.CommandSubscribe.InitialPosition.class), (Map) ArgumentMatchers.any(Map.class), (AsyncCallbacks.OpenCursorCallback) ArgumentMatchers.any(AsyncCallbacks.OpenCursorCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.8
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDelete((AsyncCallbacks.DeleteLedgerCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteLedgerCallback.class), ArgumentMatchers.any());
        ((ManagedLedger) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.pulsar.broker.transaction.buffer.PersistentTransactionBufferTest.9
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((AsyncCallbacks.DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete((Object) null);
                return null;
            }
        }).when(this.ledgerMock)).asyncDeleteCursor(ArgumentMatchers.matches(".*success.*"), (AsyncCallbacks.DeleteCursorCallback) ArgumentMatchers.any(AsyncCallbacks.DeleteCursorCallback.class), ArgumentMatchers.any());
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock -> {
            ((AsyncCallbacks.MarkDeleteCallback) invocationOnMock.getArguments()[2]).markDeleteComplete(invocationOnMock.getArguments()[3]);
            return null;
        }).when(this.cursorMock)).asyncMarkDelete((Position) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), (AsyncCallbacks.MarkDeleteCallback) ArgumentMatchers.any(AsyncCallbacks.MarkDeleteCallback.class), ArgumentMatchers.any());
        this.buffer = new PersistentTransactionBuffer("persistent://prop/use/ns-abc/successTopic_txn", this.factory.open("hello"), this.brokerService);
    }

    @AfterMethod
    public void teardown() throws Exception {
        this.brokerService.getTopics().clear();
        this.brokerService.close();
        try {
            this.pulsar.close();
            this.mockZk.shutdown();
        } catch (Exception e) {
            log.warn("Failed to close pulsar service", (Throwable) e);
            throw e;
        }
    }

    @Test
    public void testGetANonExistTxn() throws InterruptedException {
        try {
            this.buffer.getTransactionMeta(this.txnID).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
        }
    }

    @Test
    public void testOpenReaderOnNonExistentTxn() throws InterruptedException {
        try {
            this.buffer.openTransactionBufferReader(this.txnID, 0L).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
        }
    }

    @Test
    public void testOpenReadOnAnOpenTxn() throws InterruptedException {
        appendEntries(this.buffer, this.txnID, 10, 0L);
        TransactionMeta transactionMeta = null;
        try {
            transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        } catch (ExecutionException e) {
            Assert.fail("Should not failed at here");
        }
        Assert.assertEquals(this.txnID, transactionMeta.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta.status());
        try {
            this.buffer.openTransactionBufferReader(this.txnID, 0L).get();
            Assert.fail("Should failed");
        } catch (ExecutionException e2) {
            Assert.assertTrue(e2.getCause() instanceof TransactionNotSealedException);
        }
    }

    @Test
    public void testOpenReaderOnCommittedTxn() throws ExecutionException, InterruptedException {
        appendEntries(this.buffer, this.txnID, 10, 0L);
        TransactionMeta transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta.status());
        this.buffer.commitTxn(this.txnID, 22L, 33L).get();
        TransactionMeta transactionMeta2 = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta2.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta2.status());
        try {
            TransactionBufferReader transactionBufferReader = (TransactionBufferReader) this.buffer.openTransactionBufferReader(this.txnID, 0L).get();
            Throwable th = null;
            try {
                try {
                    verifyAndReleaseEntries((List) transactionBufferReader.readNext(10).get(), this.txnID, 0L, 10);
                    transactionBufferReader.readNext(1).get();
                    if (transactionBufferReader != null) {
                        if (0 != 0) {
                            try {
                                transactionBufferReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            transactionBufferReader.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof EndOfTransactionException);
        }
    }

    @Test
    public void testCommitNonExistentTxn() throws ExecutionException, InterruptedException {
        try {
            this.buffer.commitTxn(this.txnID, 22L, 33L).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
        }
    }

    @Test
    public void testCommitTxn() throws Exception {
        appendEntries(this.buffer, this.txnID, 10, 0L);
        TransactionMeta transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta.id());
        Assert.assertEquals(transactionMeta.status(), TxnStatus.OPEN);
        this.buffer.commitTxn(this.txnID, 22L, 33L).get();
        TransactionMeta transactionMeta2 = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta2.id());
        Assert.assertEquals(transactionMeta2.status(), TxnStatus.COMMITTED);
    }

    @Test
    public void testCommitTxnMultiTimes() throws ExecutionException, InterruptedException {
        appendEntries(this.buffer, this.txnID, 10, 0L);
        TransactionMeta transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta.id());
        Assert.assertEquals(transactionMeta.status(), TxnStatus.OPEN);
        this.buffer.commitTxn(this.txnID, 22L, 33L).get();
        try {
            this.buffer.commitTxn(this.txnID, 23L, 34L).get();
            this.buffer.commitTxn(this.txnID, 24L, 34L).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof UnexpectedTxnStatusException);
        }
        TransactionMeta transactionMeta2 = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta2.id());
        Assert.assertEquals(transactionMeta2.status(), TxnStatus.COMMITTED);
        Assert.assertEquals(transactionMeta2.committedAtLedgerId(), 22L);
        Assert.assertEquals(transactionMeta2.committedAtEntryId(), 33L);
        Assert.assertEquals(transactionMeta2.numEntries(), 10);
    }

    @Test
    public void testAbortNonExistentTxn() throws Exception {
        try {
            this.buffer.abortTxn(this.txnID).get();
            Assert.fail("Should fail to abort a transaction if it doesn't exist");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
        }
    }

    @Test
    public void testAbortCommittedTxn() throws Exception {
        appendEntries(this.buffer, this.txnID, 10, 0L);
        TransactionMeta transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta.status());
        this.buffer.commitTxn(this.txnID, 22L, 33L).get();
        TransactionMeta transactionMeta2 = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta2.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta2.status());
        try {
            this.buffer.abortTxn(this.txnID).get();
            Assert.fail("Should fail to abort a committed transaction");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof UnexpectedTxnStatusException);
        }
        TransactionMeta transactionMeta3 = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta3.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta3.status());
    }

    @Test
    public void testAbortTxn() throws Exception {
        appendEntries(this.buffer, this.txnID, 10, 0L);
        TransactionMeta transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(this.txnID, transactionMeta.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta.status());
        this.buffer.abortTxn(this.txnID).get();
        verifyTxnNotExist(this.txnID);
    }

    @Test
    public void testPurgeTxns() throws Exception {
        TxnID txnID = new TxnID(1234L, 2345L);
        appendEntries(this.buffer, txnID, 10, 0L);
        TransactionMeta transactionMeta = (TransactionMeta) this.buffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(txnID, transactionMeta.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta.status());
        TxnID txnID2 = new TxnID(1234L, 3456L);
        appendEntries(this.buffer, txnID2, 10, 0L);
        this.buffer.commitTxn(txnID2, 22L, 0L).get();
        TransactionMeta transactionMeta2 = (TransactionMeta) this.buffer.getTransactionMeta(txnID2).get();
        Assert.assertEquals(txnID2, transactionMeta2.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta2.status());
        TxnID txnID3 = new TxnID(1234L, 4567L);
        appendEntries(this.buffer, txnID3, 10, 0L);
        this.buffer.commitTxn(txnID3, 23L, 0L).get();
        TransactionMeta transactionMeta3 = (TransactionMeta) this.buffer.getTransactionMeta(txnID3).get();
        Assert.assertEquals(txnID3, transactionMeta3.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta3.status());
        this.buffer.purgeTxns(Lists.newArrayList(22L)).get();
        verifyTxnNotExist(txnID2);
        TransactionMeta transactionMeta4 = (TransactionMeta) this.buffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(txnID, transactionMeta4.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta4.status());
        TransactionMeta transactionMeta5 = (TransactionMeta) this.buffer.getTransactionMeta(txnID3).get();
        Assert.assertEquals(txnID3, transactionMeta5.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta5.status());
        try {
            this.buffer.purgeTxns(Lists.newArrayList(1L)).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof NoTxnsCommittedAtLedgerException);
        }
        verifyTxnNotExist(txnID2);
        TransactionMeta transactionMeta6 = (TransactionMeta) this.buffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(txnID, transactionMeta6.id());
        Assert.assertEquals(TxnStatus.OPEN, transactionMeta6.status());
        TransactionMeta transactionMeta7 = (TransactionMeta) this.buffer.getTransactionMeta(txnID3).get();
        Assert.assertEquals(txnID3, transactionMeta7.id());
        Assert.assertEquals(TxnStatus.COMMITTED, transactionMeta7.status());
    }

    @Test
    public void testAppendEntry() throws ExecutionException, InterruptedException, ManagedLedgerException, BrokerServiceException.NamingException {
        ManagedLedger open = this.factory.open("test_ledger");
        PersistentTransactionBuffer persistentTransactionBuffer = new PersistentTransactionBuffer("persistent://prop/use/ns-abc/successTopic_txn", open, this.brokerService);
        TxnID txnID = new TxnID(1111L, 2222L);
        List<ByteBuf> appendEntries = appendEntries(persistentTransactionBuffer, txnID, 10, 0L);
        ArrayList arrayList = new ArrayList(appendEntries);
        TransactionMetaImpl transactionMetaImpl = (TransactionMetaImpl) persistentTransactionBuffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(transactionMetaImpl.id(), txnID);
        Assert.assertEquals(10, transactionMetaImpl.numEntries());
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.OPEN);
        verifyEntries(open, appendEntries, transactionMetaImpl.getEntries());
        persistentTransactionBuffer.commitTxn(txnID, 22L, 33L).get();
        TransactionMetaImpl transactionMetaImpl2 = (TransactionMetaImpl) persistentTransactionBuffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(transactionMetaImpl2.id(), txnID);
        Assert.assertEquals(transactionMetaImpl2.numEntries(), 10);
        Assert.assertEquals(transactionMetaImpl2.status(), TxnStatus.COMMITTED);
        verifyEntries(open, arrayList, transactionMetaImpl2.getEntries());
    }

    @Test
    public void testCommitMarker() throws Exception {
        ManagedLedger open = this.factory.open("test_commit_ledger");
        PersistentTransactionBuffer persistentTransactionBuffer = new PersistentTransactionBuffer("persistent://prop/use/ns-abc/successTopic_txn", open, this.brokerService);
        List<ByteBuf> appendEntries = appendEntries(persistentTransactionBuffer, this.txnID, 10, 0L);
        TransactionMetaImpl transactionMetaImpl = (TransactionMetaImpl) persistentTransactionBuffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(transactionMetaImpl.id(), this.txnID);
        Assert.assertEquals(transactionMetaImpl.numEntries(), 10);
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.OPEN);
        verifyEntries(open, appendEntries, transactionMetaImpl.getEntries());
        persistentTransactionBuffer.commitTxn(this.txnID, 22L, 33L).get();
        Assert.assertEquals(transactionMetaImpl.id(), this.txnID);
        Assert.assertEquals(transactionMetaImpl.numEntries(), 10);
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.COMMITTED);
        Assert.assertTrue(Markers.isTxnCommitMarker(Commands.parseMessageMetadata(getEntry(open.newNonDurableCursor(PositionImpl.earliest), open.getLastConfirmedEntry()).getDataBuffer())));
    }

    @Test
    public void testAbortMarker() throws Exception {
        ManagedLedger open = this.factory.open("test_abort_ledger");
        PersistentTransactionBuffer persistentTransactionBuffer = new PersistentTransactionBuffer("persistent://prop/use/ns-abc/successTopic_txn", open, this.brokerService);
        List<ByteBuf> appendEntries = appendEntries(persistentTransactionBuffer, this.txnID, 10, 0L);
        TransactionMetaImpl transactionMetaImpl = (TransactionMetaImpl) persistentTransactionBuffer.getTransactionMeta(this.txnID).get();
        Assert.assertEquals(transactionMetaImpl.id(), this.txnID);
        Assert.assertEquals(transactionMetaImpl.numEntries(), 10);
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.OPEN);
        verifyEntries(open, appendEntries, transactionMetaImpl.getEntries());
        persistentTransactionBuffer.abortTxn(this.txnID).get();
        Assert.assertEquals(transactionMetaImpl.id(), this.txnID);
        Assert.assertEquals(transactionMetaImpl.numEntries(), 10);
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.ABORTED);
        Assert.assertTrue(Markers.isTxnAbortMarker(Commands.parseMessageMetadata(getEntry(open.newNonDurableCursor(PositionImpl.earliest), open.getLastConfirmedEntry()).getDataBuffer())));
    }

    private void verifyEntries(ManagedLedger managedLedger, List<ByteBuf> list, SortedMap<Long, Position> sortedMap) throws ManagedLedgerException, InterruptedException {
        ManagedCursor newNonDurableCursor = managedLedger.newNonDurableCursor(PositionImpl.earliest);
        Assert.assertNotNull(newNonDurableCursor);
        Iterator<Map.Entry<Long, Position>> it = sortedMap.entrySet().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(list.remove(getEntry(newNonDurableCursor, it.next().getValue()).getDataBuffer()));
        }
    }

    private Entry getEntry(ManagedCursor managedCursor, Position position) throws ManagedLedgerException, InterruptedException {
        Assert.assertNotNull(managedCursor);
        managedCursor.seek(position);
        List readEntries = managedCursor.readEntries(1);
        Assert.assertEquals(readEntries.size(), 1);
        return (Entry) readEntries.get(0);
    }

    @Test
    public void testNoDeduplicateMessage() throws ManagedLedgerException, InterruptedException, BrokerServiceException.NamingException, ExecutionException {
        ManagedLedger open = this.factory.open("test_deduplicate");
        PersistentTransactionBuffer persistentTransactionBuffer = new PersistentTransactionBuffer("persistent://prop/use/ns-abc/successTopic_txn", open, this.brokerService);
        TxnID txnID = new TxnID(1234L, 5678L);
        List<ByteBuf> appendEntries = appendEntries(persistentTransactionBuffer, txnID, 10, 0L);
        TransactionMetaImpl transactionMetaImpl = (TransactionMetaImpl) persistentTransactionBuffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(transactionMetaImpl.id(), txnID);
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.OPEN);
        Assert.assertEquals(transactionMetaImpl.numEntries(), appendEntries.size());
        verifyEntries(open, appendEntries, transactionMetaImpl.getEntries());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            long j = i;
            ByteBuf copiedBuffer = Unpooled.copiedBuffer("message-deduplicate-" + j, StandardCharsets.UTF_8);
            persistentTransactionBuffer.appendBufferToTxn(txnID, j, copiedBuffer);
            arrayList.add(copiedBuffer);
        }
        TransactionMetaImpl transactionMetaImpl2 = (TransactionMetaImpl) persistentTransactionBuffer.getTransactionMeta(txnID).get();
        Assert.assertEquals(transactionMetaImpl2.id(), txnID);
        Assert.assertEquals(transactionMetaImpl2.numEntries(), 10);
        Assert.assertEquals(transactionMetaImpl.status(), TxnStatus.OPEN);
        List readEntries = open.newNonDurableCursor(PositionImpl.earliest).readEntries(100);
        List<ByteBuf> list = (List) readEntries.stream().map(entry -> {
            return entry.getDataBuffer();
        }).collect(Collectors.toList());
        Assert.assertEquals(readEntries.size(), 10);
        verifyEntries(open, list, transactionMetaImpl2.getEntries());
    }

    private void verifyTxnNotExist(TxnID txnID) throws Exception {
        try {
            this.buffer.getTransactionMeta(txnID).get();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
        }
    }

    private List<ByteBuf> appendEntries(PersistentTransactionBuffer persistentTransactionBuffer, TxnID txnID, int i, long j) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            long j2 = j + i2;
            persistentTransactionBuffer.appendBufferToTxn(txnID, j2, Unpooled.copiedBuffer("message-" + j2, StandardCharsets.UTF_8)).join();
            arrayList.add(Unpooled.copiedBuffer("message-" + j2, StandardCharsets.UTF_8));
        }
        return arrayList;
    }

    private void verifyAndReleaseEntries(List<TransactionEntry> list, TxnID txnID, long j, int i) {
        Assert.assertEquals(list.size(), i);
        for (int i2 = 0; i2 < i; i2++) {
            TransactionEntry transactionEntry = list.get(i2);
            Throwable th = null;
            try {
                try {
                    Assert.assertEquals(transactionEntry.committedAtLedgerId(), 22L);
                    Assert.assertEquals(transactionEntry.committedAtEntryId(), 33L);
                    Assert.assertEquals(transactionEntry.txnId(), txnID);
                    Assert.assertEquals(transactionEntry.sequenceId(), j + i2);
                    Assert.assertEquals(new String(ByteBufUtil.getBytes(transactionEntry.getEntryBuffer()), StandardCharsets.UTF_8), "message-" + i2);
                    if (transactionEntry != null) {
                        if (0 != 0) {
                            try {
                                transactionEntry.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            transactionEntry.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (transactionEntry != null) {
                    if (th != null) {
                        try {
                            transactionEntry.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        transactionEntry.close();
                    }
                }
                throw th3;
            }
        }
    }
}
