package org.apache.pulsar.broker.transaction;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.ClientCnxTest;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.class */
public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
    /** Logger used by the tests to trace produced and received messages. */
    private static final Logger log = LoggerFactory.getLogger(TopicTransactionBufferRecoverTest.class);
    /** Topic handed to {@code setUpBase} in {@code setup()}; also selects the commit branch of {@code recoverTest}. */
    private static final String RECOVER_COMMIT = "tnx/ns1/recover-commit";
    /** Non-partitioned topic created in {@code setup()}; drives the abort branch of {@code recoverTest}. */
    private static final String RECOVER_ABORT = "tnx/ns1/recover-abort";
    /** Subscription name shared by the consumers created in this class. */
    private static final String SUBSCRIPTION_NAME = "test-recover";
    /** Non-partitioned topic created in {@code setup()} and written to by {@code testTakeSnapshot}. */
    private static final String TAKE_SNAPSHOT = "tnx/ns1/take-snapshot";
    /** Topic used by {@code testTopicTransactionBufferDeleteAbort}; it is not pre-created in {@code setup()}. */
    private static final String ABORT_DELETE = "tnx/ns1/abort-delete";
    /** Second argument passed to {@code setUpBase}; presumably the partition count of {@link #RECOVER_COMMIT} (confirm in TransactionTestBase). */
    private static final int NUM_PARTITIONS = 16;

    /**
     * Per-test set-up: sets the {@code brokerClient_operationTimeoutMs} property to
     * {@code 10000}, runs the base set-up with {@link #RECOVER_COMMIT}, and then creates
     * the {@link #RECOVER_ABORT} and {@link #TAKE_SNAPSHOT} non-partitioned topics.
     *
     * @throws Exception if the base set-up or a topic creation fails
     */
    @BeforeMethod
    protected void setup() throws Exception {
        final String operationTimeoutMs = String.valueOf(10000);
        this.conf.getProperties().setProperty("brokerClient_operationTimeoutMs", operationTimeoutMs);
        setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
        for (String nonPartitionedTopic : new String[]{RECOVER_ABORT, TAKE_SNAPSHOT}) {
            this.admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
        }
    }

    /**
     * Per-test tear-down: shuts down the shared client (if any) and runs the base cleanup.
     *
     * <p>The base cleanup is executed in a {@code finally} block so that a failing client
     * shutdown cannot prevent the remaining test fixtures from being released.
     *
     * @throws Exception if the client shutdown or the base cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.shutdown();
            }
        } finally {
            // Drop the reference even when shutdown failed so the client is never reused.
            this.pulsarClient = null;
            super.internalCleanup();
        }
    }

    /**
     * Data provider for {@code recoverTest}: supplies the abort topic first and the
     * commit topic second.
     *
     * @return the two topic names, one per test invocation
     */
    @DataProvider(name = "testTopic")
    public Object[] testTopic() {
        final Object[] topics = new Object[2];
        topics[0] = RECOVER_ABORT;
        topics[1] = RECOVER_COMMIT;
        return topics;
    }

    /**
     * Sends ten messages alternating between two transactions, commits the first
     * transaction, unloads the topic, waits until the reloaded topic's transaction buffer
     * reports ready, and then commits or aborts the second transaction depending on the
     * topic under test.
     *
     * <p>The first commit is awaited with {@code get()} so that a commit failure surfaces
     * here instead of being silently dropped, and so the following {@code receive} does not
     * race the commit. Consumer and producer are managed by try-with-resources and are
     * therefore closed exactly once on every path.
     *
     * @param str topic under test; {@link #RECOVER_COMMIT} selects the commit branch, any
     *            other value the abort branch
     * @throws Exception on any client, admin or reflection failure
     */
    @Test(dataProvider = "testTopic")
    private void recoverTest(String str) throws Exception {
        final int messageCount = 10;
        PulsarClient client = this.pulsarClient;
        Transaction txn1 = client.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        Transaction txn2 = client.newTransaction().withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
        try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic(str)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .subscribe();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(str)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .create()) {
            // Even indexes go to txn1, odd indexes to txn2.
            for (int i = 0; i < messageCount; i++) {
                String content = "Hello Txn - " + i;
                if (i % 2 == 0) {
                    MessageId sentId = producer.newMessage(txn1).value(content).send();
                    log.info("Txn1 send message : {}, messageId : {}", content, sentId);
                } else {
                    MessageId sentId = producer.newMessage(txn2).value(content).send();
                    log.info("Txn2 send message : {}, messageId : {}", content, sentId);
                }
            }

            // Nothing may be delivered while both transactions are still open.
            Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));

            // Wait for the commit to finish before probing the consumer.
            txn1.commit().get();

            Message<String> firstVisible = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(firstVisible);
            log.info("Txn1 commit receive message : {}, messageId : {}",
                    firstVisible.getValue(), firstVisible.getMessageId());
            consumer.acknowledge(firstVisible);
            Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));

            this.admin.topics().unload(str);

            // Poll the brokers until the topic is loaded again and its buffer reports ready.
            Awaitility.await().until(() -> {
                for (int i = 0; i < getPulsarServiceList().size(); i++) {
                    Field topicsField = BrokerService.class.getDeclaredField("topics");
                    topicsField.setAccessible(true);
                    ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap) topicsField
                            .get(getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture<?> topicFuture = (CompletableFuture<?>) topics.get("persistent://" + str);
                    if (topicFuture == null) {
                        continue;
                    }
                    Optional<?> loadedTopic = (Optional<?>) topicFuture.get();
                    if (loadedTopic.isPresent()) {
                        PersistentTopic persistentTopic = (PersistentTopic) loadedTopic.get();
                        Field bufferField = PersistentTopic.class.getDeclaredField("transactionBuffer");
                        bufferField.setAccessible(true);
                        return ((TopicTransactionBuffer) bufferField.get(persistentTopic)).checkIfReady();
                    }
                }
                return false;
            });

            if (str.equals(RECOVER_COMMIT)) {
                txn2.commit().get();
                // One message was already consumed above, so messageCount - 1 remain.
                for (int remaining = messageCount; remaining > 1; remaining--) {
                    Message<String> received = consumer.receive();
                    log.info("Txn2 commit receive message : {}, messageId : {}",
                            received.getValue(), received.getMessageId());
                    consumer.acknowledge(received);
                }
            } else {
                txn2.abort().get();
                // Only the rest of txn1's half of the messages is expected here.
                for (int remaining = messageCount / 2; remaining > 1; remaining--) {
                    Message<String> received = consumer.receive();
                    log.info("Txn2 commit receive message : {}, messageId : {}",
                            received.getValue(), received.getMessageId());
                    consumer.acknowledge(received);
                }
            }
            Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));
        }
    }

    /**
     * Replaces the transaction buffer snapshot service of every broker in
     * {@code pulsarServiceList} with a spy whose {@code createReader(topicName)} hands out
     * a mocked reader. On that reader:
     * <ul>
     *   <li>{@code hasMoreEvents()} answers {@code true} on the first call only;</li>
     *   <li>the first {@code readNext()} blocks the caller for one hour, later calls
     *       return {@code null} immediately;</li>
     *   <li>the first {@code readNextAsync()} returns a future that completes only after
     *       one hour, later calls complete promptly;</li>
     *   <li>{@code closeAsync()} returns an already completed future.</li>
     * </ul>
     *
     * <p>The background sleeper runs on a daemon thread so that it cannot keep the JVM
     * alive once the test has finished, and it restores the interrupt flag when
     * interrupted.
     *
     * @param topicName topic whose snapshot reader is replaced by the mock
     * @throws Exception if the reflective field replacement fails
     */
    private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName) throws Exception {
        final long blockMillis = 3600000L;
        SystemTopicClient.Reader reader = Mockito.mock(SystemTopicClient.Reader.class);
        AtomicBoolean hasMoreEventsCalled = new AtomicBoolean();
        AtomicBoolean readNextCalled = new AtomicBoolean();
        AtomicBoolean readNextAsyncCalled = new AtomicBoolean();

        Mockito.doAnswer(invocation -> hasMoreEventsCalled.compareAndSet(false, true))
                .when(reader).hasMoreEvents();

        Mockito.doAnswer(invocation -> {
            if (readNextCalled.compareAndSet(false, true)) {
                // First read: block the calling thread to simulate a stuck reader.
                Thread.sleep(blockMillis);
            }
            return null;
        }).when(reader).readNext();

        Mockito.doAnswer(invocation -> {
            CompletableFuture<Object> result = new CompletableFuture<>();
            Thread sleeper = new Thread(() -> {
                if (readNextAsyncCalled.compareAndSet(false, true)) {
                    try {
                        Thread.sleep(blockMillis);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                }
                result.complete(null);
            }, "tb-snapshot-reader-blocker");
            sleeper.setDaemon(true);
            sleeper.start();
            return result;
        }).when(reader).readNextAsync();

        Mockito.when(reader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));

        for (PulsarService pulsarService : this.pulsarServiceList) {
            TransactionBufferSnapshotService spiedService =
                    Mockito.spy(pulsarService.getTransactionBufferSnapshotService());
            Mockito.doAnswer(invocation -> CompletableFuture.completedFuture(reader))
                    .when(spiedService).createReader(topicName);
            Field serviceField = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
            serviceField.setAccessible(true);
            serviceField.set(pulsarService, spiedService);
        }
    }

    /**
     * Installs the blocking snapshot reader for a freshly named topic, then creates and
     * immediately closes a producer on that topic and finally deletes the topic. The test
     * must finish within the 60 second TestNG timeout.
     *
     * @throws Exception on any client, admin or reflection failure
     */
    @Test(timeOut = 60000)
    public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
        String uniqueSuffix = UUID.randomUUID().toString().replaceAll("-", "_");
        String topic = String.format("persistent://%s/%s", ClientCnxTest.NAMESPACE, "tx_recover_" + uniqueSuffix);
        TopicName parsedTopic = TopicName.get(topic);
        makeTBSnapshotReaderTimeoutIfFirstRead(parsedTopic);

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .batchingMaxMessages(2)
                .create();
        producer.close();

        this.admin.topics().delete(topic, false);
    }

    /**
     * Verifies the snapshots written to {@code tnx/ns1/__transaction_buffer_snapshot}
     * while transactions on {@link #TAKE_SNAPSHOT} are committed and aborted.
     *
     * <p>Steps: commit a first transaction and expect two snapshots (entry id {@code -1},
     * then the entry after the sent message); commit a second transaction and expect a
     * snapshot with no aborts; abort a third transaction and expect a snapshot listing
     * exactly that transaction as aborted.
     *
     * <p>Producer and reader are managed by try-with-resources so both are released even
     * when an assertion fails.
     *
     * @throws IOException          if a client operation or a close fails
     * @throws ExecutionException   if a transaction future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test
    private void testTakeSnapshot() throws IOException, ExecutionException, InterruptedException {
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(TAKE_SNAPSHOT)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create()) {
            Transaction firstCommitTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            Transaction secondCommitTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();
            // The transaction id bits are only exposed by the implementation class.
            TransactionImpl abortTxn = (TransactionImpl) this.pulsarClient.newTransaction()
                    .withTransactionTimeout(30L, TimeUnit.SECONDS).build().get();

            try (Reader<TransactionBufferSnapshot> snapshotReader = this.pulsarClient
                    .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
                    .startMessageId(MessageId.earliest)
                    .topic("tnx/ns1/__transaction_buffer_snapshot")
                    .create()) {
                MessageIdImpl firstId =
                        (MessageIdImpl) producer.newMessage(firstCommitTxn).value("test").send();
                firstCommitTxn.commit().get();

                Awaitility.await().untilAsserted(() -> {
                    TransactionBufferSnapshot initial = snapshotReader.readNext().getValue();
                    Assert.assertEquals(initial.getMaxReadPositionEntryId(), -1L);
                    Assert.assertEquals(initial.getMaxReadPositionLedgerId(), firstId.getLedgerId());

                    TransactionBufferSnapshot afterCommit = snapshotReader.readNext().getValue();
                    Assert.assertEquals(afterCommit.getMaxReadPositionEntryId(), firstId.getEntryId() + 1);
                    Assert.assertEquals(afterCommit.getMaxReadPositionLedgerId(), firstId.getLedgerId());
                    Assert.assertFalse(snapshotReader.hasMessageAvailable());
                });

                MessageIdImpl secondId =
                        (MessageIdImpl) producer.newMessage(secondCommitTxn).value("test").send();
                secondCommitTxn.commit().get();
                TransactionBufferSnapshot secondSnapshot = snapshotReader.readNext().getValue();
                Assert.assertEquals(secondSnapshot.getMaxReadPositionEntryId(), secondId.getEntryId() + 1);
                Assert.assertEquals(secondSnapshot.getMaxReadPositionLedgerId(), secondId.getLedgerId());
                Assert.assertEquals(secondSnapshot.getAborts().size(), 0);
                Assert.assertFalse(snapshotReader.hasMessageAvailable());

                MessageIdImpl abortedId =
                        (MessageIdImpl) producer.newMessage(abortTxn).value("test").send();
                abortTxn.abort().get();
                TransactionBufferSnapshot abortSnapshot = snapshotReader.readNext().getValue();
                Assert.assertEquals(abortSnapshot.getMaxReadPositionEntryId(), abortedId.getEntryId() + 1);
                Assert.assertEquals(abortSnapshot.getMaxReadPositionLedgerId(), abortedId.getLedgerId());
                Assert.assertEquals(abortSnapshot.getAborts().size(), 1);
                AbortTxnMetadata recordedAbort = (AbortTxnMetadata) abortSnapshot.getAborts().get(0);
                Assert.assertEquals(recordedAbort.getTxnIdLeastBits(), abortTxn.getTxnIdLeastBits());
                Assert.assertEquals(recordedAbort.getTxnIdMostBits(), abortTxn.getTxnIdMostBits());
                Assert.assertFalse(snapshotReader.hasMessageAvailable());
            }
        }
    }

    /**
     * Checks that the transaction buffer of {@link #ABORT_DELETE} forgets an aborted
     * transaction once the ledger holding its message has been removed from the managed
     * ledger's ledger map.
     *
     * <p>Steps: abort a transaction, unload the topic, commit a second transaction and
     * consume its message, abort a third transaction (expected to land in a different
     * ledger than the first), reflectively remove the first ledger, commit one more
     * transaction, and finally assert that the buffer's {@code aborts} map holds a single
     * entry located in the ledger of the consumed message.
     *
     * <p>The consumed message is now checked for presence and for the exact value
     * {@code "Hello"}; previously {@code "Hello"} was only used as the assertion message.
     *
     * @throws Exception on any client, admin or reflection failure
     */
    @Test
    private void testTopicTransactionBufferDeleteAbort() throws Exception {
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(ABORT_DELETE)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(ABORT_DELETE)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .subscribe()) {
            Transaction firstAbortTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
            MessageIdImpl firstAbortedId =
                    (MessageIdImpl) producer.newMessage(firstAbortTxn).value("Hello Pulsar!").send();
            firstAbortTxn.abort().get();

            this.admin.topics().unload(ABORT_DELETE);

            Transaction commitTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
            producer.newMessage(commitTxn).value("Hello").send();
            commitTxn.commit().get();

            Message<String> received = consumer.receive(2, TimeUnit.SECONDS);
            // Fail with a clear assertion instead of an NPE when nothing arrives in time.
            Assert.assertNotNull(received, "no message received for the committed transaction");
            log.info("consumer receive message{}", received.getMessageId());
            Assert.assertEquals(received.getValue(), "Hello");
            consumer.acknowledge(received);

            Transaction secondAbortTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
            MessageIdImpl secondAbortedId =
                    (MessageIdImpl) producer.newMessage(secondAbortTxn).value("Hello").send();
            secondAbortTxn.abort().get();
            Assert.assertTrue(secondAbortedId.getLedgerId() != firstAbortedId.getLedgerId());

            boolean topicChecked = false;
            for (int i = 0; i < getPulsarServiceList().size(); i++) {
                Field topicsField = BrokerService.class.getDeclaredField("topics");
                topicsField.setAccessible(true);
                ConcurrentOpenHashMap topics = (ConcurrentOpenHashMap) topicsField
                        .get(getPulsarServiceList().get(i).getBrokerService());
                CompletableFuture<?> topicFuture =
                        (CompletableFuture<?>) topics.get("persistent://" + ABORT_DELETE);
                if (topicFuture == null) {
                    continue;
                }
                Optional<?> loadedTopic = (Optional<?>) topicFuture.get();
                if (!loadedTopic.isPresent()) {
                    continue;
                }
                PersistentTopic persistentTopic = (PersistentTopic) loadedTopic.get();

                // Drop the ledger of the first aborted message from the managed ledger's map.
                Field ledgersField = ManagedLedgerImpl.class.getDeclaredField("ledgers");
                ledgersField.setAccessible(true);
                ((NavigableMap) ledgersField.get(persistentTopic.getManagedLedger()))
                        .remove(Long.valueOf(firstAbortedId.getLedgerId()));

                Transaction triggerTxn = this.pulsarClient.newTransaction()
                        .withTransactionTimeout(2L, TimeUnit.SECONDS).build().get();
                producer.newMessage(triggerTxn).value("Hello").send();
                triggerTxn.commit().get();

                Field bufferField = PersistentTopic.class.getDeclaredField("transactionBuffer");
                bufferField.setAccessible(true);
                TopicTransactionBuffer buffer = (TopicTransactionBuffer) bufferField.get(persistentTopic);
                Field abortsField = TopicTransactionBuffer.class.getDeclaredField("aborts");
                abortsField.setAccessible(true);
                LinkedMap aborts = (LinkedMap) abortsField.get(buffer);

                Assert.assertEquals(aborts.size(), 1);
                Assert.assertEquals(((PositionImpl) aborts.get(aborts.firstKey())).getLedgerId(),
                        ((MessageIdImpl) received.getMessageId()).getLedgerId());
                topicChecked = true;
            }
            Assert.assertTrue(topicChecked);
        }
    }

    /**
     * Verifies that deleting a topic removes its transaction buffer snapshots from the
     * namespace's snapshot system topic.
     *
     * <p>Steps: publish two transactional messages on a randomly named topic and commit,
     * reflectively invoke {@code takeSnapshot} on the topic's transaction buffer, check
     * that the compacted system topic contains at least one snapshot, force-delete the
     * topic, and check that the compacted system topic is empty afterwards.
     *
     * <p>The producer is managed by try-with-resources so it is closed even when sending
     * or committing fails.
     *
     * @throws Exception on any client, admin or reflection failure
     */
    @Test
    public void clearTransactionBufferSnapshotTest() throws Exception {
        String topic = "tnx/ns1/tb-snapshot-delete-" + RandomUtils.nextInt();
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create()) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value("test".getBytes()).sendAsync();
            producer.newMessage(txn).value("test".getBytes()).sendAsync();
            txn.commit().get();
        }

        BrokerService brokerService = getPulsarServiceList().get(0).getBrokerService();
        PersistentTopic originalTopic = (PersistentTopic) ((Optional<?>) brokerService
                .getTopic(TopicName.get(topic).toString(), false).get()).get();
        TopicTransactionBuffer transactionBuffer =
                (TopicTransactionBuffer) originalTopic.getTransactionBuffer();

        // takeSnapshot is not public, so it is triggered reflectively.
        Method takeSnapshot = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
        takeSnapshot.setAccessible(true);
        takeSnapshot.invoke(transactionBuffer);

        TopicName systemTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(
                TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
        PersistentTopic systemTopic = (PersistentTopic) ((Optional<?>) brokerService
                .getTopic(systemTopicName.toString(), false).get()).get();
        Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction");
        currentCompaction.setAccessible(true);

        checkSnapshotCount(systemTopicName, true, systemTopic, currentCompaction);
        this.admin.topics().delete(topic, true);
        checkSnapshotCount(systemTopicName, false, systemTopic, currentCompaction);
    }

    /**
     * Triggers a compaction on {@code persistentTopic}, waits for the compaction future
     * read from {@code field} to complete, then counts the messages a compacted reader
     * sees on {@code topicName} and asserts on that count.
     *
     * <p>The reader is managed by try-with-resources so it is closed even when reading
     * fails; the count is asserted after the reader has been closed.
     *
     * @param topicName       topic to read the compacted snapshots from
     * @param z               {@code true} to require at least one snapshot,
     *                        {@code false} to require none
     * @param persistentTopic topic on which the compaction is triggered
     * @param field           accessible field of {@code persistentTopic} holding the
     *                        current compaction future
     * @throws Exception on any client or reflection failure
     */
    private void checkSnapshotCount(TopicName topicName, boolean z, PersistentTopic persistentTopic, Field field) throws Exception {
        persistentTopic.triggerCompaction();
        CompletableFuture<?> compaction = (CompletableFuture<?>) field.get(persistentTopic);
        Awaitility.await().untilAsserted(() -> Assert.assertTrue(compaction.isDone()));

        int snapshotCount = 0;
        try (Reader<TransactionBufferSnapshot> reader = this.pulsarClient
                .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
                .readCompacted(true)
                .startMessageId(MessageId.earliest)
                .startMessageIdInclusive()
                .topic(topicName.toString())
                .create()) {
            // A null result means no further message arrived within the timeout.
            while (reader.readNext(2, TimeUnit.SECONDS) != null) {
                snapshotCount++;
            }
        }

        if (z) {
            Assert.assertTrue(snapshotCount > 0, "expected at least one snapshot on " + topicName);
        } else {
            Assert.assertEquals(snapshotCount, 0, "expected no snapshot on " + topicName);
        }
    }

    /**
     * Runs {@code checkCloseTopic} against four failure modes of a mocked snapshot
     * service, in this order: the reader's {@code hasMoreEvents()} throwing a
     * {@code RuntimeException}, the same call throwing a {@code PulsarClientException},
     * {@code createReader} returning a failed future, and {@code createWriter} returning
     * a failed future. The producer is closed on every exit path.
     *
     * @throws Exception on any client or reflection failure
     */
    @Test(timeOut = 30000)
    public void testTransactionBufferRecoverThrowException() throws Exception {
        final String topic = "tnx/ns1/testTransactionBufferRecoverThrowPulsarClientException";
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
        try {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.SECONDS).build().get();
            producer.newMessage(txn).value("test".getBytes()).sendAsync();
            producer.newMessage(txn).value("test".getBytes()).sendAsync();
            txn.commit().get();

            final String fullTopicName = TopicName.get(topic).toString();
            final BrokerService brokerService = getPulsarServiceList().get(0).getBrokerService();
            PersistentTopic originalTopic = (PersistentTopic)
                    ((Optional<?>) brokerService.getTopic(fullTopicName, false).get()).get();

            // Mocked service whose reader and writer behaviour is changed between checks.
            TransactionBufferSnapshotService mockedService = Mockito.mock(TransactionBufferSnapshotService.class);
            SystemTopicClient.Reader reader = Mockito.mock(SystemTopicClient.Reader.class);
            SystemTopicClient.Writer writer = Mockito.mock(SystemTopicClient.Writer.class);
            Mockito.doReturn(CompletableFuture.completedFuture(reader))
                    .when(mockedService).createReader((TopicName) ArgumentMatchers.any());
            Mockito.doReturn(CompletableFuture.completedFuture(writer))
                    .when(mockedService).createWriter((TopicName) ArgumentMatchers.any());
            Mockito.doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
            Mockito.doReturn(CompletableFuture.completedFuture(null)).when(writer).closeAsync();

            Field serviceField = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
            serviceField.setAccessible(true);
            TransactionBufferSnapshotService originalService =
                    (TransactionBufferSnapshotService) serviceField.get(getPulsarServiceList().get(0));

            // 1) hasMoreEvents() throws a RuntimeException.
            Mockito.doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
            checkCloseTopic(this.pulsarClient, originalService, mockedService, originalTopic, serviceField, producer);
            Mockito.doReturn(true).when(reader).hasMoreEvents();

            // 2) hasMoreEvents() throws a PulsarClientException.
            Mockito.doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
            checkCloseTopic(this.pulsarClient, originalService, mockedService, originalTopic, serviceField, producer);
            Mockito.doReturn(true).when(reader).hasMoreEvents();

            // 3) createReader() fails; the topic is looked up again before the check.
            Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
                    .when(mockedService).createReader((TopicName) ArgumentMatchers.any());
            PersistentTopic topicForReaderFailure = (PersistentTopic)
                    ((Optional<?>) brokerService.getTopic(fullTopicName, false).get()).get();
            checkCloseTopic(this.pulsarClient, originalService, mockedService, topicForReaderFailure, serviceField, producer);
            Mockito.doReturn(CompletableFuture.completedFuture(reader))
                    .when(mockedService).createReader((TopicName) ArgumentMatchers.any());

            // 4) createWriter() fails; the topic is looked up before the stub is installed.
            PersistentTopic topicForWriterFailure = (PersistentTopic)
                    ((Optional<?>) brokerService.getTopic(fullTopicName, false).get()).get();
            Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
                    .when(mockedService).createWriter((TopicName) ArgumentMatchers.any());
            checkCloseTopic(this.pulsarClient, originalService, mockedService, topicForWriterFailure, serviceField, producer);
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /**
     * Installs {@code transactionBufferSnapshotService2} on the first broker, constructs a
     * new {@code TopicTransactionBuffer} for {@code persistentTopic}, and waits until the
     * topic's {@code isFenced} field reads {@code true}. Afterwards it puts
     * {@code transactionBufferSnapshotService} back and sends and commits one
     * transactional message through {@code producer}.
     *
     * @param pulsarClient                      client used to open the follow-up transaction
     * @param transactionBufferSnapshotService  service installed again after the topic is fenced
     * @param transactionBufferSnapshotService2 service installed while the buffer is constructed
     * @param persistentTopic                   topic whose fenced flag is awaited
     * @param field                             accessible {@code PulsarService} field holding the snapshot service
     * @param producer                          producer used for the follow-up message
     * @throws Exception on any client or reflection failure
     */
    private void checkCloseTopic(PulsarClient pulsarClient, TransactionBufferSnapshotService transactionBufferSnapshotService, TransactionBufferSnapshotService transactionBufferSnapshotService2, PersistentTopic persistentTopic, Field field, Producer<byte[]> producer) throws Exception {
        final PulsarService firstBroker = getPulsarServiceList().get(0);

        field.set(firstBroker, transactionBufferSnapshotService2);
        // Constructed only for its side effects; the instance itself is not kept.
        new TopicTransactionBuffer(persistentTopic);
        Awaitility.await().untilAsserted(() -> {
            Field fencedField = AbstractTopic.class.getDeclaredField("isFenced");
            fencedField.setAccessible(true);
            boolean fenced = (Boolean) fencedField.get(persistentTopic);
            Assert.assertTrue(fenced);
        });

        field.set(firstBroker, transactionBufferSnapshotService);
        Transaction txn = pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();
        byte[] payload = "test".getBytes();
        producer.newMessage(txn).value(payload).send();
        txn.commit().get();
    }

    /**
     * Unloads {@code tnx/ns1/test}, sends one non-transactional message, and asserts that
     * {@code tnx/ns1/__transaction_buffer_snapshot} then has exactly one subscription,
     * named {@code __compaction}. The producer is closed on every exit path.
     *
     * @throws Exception on any client or admin failure
     */
    @Test
    public void testTransactionBufferNoSnapshotCloseReader() throws Exception {
        final String topic = "tnx/ns1/test";
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .producerName("testTxnTimeOut_producer")
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create();
        try {
            this.admin.topics().unload(topic);
            producer.send("test");

            TopicStats snapshotTopicStats = this.admin.topics().getStats("tnx/ns1/__transaction_buffer_snapshot");
            Assert.assertEquals(snapshotTopicStats.getSubscriptions().size(), 1);
            Assert.assertTrue(snapshotTopicStats.getSubscriptions().containsKey("__compaction"));
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
